# coding:UTF-8
import asyncio
import logging
import time

import bleak


class DeviceModel:
    """Model of a BLE IMU device.

    Connects to the device through ``bleak``, subscribes to its notify
    characteristic, reassembles 20-byte frames that start with ``0x55 0x61``
    and publishes the three decoded angles (``AngX``, ``AngY``, ``AngZ``)
    through ``deviceData`` and the user-supplied callback.
    """

    # region Attributes
    # Class-level defaults, kept for backward compatibility; ``__init__``
    # rebinds every mutable one so instances never share state.
    # Device name
    deviceName = "我的设备"
    # Device data dictionary
    deviceData = {}
    # Whether the device is open
    isOpen = False
    # Receive buffer for the frame currently being assembled
    TempBytes = []
    # endregion

    # GATT identifiers used to locate the data service and its characteristics.
    _SERVICE_UUID = "0000ffe5-0000-1000-8000-00805f9a34fb"
    _READ_CHARACTERISTIC_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
    _WRITE_CHARACTERISTIC_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

    # Frame layout as used by the parser below.
    _FRAME_LENGTH = 20
    _FRAME_HEADER = 0x55
    _FRAME_TYPE = 0x61

    def __init__(self, deviceName, BLEDevice, callback_method):
        """Create a device model.

        Args:
            deviceName: Custom, user-chosen name for this device.
            BLEDevice: Device object or address handed to ``bleak.BleakClient``.
            callback_method: Callable invoked as ``callback_method(self)``
                after each frame has been decoded.
        """
        self.logger = logging.getLogger("device.imu.witmotion")
        self.logger.info("初始化IMU设备模型")
        # Device name (custom)
        self.deviceName = deviceName
        self.BLEDevice = BLEDevice
        self.client = None
        self.writer_characteristic = None
        self.isOpen = False
        self.callback_method = callback_method
        self.deviceData = {}
        # Per-instance buffer: without this every instance would append to
        # the single class-level list and corrupt each other's frames.
        self.TempBytes = []

    # region Device data access
    def set(self, key, value):
        """Store ``value`` under ``key`` in the device data dictionary."""
        self.deviceData[key] = value

    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self.deviceData.get(key)

    def remove(self, key):
        """Delete ``key`` from the device data dictionary.

        Raises:
            KeyError: If ``key`` is not present.
        """
        del self.deviceData[key]
    # endregion

    async def openDevice(self):
        """Connect, subscribe to notifications and stay connected.

        The coroutine runs until ``closeDevice()`` is called, the BLE link
        drops, or the task is cancelled (cancellation is absorbed so the
        coroutine returns normally after cleaning up).

        Raises:
            RuntimeError: If the expected service/notify characteristic is
                not found on the device.
        """
        start_ts = time.perf_counter()
        self.logger.info("正在打开蓝牙IMU设备...")
        connect_start = time.perf_counter()
        try:
            async with bleak.BleakClient(self.BLEDevice, timeout=15) as client:
                self.client = client
                self.logger.info(
                    f"蓝牙连接建立完成(耗时: {(time.perf_counter() - connect_start)*1000:.1f}ms)"
                )
                self.isOpen = True

                self.logger.info("正在匹配服务...")
                services = await self._discover_services(client)
                notify_characteristic = self._match_characteristics(services)

                if not notify_characteristic:
                    self.logger.warning("未找到匹配的服务或特征")
                    raise RuntimeError("未找到匹配的服务或特征")

                self.logger.info(f"匹配到特征: {notify_characteristic}")
                # Set up notifications to receive data
                await client.start_notify(
                    notify_characteristic.uuid, self.onDataReceived
                )
                self.logger.info("开始接收姿态数据(XYZ欧拉角)")
                self.logger.info(
                    f"设备打开完成(耗时: {(time.perf_counter() - start_ts)*1000:.1f}ms)"
                )

                # Keep the connection open. Also leave when the link drops,
                # otherwise this loop would spin forever on a dead client.
                try:
                    while self.isOpen and client.is_connected:
                        await asyncio.sleep(1)
                except asyncio.CancelledError:
                    # Deliberate: treat cancellation as a request to shut down.
                    pass
                finally:
                    # Stop notifications on exit
                    await self._stop_notify_quietly(
                        client, notify_characteristic.uuid
                    )
        finally:
            # Whatever the exit path (close, cancel, error, link loss) the
            # device is no longer open once the client context has ended.
            self.isOpen = False

    @staticmethod
    async def _discover_services(client):
        """Return the client's GATT services, retrying up to three times.

        Tries ``client.get_services()``, then the backend's ``get_services()``,
        then the ``client.services`` attribute, whichever is available.
        Returns an empty list when nothing could be obtained.
        """
        services = None
        for _ in range(3):
            get_services = getattr(client, "get_services", None)
            if callable(get_services):
                services = await get_services()
            else:
                backend = getattr(client, "_backend", None)
                backend_get_services = getattr(backend, "get_services", None)
                if callable(backend_get_services):
                    services = await backend_get_services()
                else:
                    services = getattr(client, "services", None)
            if services:
                break
            await asyncio.sleep(0.2)
        return services or []

    def _match_characteristics(self, services):
        """Find the notify characteristic and remember the write characteristic.

        Returns the notify characteristic, or ``None`` when not found.
        """
        notify_characteristic = None
        for service in services:
            if service.uuid != self._SERVICE_UUID:
                continue
            self.logger.info(f"匹配到服务: {service}")
            self.logger.info("正在匹配特征...")
            for characteristic in service.characteristics:
                if characteristic.uuid == self._READ_CHARACTERISTIC_UUID:
                    notify_characteristic = characteristic
                if characteristic.uuid == self._WRITE_CHARACTERISTIC_UUID:
                    self.writer_characteristic = characteristic
            if notify_characteristic:
                break
        return notify_characteristic

    async def _stop_notify_quietly(self, client, characteristic_uuid):
        """Best-effort ``stop_notify`` that never masks the real exit reason."""
        if not client.is_connected:
            return
        try:
            await client.stop_notify(characteristic_uuid)
        except Exception as ex:  # cleanup must not raise over the caller
            self.logger.warning(f"停止通知失败: {ex}")

    def closeDevice(self):
        """Request shutdown; ``openDevice`` notices on its next poll."""
        self.isOpen = False
        self.logger.info("设备已关闭")

    # region Data parsing
    def onDataReceived(self, sender, data):
        """Notification callback: reassemble frames from the byte stream.

        Args:
            sender: Notification source supplied by bleak (unused).
            data: Bytes-like payload of the notification.
        """
        for byte in bytes(data):
            self.TempBytes.append(byte)
            buffered = len(self.TempBytes)

            if buffered == 1 and self.TempBytes[0] != self._FRAME_HEADER:
                self.TempBytes.clear()
                continue

            if buffered == 2 and self.TempBytes[1] != self._FRAME_TYPE:
                # Drop the stale header. The surviving byte may itself start
                # a new frame, but only if it is the header value; otherwise
                # a later 0x61 would be accepted behind a bogus first byte.
                del self.TempBytes[0]
                if self.TempBytes[0] != self._FRAME_HEADER:
                    self.TempBytes.clear()
                continue

            if buffered >= self._FRAME_LENGTH:
                # Detach the frame before dispatching so an exception in the
                # parser or the user callback cannot leave a full buffer
                # behind and wedge the state machine.
                frame = self.TempBytes[: self._FRAME_LENGTH]
                self.TempBytes.clear()
                self.processData(frame)

    def processData(self, Bytes):
        """Decode one frame and publish the angles.

        Frames that are too short or whose second byte is not ``0x61`` are
        ignored. Each angle is a little-endian signed 16-bit value at bytes
        14-15, 16-17 and 18-19, scaled by ``180 / 32768`` and rounded to
        three decimals (presumably degrees; the log text calls them XYZ
        Euler angles).

        Args:
            Bytes: Indexable sequence of ints holding one frame.
        """
        if len(Bytes) < self._FRAME_LENGTH or Bytes[1] != self._FRAME_TYPE:
            return
        for key, offset in (("AngX", 14), ("AngY", 16), ("AngZ", 18)):
            raw = self.getSignInt16(Bytes[offset + 1] << 8 | Bytes[offset])
            self.set(key, round(raw / 32768 * 180, 3))
        self.callback_method(self)

    @staticmethod
    def getSignInt16(num):
        """Reinterpret an unsigned 16-bit value as a signed 16-bit integer."""
        if num >= 0x8000:
            num -= 0x10000
        return num
    # endregion

    async def sendData(self, data):
        """Write ``data`` to the write characteristic if connected.

        Failures are logged as warnings rather than raised.

        Args:
            data: Iterable of ints in ``range(256)`` (or bytes-like).
        """
        try:
            if self.client.is_connected and self.writer_characteristic is not None:
                await self.client.write_gatt_char(
                    self.writer_characteristic.uuid, bytes(data)
                )
        except Exception as ex:  # best-effort send; report and carry on
            self.logger.warning(f"发送数据失败: {ex}")

    async def readReg(self, regAddr):
        """Send the read command for register ``regAddr``."""
        await self.sendData(self.get_readBytes(regAddr))

    async def writeReg(self, regAddr, sValue):
        """Unlock, write ``sValue`` to register ``regAddr``, then save.

        The 100 ms pauses use ``asyncio.sleep`` so the event loop (and with
        it BLE notification handling) keeps running while waiting.
        """
        # Unlock
        await self.unlock()
        # Delay 100 ms
        await asyncio.sleep(0.1)
        # Build the write command and send it
        await self.sendData(self.get_writeBytes(regAddr, sValue))
        # Delay 100 ms
        await asyncio.sleep(0.1)
        # Save
        await self.save()

    @staticmethod
    def get_readBytes(regAddr):
        """Build the 5-byte read command ``FF AA 27 <regAddr> 00``."""
        return [0xff, 0xaa, 0x27, regAddr, 0]

    @staticmethod
    def get_writeBytes(regAddr, rValue):
        """Build the 5-byte write command ``FF AA <regAddr> <lo> <hi>``.

        ``rValue`` is truncated to 16 bits, so negative values are encoded
        in two's complement and every element fits in one byte.
        """
        return [0xff, 0xaa, regAddr, rValue & 0xff, (rValue >> 8) & 0xff]

    async def unlock(self):
        """Send the unlock command (writes ``0xb588`` to register ``0x69``).

        Must be awaited; the command is only sent when the coroutine runs.
        """
        await self.sendData(self.get_writeBytes(0x69, 0xb588))

    async def save(self):
        """Send the save command (writes ``0x0000`` to register ``0x00``).

        Must be awaited; the command is only sent when the coroutine runs.
        """
        await self.sendData(self.get_writeBytes(0x00, 0x0000))