BodyBalanceEvaluation/backend/devices/remote_control_manager.py

267 lines
9.4 KiB
Python
Raw Permalink Normal View History

2026-01-09 09:40:00 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from typing import Optional, Dict, Any
import logging
import re
try:
import serial # pyserial
except Exception:
serial = None
try:
from .base_device import BaseDevice
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.config_manager import ConfigManager
def _modbus_crc16(data: bytes) -> int:
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
class RemoteControlManager(BaseDevice):
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
self.config_manager = config_manager or ConfigManager()
port = self.config_manager.get_config_value('REMOTE', 'port', fallback='COM6')
baudrate = int(self.config_manager.get_config_value('REMOTE', 'baudrate', fallback='115200'))
timeout = float(self.config_manager.get_config_value('REMOTE', 'timeout', fallback='0.1'))
instance_config: Dict[str, Any] = {
2026-01-10 22:28:56 +08:00
'enable': True,
2026-01-09 09:40:00 +08:00
'port': port,
'baudrate': baudrate,
'timeout': timeout,
'strict_crc': bool(str(self.config_manager.get_config_value('REMOTE', 'strict_crc', fallback='False')).lower() == 'true'),
}
super().__init__("remote", instance_config)
self._socketio = socketio
self.logger = logging.getLogger(self.__class__.__name__)
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.bytesize = getattr(serial, 'EIGHTBITS', 8)
self.parity = getattr(serial, 'PARITY_NONE', 'N')
self.stopbits = getattr(serial, 'STOPBITS_ONE', 1)
self.strict_crc = instance_config['strict_crc']
self._ser: Optional[serial.Serial] = None
self._thread: Optional[threading.Thread] = None
self._running = False
self._buffer = bytearray()
def initialize(self) -> bool:
try:
self.logger.info(f"初始化遥控器串口: {self.port}, {self.baudrate}bps, 8N1")
self.set_connected(True)
self._device_info['initialized_at'] = time.time()
return True
except Exception as e:
self.logger.error(f"遥控器初始化失败: {e}")
self.set_connected(False)
return False
def start_streaming(self) -> bool:
try:
if self._running:
return True
if serial is None:
raise RuntimeError("pyserial 未安装或不可用")
self._ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
timeout=self.timeout,
)
self._running = True
self._thread = threading.Thread(target=self._worker_loop, daemon=True)
self._thread.start()
self.logger.info("遥控器串口监听已启动")
return True
except Exception as e:
self.logger.error(f"启动遥控器监听失败: {e}")
self._running = False
try:
if self._ser and self._ser.is_open:
self._ser.close()
except Exception:
pass
return False
def stop_streaming(self) -> bool:
try:
self._running = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
if self._ser and self._ser.is_open:
self._ser.close()
self.logger.info("遥控器串口监听已停止")
return True
except Exception as e:
self.logger.error(f"停止遥控器监听失败: {e}")
return False
def disconnect(self):
try:
self.stop_streaming()
self.set_connected(False)
except Exception as e:
self.logger.error(f"断开遥控器失败: {e}")
def reload_config(self) -> bool:
try:
self.logger.info("重新加载遥控器配置")
self.port = self.config_manager.get_config_value('REMOTE', 'port', fallback=self.port) or self.port
self.baudrate = int(self.config_manager.get_config_value('REMOTE', 'baudrate', fallback=self.baudrate))
self.timeout = float(self.config_manager.get_config_value('REMOTE', 'timeout', fallback=self.timeout))
self._device_info.update({
'port': self.port,
'baudrate': self.baudrate,
'timeout': self.timeout,
})
return True
except Exception as e:
self.logger.error(f"重新加载遥控器配置失败: {e}")
return False
def calibrate(self) -> Dict[str, Any]:
return {'status': 'success'}
def get_status(self) -> Dict[str, Any]:
return {
'name': self.device_name,
'port': self.port,
'baudrate': self.baudrate,
'timeout': self.timeout,
'is_connected': self.is_connected,
'is_streaming': self._running,
'has_serial': bool(self._ser and getattr(self._ser, 'is_open', False)),
'last_error': self._device_info.get('last_error')
}
def cleanup(self) -> None:
try:
self.stop_streaming()
except Exception:
pass
def check_hardware_connection(self) -> bool:
try:
return bool(self._ser and self._ser.is_open)
except Exception:
return False
def _emit_code(self, key_code: int):
code_hex = f"{key_code:02X}"
name_map = {
0x11: "start",
0x14: "stop",
0x13: "up",
0x15: "down",
0x12: "center",
0x0E: "power",
0x0F: "screenshot",
}
name = name_map.get(key_code, 'unknown')
payload = {
'code': code_hex,
'name': name,
'timestamp': time.time()
}
try:
msg = f"接收到遥控器按键: code={code_hex}, name={name}"
print(msg)
self.logger.info(msg)
except Exception:
pass
try:
if self._socketio:
self._socketio.emit('remote_control', payload, namespace='/devices')
except Exception as e:
self.logger.error(f"推送遥控器事件失败: {e}")
def _try_parse_frames(self):
while True:
if len(self._buffer) < 7:
break
idx = self._buffer.find(b'\x01\x04\x02\x00')
if idx >= 0 and len(self._buffer) - idx >= 7:
frame = bytes(self._buffer[idx:idx + 7])
calc_crc = _modbus_crc16(frame[:5])
recv_crc = frame[5] | (frame[6] << 8)
if calc_crc == recv_crc:
key_code = frame[4]
self._emit_code(key_code)
del self._buffer[:idx + 7]
continue
else:
if not self.strict_crc and len(frame) >= 7:
key_code = frame[4]
self._emit_code(key_code)
del self._buffer[:idx + 7]
continue
del self._buffer[idx:idx + 1]
continue
# ASCII HEX fallback
try:
text = bytes(self._buffer).decode(errors='ignore')
except Exception:
break
m = re.search(r'01\s*04\s*02\s*00\s*([0-9A-Fa-f]{2})\s*([0-9A-Fa-f]{2})\s*([0-9A-Fa-f]{2})', text)
if m:
k = int(m.group(1), 16)
crcL = int(m.group(2), 16)
crcH = int(m.group(3), 16)
calc = _modbus_crc16(bytes.fromhex(f'01 04 02 00 {m.group(1)}'))
recv = crcL | (crcH << 8)
if calc == recv:
self._emit_code(k)
elif not self.strict_crc:
self._emit_code(k)
self._buffer.clear()
break
# no header; trim
del self._buffer[:max(0, len(self._buffer) - 3)]
break
def _worker_loop(self):
self.logger.info("遥控器串口线程启动")
last_rx_ts = time.time()
while self._running:
try:
if not self._ser or not self._ser.is_open:
time.sleep(0.05)
continue
chunk = self._ser.read(64)
if chunk:
try:
hexstr = ' '.join(f'{b:02X}' for b in chunk)
except Exception:
pass
self._buffer.extend(chunk)
self._try_parse_frames()
last_rx_ts = time.time()
else:
time.sleep(0.01)
if time.time() - last_rx_ts > 5.0:
self.logger.debug("遥控器串口暂无数据")
except Exception as e:
self.logger.error(f"遥控器串口读取异常: {e}")
time.sleep(0.1)
self.logger.info("遥控器串口线程结束")