#!/usr/bin/env python3 """ Весовой сервис для Orange Pi — с управлением светофором Читает данные с весового терминала A9 через USB-RS232 (WaveShare конвертер) и публикует вес в MQTT топик scale/weight Машина состояний: EMPTY → машины нет, оба реле выключены LOADING → машина на платформе, красный ON, ждём стабилизации WAIT_GO → вес отправлен, красный ON, ждём GO от сервера (макс 5 мин) GREEN → GO получен, зелёный ON, ждём пока машина уедет Установка зависимостей: pip3 install pyserial paho-mqtt --break-system-packages Автозапуск: systemctl enable scales systemctl start scales """ import subprocess import paho.mqtt.client as mqtt import re import time import datetime import threading import select import os # ── Серийный порт ──────────────────────────────────────────────────────────── SERIAL_PORT = '/dev/ttyUSB0' SERIAL_BAUD = 9600 # ── MQTT локальный (сервер с ИИ системой) ──────────────────────────────────── MQTT1_HOST = '192.168.20.9' MQTT1_PORT = 1883 MQTT1_USER = '' MQTT1_PASS = '' # ── MQTT наш VPS ───────────────────────────────────────────────────────────── MQTT2_HOST = '77.222.43.248' MQTT2_PORT = 1884 MQTT2_USER = 'esp32' MQTT2_PASS = 'Esp32Scales#2026' MQTT_WEIGHT_TOPIC = 'scale/weight' MQTT_GO_TOPIC = 'scale/traffic/go' # ── Параметры взвешивания ───────────────────────────────────────────────────── MIN_WEIGHT = 200 # кг — минимум для определения машины на платформе STABLE_DELTA = 50 # кг — допустимый разброс при стабилизации STABLE_TIME = 10 # сек — время окна стабилизации # ── Светофор (GPIO через sysfs) ─────────────────────────────────────────────── GPIO_RED = 0 # sysfs номер GPIO для PIN11 (красный) GPIO_GREEN = 2 # sysfs номер GPIO для PIN13 (зелёный) RELAY_ACTIVE_LOW = True # True = реле включается при LOW (стандартный модуль с оптопарой) # ── Таймаут ожидания GO ─────────────────────────────────────────────────────── GO_TIMEOUT = 5 * 60 # 5 минут BLINK_COUNT = 5 # количество миганий при таймауте BLINK_INTERVAL = 1.0 # сек между миганиями # ── Событие GO от MQTT ──────────────────────────────────────────────────────── go_event = threading.Event() # ════════════════════════════════════════════════════════════════════════════════ # GPIO через sysfs # ════════════════════════════════════════════════════════════════════════════════ gpio_ok = False def _gpio_write_raw(pin, value): with open(f'/sys/class/gpio/gpio{pin}/value', 'w') as f: f.write('1' if value else '0') def gpio_init(): global gpio_ok try: for pin in (GPIO_RED, GPIO_GREEN): gpio_path = f'/sys/class/gpio/gpio{pin}' if not os.path.exists(gpio_path): with open('/sys/class/gpio/export', 'w') as f: f.write(str(pin)) time.sleep(0.15) with open(f'{gpio_path}/direction', 'w') as f: f.write('out') # Сразу гасим оба реле _gpio_write_raw(GPIO_RED, not RELAY_ACTIVE_LOW) _gpio_write_raw(GPIO_GREEN, not RELAY_ACTIVE_LOW) gpio_ok = True print("[GPIO] Инициализация OK (RED=pin11, GREEN=pin13)") except Exception as e: print(f"[GPIO] Недоступен, светофор работать не будет: {e}") def _relay(pin, on): if not gpio_ok: return try: if RELAY_ACTIVE_LOW: _gpio_write_raw(pin, not on) # LOW = включено else: _gpio_write_raw(pin, on) except Exception as e: print(f"[GPIO] Ошибка записи pin{pin}: {e}") def set_red(on): _relay(GPIO_RED, on) print(f"[TRAFFIC] Красный {'ON' if on else 'OFF'}") def set_green(on): _relay(GPIO_GREEN, on) print(f"[TRAFFIC] Зелёный {'ON' if on else 'OFF'}") def lights_off(): _relay(GPIO_RED, False) _relay(GPIO_GREEN, False) print("[TRAFFIC] Оба реле OFF") def blink_red_async(): """Мигание красным при таймауте — в отдельном потоке""" def _blink(): print(f"[TRAFFIC] Мигание красным ({BLINK_COUNT}x)") for _ in range(BLINK_COUNT): _relay(GPIO_RED, True) time.sleep(BLINK_INTERVAL) _relay(GPIO_RED, False) time.sleep(BLINK_INTERVAL) threading.Thread(target=_blink, daemon=True).start() # ════════════════════════════════════════════════════════════════════════════════ # MQTT # ════════════════════════════════════════════════════════════════════════════════ def on_message(client, userdata, msg): print(f"[MQTT] ← {msg.topic}: {msg.payload.decode(errors='ignore')}") if msg.topic == MQTT_GO_TOPIC: print("[MQTT] GO получен — переключаем светофор") go_event.set() mqtt1 = mqtt.Client(client_id="scales_opi_1") mqtt2 = mqtt.Client(client_id="scales_opi_2") def mqtt_connect(): try: mqtt1.on_message = on_message mqtt1.connect(MQTT1_HOST, MQTT1_PORT, keepalive=60) mqtt1.subscribe(MQTT_GO_TOPIC) mqtt1.loop_start() print(f"[MQTT1] Подключён к {MQTT1_HOST}:{MQTT1_PORT}, подписан на {MQTT_GO_TOPIC}") except Exception as e: print(f"[MQTT1] Ошибка подключения: {e}") try: mqtt2.username_pw_set(MQTT2_USER, MQTT2_PASS) mqtt2.connect(MQTT2_HOST, MQTT2_PORT, keepalive=60) mqtt2.loop_start() print(f"[MQTT2] Подключён к {MQTT2_HOST}:{MQTT2_PORT}") except Exception as e: print(f"[MQTT2] Ошибка подключения: {e}") def publish_weight(w): ts = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S') payload = f"{w:.1f}" for client, name in ((mqtt1, 'MQTT1'), (mqtt2, 'MQTT2')): try: client.publish(MQTT_WEIGHT_TOPIC, payload) print(f"[{name}] → {MQTT_WEIGHT_TOPIC}: {payload} кг [{ts}]") except Exception as e: print(f"[{name}] Ошибка публикации: {e}") # ════════════════════════════════════════════════════════════════════════════════ # Парсер протокола A9 # ════════════════════════════════════════════════════════════════════════════════ def parse_weight(data): """ Формат A9: STX + знак + 7 цифр + контрольная сумма + ETX Пример: \x02+00176001B\x03 → 1760.0 кг """ m = re.search(rb'\x02([+-]\d{8}[A-Z0-9])\x03', data) if m: s = m.group(1).decode('ascii') digits = s[1:8] return float(digits[:-1] + '.' + digits[-1]) return None # ════════════════════════════════════════════════════════════════════════════════ # Главный цикл # ════════════════════════════════════════════════════════════════════════════════ def main(): gpio_init() mqtt_connect() lights_off() print("[BOOT] Система готова. Жду машину...\n") subprocess.run(['stty', '-F', SERIAL_PORT, 'raw', str(SERIAL_BAUD), 'cs8', '-cstopb', '-parenb'], check=False) proc = subprocess.Popen( ['cat', SERIAL_PORT], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL ) state = 'EMPTY' last_weights = [] buf = b"" go_sent_time = None last_logged_w = None # последнее залогированное значение веса while True: # ── Неблокирующее чтение с таймаутом 100 мс ────────────────────────── r, _, _ = select.select([proc.stdout], [], [], 0.1) if r: try: chunk = proc.stdout.read(12) if not chunk: time.sleep(0.1) continue except Exception: time.sleep(0.1) continue buf += chunk # Извлекаем все полные фреймы из буфера while b'\x02' in buf and b'\x03' in buf: s = buf.find(b'\x02') e = buf.find(b'\x03', s) if s == -1 or e == -1: break frame = buf[s:e+1] buf = buf[e+1:] w = parse_weight(frame) if w is None: continue now = time.time() if w != last_logged_w: print(f"[SCALE] {w:.1f} кг [{state}]") last_logged_w = w # ── EMPTY: нет машины ───────────────────────────────────────── if state == 'EMPTY': if w >= MIN_WEIGHT: state = 'LOADING' last_weights = [] set_red(True) print(f"[STATE] → LOADING: машина на платформе ({w:.1f} кг)") # ── LOADING: машина заехала, ждём стабилизации ───────────────── elif state == 'LOADING': if w < MIN_WEIGHT / 2: state = 'EMPTY' last_weights = [] lights_off() print("[STATE] → EMPTY: машина уехала до стабилизации") continue last_weights.append((now, w)) last_weights = [(t, v) for t, v in last_weights if now - t <= STABLE_TIME] if len(last_weights) >= 5: vals = [v for _, v in last_weights] if max(vals) - min(vals) <= STABLE_DELTA: avg = sum(vals) / len(vals) print(f"[STATE] → WAIT_GO: вес стабилен {avg:.1f} кг, отправляем") publish_weight(avg) state = 'WAIT_GO' go_event.clear() go_sent_time = time.time() # ── WAIT_GO: ждём команду GO от сервера ─────────────────────── elif state == 'WAIT_GO': if w < MIN_WEIGHT / 2: state = 'EMPTY' lights_off() go_event.clear() go_sent_time = None print("[STATE] → EMPTY: машина уехала без GO") # ── GREEN: GO получен, ждём пока машина уедет ───────────────── elif state == 'GREEN': if w < MIN_WEIGHT / 2: state = 'EMPTY' lights_off() print("[STATE] → EMPTY: машина проехала, сброс") # ── Проверка GO и таймаута (независимо от данных весов) ─────────────── if state == 'WAIT_GO': if go_event.is_set(): go_event.clear() state = 'GREEN' set_red(False) set_green(True) go_sent_time = None print("[STATE] → GREEN: GO получен!") elif go_sent_time and (time.time() - go_sent_time) > GO_TIMEOUT: print("[STATE] → EMPTY: таймаут 5 мин, GO не получен") state = 'EMPTY' go_sent_time = None go_event.clear() blink_red_async() # мигаем красным для оператора # После мигания оба реле погаснут внутри blink, но зелёный # мог остаться — явно гасим после задержки def _cleanup(): time.sleep(BLINK_COUNT * BLINK_INTERVAL * 2 + 0.5) lights_off() threading.Thread(target=_cleanup, daemon=True).start() if __name__ == '__main__': main()