Add traffic light GPIO control (PIN11=red, PIN13=green, 5min timeout, blink on fail)

This commit is contained in:
2026-06-07 08:52:00 +00:00
parent b214c76ef1
commit 207f8e0b0c
+233 -81
View File
@@ -1,32 +1,43 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Весовой сервис для Orange Pi Весовой сервис для Orange Pi — с управлением светофором
Читает данные с весового терминала A9 через USB-RS232 (WaveShare конвертер) Читает данные с весового терминала A9 через USB-RS232 (WaveShare конвертер)
и публикует вес в MQTT топик scale/weight и публикует вес в MQTT топик scale/weight
Установка: Машина состояний:
EMPTY → машины нет, оба реле выключены
LOADING → машина на платформе, красный ON, ждём стабилизации
WAIT_GO → вес отправлен, красный ON, ждём GO от сервера (макс 5 мин)
GREEN → GO получен, зелёный ON, ждём пока машина уедет
Установка зависимостей:
pip3 install pyserial paho-mqtt --break-system-packages pip3 install pyserial paho-mqtt --break-system-packages
Автозапуск: Автозапуск:
systemctl enable scales systemctl enable scales
systemctl start scales systemctl start scales
""" """
import subprocess import subprocess
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import re import re
import time import time
import datetime import datetime
import threading
import select
import os
# ── Серийный порт ────────────────────────────────────────────────────────────
SERIAL_PORT = '/dev/ttyUSB0' SERIAL_PORT = '/dev/ttyUSB0'
SERIAL_BAUD = 9600 SERIAL_BAUD = 9600
# MQTT локальный (сервер с ИИ системой) # ── MQTT локальный (сервер с ИИ системой) ────────────────────────────────────
MQTT1_HOST = '192.168.20.9' MQTT1_HOST = '192.168.20.9'
MQTT1_PORT = 1883 MQTT1_PORT = 1883
MQTT1_USER = '' MQTT1_USER = ''
MQTT1_PASS = '' MQTT1_PASS = ''
# MQTT наш VPS # ── MQTT наш VPS ─────────────────────────────────────────────────────────────
MQTT2_HOST = '77.222.43.248' MQTT2_HOST = '77.222.43.248'
MQTT2_PORT = 1884 MQTT2_PORT = 1884
MQTT2_USER = 'esp32' MQTT2_USER = 'esp32'
@@ -35,16 +46,99 @@ MQTT2_PASS = 'Esp32Scales#2026'
MQTT_WEIGHT_TOPIC = 'scale/weight' MQTT_WEIGHT_TOPIC = 'scale/weight'
MQTT_GO_TOPIC = 'scale/traffic/go' MQTT_GO_TOPIC = 'scale/traffic/go'
# ── Параметры взвешивания ─────────────────────────────────────────────────────
MIN_WEIGHT = 200 # кг — минимум для определения машины на платформе MIN_WEIGHT = 200 # кг — минимум для определения машины на платформе
STABLE_DELTA = 50 # кг — допустимый разброс для стабилизации STABLE_DELTA = 50 # кг — допустимый разброс при стабилизации
STABLE_TIME = 10 # сек — время стабилизации STABLE_TIME = 10 # сек — время окна стабилизации
# ── Светофор (GPIO через sysfs) ───────────────────────────────────────────────
GPIO_RED = 0 # sysfs номер GPIO для PIN11 (красный)
GPIO_GREEN = 2 # sysfs номер GPIO для PIN13 (зелёный)
RELAY_ACTIVE_LOW = True # True = реле включается при LOW (стандартный модуль с оптопарой)
# ── Таймаут ожидания GO ───────────────────────────────────────────────────────
GO_TIMEOUT = 5 * 60 # 5 минут
BLINK_COUNT = 5 # количество миганий при таймауте
BLINK_INTERVAL = 1.0 # сек между миганиями
# ── Событие GO от MQTT ────────────────────────────────────────────────────────
go_event = threading.Event()
# ════════════════════════════════════════════════════════════════════════════════
# GPIO через sysfs
# ════════════════════════════════════════════════════════════════════════════════
gpio_ok = False gpio_ok = False
def _gpio_write_raw(pin, value):
with open(f'/sys/class/gpio/gpio{pin}/value', 'w') as f:
f.write('1' if value else '0')
def gpio_init():
global gpio_ok
try:
for pin in (GPIO_RED, GPIO_GREEN):
gpio_path = f'/sys/class/gpio/gpio{pin}'
if not os.path.exists(gpio_path):
with open('/sys/class/gpio/export', 'w') as f:
f.write(str(pin))
time.sleep(0.15)
with open(f'{gpio_path}/direction', 'w') as f:
f.write('out')
# Сразу гасим оба реле
_gpio_write_raw(GPIO_RED, not RELAY_ACTIVE_LOW)
_gpio_write_raw(GPIO_GREEN, not RELAY_ACTIVE_LOW)
gpio_ok = True
print("[GPIO] Инициализация OK (RED=pin11, GREEN=pin13)")
except Exception as e:
print(f"[GPIO] Недоступен, светофор работать не будет: {e}")
def _relay(pin, on):
if not gpio_ok:
return
try:
if RELAY_ACTIVE_LOW:
_gpio_write_raw(pin, not on) # LOW = включено
else:
_gpio_write_raw(pin, on)
except Exception as e:
print(f"[GPIO] Ошибка записи pin{pin}: {e}")
def set_red(on):
_relay(GPIO_RED, on)
print(f"[TRAFFIC] Красный {'ON' if on else 'OFF'}")
def set_green(on):
_relay(GPIO_GREEN, on)
print(f"[TRAFFIC] Зелёный {'ON' if on else 'OFF'}")
def lights_off():
_relay(GPIO_RED, False)
_relay(GPIO_GREEN, False)
print("[TRAFFIC] Оба реле OFF")
def blink_red_async():
"""Мигание красным при таймауте — в отдельном потоке"""
def _blink():
print(f"[TRAFFIC] Мигание красным ({BLINK_COUNT}x)")
for _ in range(BLINK_COUNT):
_relay(GPIO_RED, True)
time.sleep(BLINK_INTERVAL)
_relay(GPIO_RED, False)
time.sleep(BLINK_INTERVAL)
threading.Thread(target=_blink, daemon=True).start()
# ════════════════════════════════════════════════════════════════════════════════
# MQTT
# ════════════════════════════════════════════════════════════════════════════════
def on_message(client, userdata, msg): def on_message(client, userdata, msg):
print(f"[MQTT] {msg.topic}: {msg.payload}") print(f"[MQTT] {msg.topic}: {msg.payload.decode(errors='ignore')}")
if msg.topic == MQTT_GO_TOPIC: if msg.topic == MQTT_GO_TOPIC:
print("[LIGHT] Получен GO — зелёный!") print("[MQTT] GO получен — переключаем светофор")
go_event.set()
mqtt1 = mqtt.Client(client_id="scales_opi_1") mqtt1 = mqtt.Client(client_id="scales_opi_1")
mqtt2 = mqtt.Client(client_id="scales_opi_2") mqtt2 = mqtt.Client(client_id="scales_opi_2")
@@ -55,114 +149,172 @@ def mqtt_connect():
mqtt1.connect(MQTT1_HOST, MQTT1_PORT, keepalive=60) mqtt1.connect(MQTT1_HOST, MQTT1_PORT, keepalive=60)
mqtt1.subscribe(MQTT_GO_TOPIC) mqtt1.subscribe(MQTT_GO_TOPIC)
mqtt1.loop_start() mqtt1.loop_start()
print("[MQTT1] Подключён к 192.168.20.9") print(f"[MQTT1] Подключён к {MQTT1_HOST}:{MQTT1_PORT}, подписан на {MQTT_GO_TOPIC}")
except Exception as e: except Exception as e:
print(f"[MQTT1] Ошибка: {e}") print(f"[MQTT1] Ошибка подключения: {e}")
try: try:
mqtt2.username_pw_set(MQTT2_USER, MQTT2_PASS) mqtt2.username_pw_set(MQTT2_USER, MQTT2_PASS)
mqtt2.connect(MQTT2_HOST, MQTT2_PORT, keepalive=60) mqtt2.connect(MQTT2_HOST, MQTT2_PORT, keepalive=60)
mqtt2.loop_start() mqtt2.loop_start()
print("[MQTT2] Подключён к VPS") print(f"[MQTT2] Подключён к {MQTT2_HOST}:{MQTT2_PORT}")
except Exception as e: except Exception as e:
print(f"[MQTT2] Ошибка: {e}") print(f"[MQTT2] Ошибка подключения: {e}")
def publish_weight(w): def publish_weight(w):
ts = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') ts = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
payload = f"{w:.1f}" payload = f"{w:.1f}"
try: for client, name in ((mqtt1, 'MQTT1'), (mqtt2, 'MQTT2')):
mqtt1.publish(MQTT_WEIGHT_TOPIC, payload) try:
print(f"[MQTT1] Отправлен вес: {payload} кг в {ts}") client.publish(MQTT_WEIGHT_TOPIC, payload)
except Exception as e: print(f"[{name}] → {MQTT_WEIGHT_TOPIC}: {payload} кг [{ts}]")
print(f"[MQTT1] Ошибка: {e}") except Exception as e:
try: print(f"[{name}] Ошибка публикации: {e}")
mqtt2.publish(MQTT_WEIGHT_TOPIC, payload)
print(f"[MQTT2] Отправлен вес: {payload} кг в {ts}")
except Exception as e: # ════════════════════════════════════════════════════════════════════════════════
print(f"[MQTT2] Ошибка: {e}") # Парсер протокола A9
# ════════════════════════════════════════════════════════════════════════════════
def parse_weight(data): def parse_weight(data):
""" """
Формат A9: STX + +XXXXXXX_C + ETX Формат A9: STX + знак + 7 цифр + контрольная сумма + ETX
Пример: \x02+00176001B\x03 = 1760.0 кг Пример: \x02+00176001B\x03 1760.0 кг
""" """
m = re.search(rb'\x02([+-]\d{8}[A-Z0-9])\x03', data) m = re.search(rb'\x02([+-]\d{8}[A-Z0-9])\x03', data)
if m: if m:
s = m.group(1).decode('ascii') s = m.group(1).decode('ascii')
digits = s[1:8] # 7 цифр без знака и контрольной суммы digits = s[1:8]
w = float(digits[:-1] + '.' + digits[-1]) return float(digits[:-1] + '.' + digits[-1])
return w
return None return None
def main():
mqtt_connect()
print("[BOOT] Готов, жду машину...")
subprocess.run(['stty', '-F', '/dev/ttyUSB0', 'raw', '9600', # ════════════════════════════════════════════════════════════════════════════════
'cs8', '-cstopb', '-parenb']) # Главный цикл
# ════════════════════════════════════════════════════════════════════════════════
def main():
gpio_init()
mqtt_connect()
lights_off()
print("[BOOT] Система готова. Жду машину...\n")
subprocess.run(['stty', '-F', SERIAL_PORT, 'raw', str(SERIAL_BAUD),
'cs8', '-cstopb', '-parenb'], check=False)
proc = subprocess.Popen( proc = subprocess.Popen(
['cat', '/dev/ttyUSB0'], ['cat', SERIAL_PORT],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL stderr=subprocess.DEVNULL
) )
state = 'EMPTY' state = 'EMPTY'
last_weights = [] last_weights = []
buf = b"" buf = b""
go_sent_time = None
while True: while True:
try: # ── Неблокирующее чтение с таймаутом 100 мс ──────────────────────────
chunk = proc.stdout.read(12) r, _, _ = select.select([proc.stdout], [], [], 0.1)
if not chunk:
if r:
try:
chunk = proc.stdout.read(12)
if not chunk:
time.sleep(0.1)
continue
except Exception:
time.sleep(0.1) time.sleep(0.1)
continue continue
except:
time.sleep(0.1)
continue
buf += chunk buf += chunk
while b'\x02' in buf and b'\x03' in buf:
s = buf.find(b'\x02')
e = buf.find(b'\x03', s)
if s == -1 or e == -1:
break
frame = buf[s:e+1]
buf = buf[e+1:]
w = parse_weight(frame)
if w is None:
continue
now = time.time() # Извлекаем все полные фреймы из буфера
print(f"[RAW] {w} кг") while b'\x02' in buf and b'\x03' in buf:
s = buf.find(b'\x02')
e = buf.find(b'\x03', s)
if s == -1 or e == -1:
break
frame = buf[s:e+1]
buf = buf[e+1:]
if state == 'EMPTY': w = parse_weight(frame)
if w >= MIN_WEIGHT: if w is None:
state = 'LOADING'
last_weights = []
print(f"[STATE] Машина заезжает: {w} кг")
elif state == 'LOADING':
if w < MIN_WEIGHT / 2:
state = 'EMPTY'
last_weights = []
print("[STATE] Машина уехала")
continue continue
last_weights.append((now, w))
last_weights = [(t, v) for t, v in last_weights
if now - t <= STABLE_TIME]
if len(last_weights) >= 5:
vals = [v for _, v in last_weights]
if max(vals) - min(vals) <= STABLE_DELTA:
avg = sum(vals) / len(vals)
print(f"[STATE] Вес стабилен: {avg:.1f} кг")
publish_weight(avg)
state = 'WAIT_GO'
print("[STATE] Жду GO от сервера")
elif state == 'WAIT_GO': now = time.time()
if w < MIN_WEIGHT / 2: print(f"[SCALE] {w:.1f} кг [{state}]")
state = 'EMPTY'
print("[STATE] Машина уехала без GO") # ── EMPTY: нет машины ─────────────────────────────────────────
if state == 'EMPTY':
if w >= MIN_WEIGHT:
state = 'LOADING'
last_weights = []
set_red(True)
print(f"[STATE] → LOADING: машина на платформе ({w:.1f} кг)")
# ── LOADING: машина заехала, ждём стабилизации ─────────────────
elif state == 'LOADING':
if w < MIN_WEIGHT / 2:
state = 'EMPTY'
last_weights = []
lights_off()
print("[STATE] → EMPTY: машина уехала до стабилизации")
continue
last_weights.append((now, w))
last_weights = [(t, v) for t, v in last_weights
if now - t <= STABLE_TIME]
if len(last_weights) >= 5:
vals = [v for _, v in last_weights]
if max(vals) - min(vals) <= STABLE_DELTA:
avg = sum(vals) / len(vals)
print(f"[STATE] → WAIT_GO: вес стабилен {avg:.1f} кг, отправляем")
publish_weight(avg)
state = 'WAIT_GO'
go_event.clear()
go_sent_time = time.time()
# ── WAIT_GO: ждём команду GO от сервера ───────────────────────
elif state == 'WAIT_GO':
if w < MIN_WEIGHT / 2:
state = 'EMPTY'
lights_off()
go_event.clear()
go_sent_time = None
print("[STATE] → EMPTY: машина уехала без GO")
# ── GREEN: GO получен, ждём пока машина уедет ─────────────────
elif state == 'GREEN':
if w < MIN_WEIGHT / 2:
state = 'EMPTY'
lights_off()
print("[STATE] → EMPTY: машина проехала, сброс")
# ── Проверка GO и таймаута (независимо от данных весов) ───────────────
if state == 'WAIT_GO':
if go_event.is_set():
go_event.clear()
state = 'GREEN'
set_red(False)
set_green(True)
go_sent_time = None
print("[STATE] → GREEN: GO получен!")
elif go_sent_time and (time.time() - go_sent_time) > GO_TIMEOUT:
print("[STATE] → EMPTY: таймаут 5 мин, GO не получен")
state = 'EMPTY'
go_sent_time = None
go_event.clear()
blink_red_async() # мигаем красным для оператора
# После мигания оба реле погаснут внутри blink, но зелёный
# мог остаться — явно гасим после задержки
def _cleanup():
time.sleep(BLINK_COUNT * BLINK_INTERVAL * 2 + 0.5)
lights_off()
threading.Thread(target=_cleanup, daemon=True).start()
if __name__ == '__main__': if __name__ == '__main__':
main() main()