#!/usr/bin/env python3 # -*- coding: utf-8 -*- import threading import time from typing import Optional, Dict, Any import logging import re try: import serial # pyserial except Exception: serial = None try: from .base_device import BaseDevice from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.config_manager import ConfigManager def _modbus_crc16(data: bytes) -> int: crc = 0xFFFF for b in data: crc ^= b for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc & 0xFFFF class RemoteControlManager(BaseDevice): def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): self.config_manager = config_manager or ConfigManager() port = self.config_manager.get_config_value('REMOTE', 'port', fallback='COM6') baudrate = int(self.config_manager.get_config_value('REMOTE', 'baudrate', fallback='115200')) timeout = float(self.config_manager.get_config_value('REMOTE', 'timeout', fallback='0.1')) instance_config: Dict[str, Any] = { 'enable': True, 'port': port, 'baudrate': baudrate, 'timeout': timeout, 'strict_crc': bool(str(self.config_manager.get_config_value('REMOTE', 'strict_crc', fallback='False')).lower() == 'true'), } super().__init__("remote", instance_config) self._socketio = socketio self.logger = logging.getLogger(self.__class__.__name__) self.port = port self.baudrate = baudrate self.timeout = timeout self.bytesize = getattr(serial, 'EIGHTBITS', 8) self.parity = getattr(serial, 'PARITY_NONE', 'N') self.stopbits = getattr(serial, 'STOPBITS_ONE', 1) self.strict_crc = instance_config['strict_crc'] self._ser: Optional[serial.Serial] = None self._thread: Optional[threading.Thread] = None self._running = False self._buffer = bytearray() def initialize(self) -> bool: try: self.logger.info(f"初始化遥控器串口: {self.port}, {self.baudrate}bps, 8N1") self.set_connected(True) self._device_info['initialized_at'] = time.time() return True except Exception as e: self.logger.error(f"遥控器初始化失败: {e}") self.set_connected(False) return False def start_streaming(self) -> bool: try: if self._running: return True if serial is None: raise RuntimeError("pyserial 未安装或不可用") self._ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=self.bytesize, parity=self.parity, stopbits=self.stopbits, timeout=self.timeout, ) self._running = True self._thread = threading.Thread(target=self._worker_loop, daemon=True) self._thread.start() self.logger.info("遥控器串口监听已启动") return True except Exception as e: self.logger.error(f"启动遥控器监听失败: {e}") self._running = False try: if self._ser and self._ser.is_open: self._ser.close() except Exception: pass return False def stop_streaming(self) -> bool: try: self._running = False if self._thread and self._thread.is_alive(): self._thread.join(timeout=2.0) if self._ser and self._ser.is_open: self._ser.close() self.logger.info("遥控器串口监听已停止") return True except Exception as e: self.logger.error(f"停止遥控器监听失败: {e}") return False def disconnect(self): try: self.stop_streaming() self.set_connected(False) except Exception as e: self.logger.error(f"断开遥控器失败: {e}") def reload_config(self) -> bool: try: self.logger.info("重新加载遥控器配置") self.port = self.config_manager.get_config_value('REMOTE', 'port', fallback=self.port) or self.port self.baudrate = int(self.config_manager.get_config_value('REMOTE', 'baudrate', fallback=self.baudrate)) self.timeout = float(self.config_manager.get_config_value('REMOTE', 'timeout', fallback=self.timeout)) self._device_info.update({ 'port': self.port, 'baudrate': self.baudrate, 'timeout': self.timeout, }) return True except Exception as e: self.logger.error(f"重新加载遥控器配置失败: {e}") return False def calibrate(self) -> Dict[str, Any]: return {'status': 'success'} def get_status(self) -> Dict[str, Any]: return { 'name': self.device_name, 'port': self.port, 'baudrate': self.baudrate, 'timeout': self.timeout, 'is_connected': self.is_connected, 'is_streaming': self._running, 'has_serial': bool(self._ser and getattr(self._ser, 'is_open', False)), 'last_error': self._device_info.get('last_error') } def cleanup(self) -> None: try: self.stop_streaming() except Exception: pass def check_hardware_connection(self) -> bool: try: return bool(self._ser and self._ser.is_open) except Exception: return False def _emit_code(self, key_code: int): code_hex = f"{key_code:02X}" name_map = { 0x11: "start", 0x14: "stop", 0x13: "up", 0x15: "down", 0x12: "center", 0x0E: "power", 0x0F: "screenshot", } name = name_map.get(key_code, 'unknown') payload = { 'code': code_hex, 'name': name, 'timestamp': time.time() } try: msg = f"接收到遥控器按键: code={code_hex}, name={name}" print(msg) self.logger.info(msg) except Exception: pass try: if self._socketio: self._socketio.emit('remote_control', payload, namespace='/devices') except Exception as e: self.logger.error(f"推送遥控器事件失败: {e}") def _try_parse_frames(self): while True: if len(self._buffer) < 7: break idx = self._buffer.find(b'\x01\x04\x02\x00') if idx >= 0 and len(self._buffer) - idx >= 7: frame = bytes(self._buffer[idx:idx + 7]) calc_crc = _modbus_crc16(frame[:5]) recv_crc = frame[5] | (frame[6] << 8) if calc_crc == recv_crc: key_code = frame[4] self._emit_code(key_code) del self._buffer[:idx + 7] continue else: if not self.strict_crc and len(frame) >= 7: key_code = frame[4] self._emit_code(key_code) del self._buffer[:idx + 7] continue del self._buffer[idx:idx + 1] continue # ASCII HEX fallback try: text = bytes(self._buffer).decode(errors='ignore') except Exception: break m = re.search(r'01\s*04\s*02\s*00\s*([0-9A-Fa-f]{2})\s*([0-9A-Fa-f]{2})\s*([0-9A-Fa-f]{2})', text) if m: k = int(m.group(1), 16) crcL = int(m.group(2), 16) crcH = int(m.group(3), 16) calc = _modbus_crc16(bytes.fromhex(f'01 04 02 00 {m.group(1)}')) recv = crcL | (crcH << 8) if calc == recv: self._emit_code(k) elif not self.strict_crc: self._emit_code(k) self._buffer.clear() break # no header; trim del self._buffer[:max(0, len(self._buffer) - 3)] break def _worker_loop(self): self.logger.info("遥控器串口线程启动") last_rx_ts = time.time() while self._running: try: if not self._ser or not self._ser.is_open: time.sleep(0.05) continue chunk = self._ser.read(64) if chunk: try: hexstr = ' '.join(f'{b:02X}' for b in chunk) except Exception: pass self._buffer.extend(chunk) self._try_parse_frames() last_rx_ts = time.time() else: time.sleep(0.01) if time.time() - last_rx_ts > 5.0: self.logger.debug("遥控器串口暂无数据") except Exception as e: self.logger.error(f"遥控器串口读取异常: {e}") time.sleep(0.1) self.logger.info("遥控器串口线程结束")