# BodyBalanceEvaluation/backend/devices/device_model.py
#
# 224 lines
# 8.1 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:UTF-8
import time
import bleak
import asyncio
import logging
# 设备实例 Device instance
class DeviceModel:
    """Bluetooth LE model of an IMU device (logger: ``device.imu.witmotion``).

    The model connects through ``bleak``, subscribes to the notify
    characteristic, reassembles the incoming byte stream into 20-byte frames
    (header ``0x55``, flag ``0x61``), decodes the three Euler angles stored in
    the last six bytes and hands itself to ``callback_method`` after every
    decoded frame.
    """

    # region attributes
    # Class-level defaults, kept for backward compatibility. Every instance
    # gets its own values in __init__, so the mutable ones are never shared.
    # Device name
    deviceName = "我的设备"
    # Device data dictionary
    deviceData = {}
    # Whether the device is open
    isOpen = False
    # Receive buffer for the frame currently being assembled
    TempBytes = []
    # endregion

    # Frame layout used by onDataReceived / processData.
    _FRAME_HEADER = 0x55
    _FRAME_FLAG = 0x61
    _FRAME_LENGTH = 20

    # GATT UUIDs of the service and its read (notify) / write characteristics.
    _SERVICE_UUID = "0000ffe5-0000-1000-8000-00805f9a34fb"
    _READ_CHARACTERISTIC_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
    _WRITE_CHARACTERISTIC_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

    def __init__(self, deviceName, BLEDevice, callback_method):
        """Create a device model.

        Args:
            deviceName: Custom display name of the device.
            BLEDevice: Device (or address) passed to ``bleak.BleakClient``.
            callback_method: Callable invoked as ``callback_method(self)``
                after each decoded frame.
        """
        self.logger = logging.getLogger("device.imu.witmotion")
        self.logger.info("初始化IMU设备模型")
        self.deviceName = deviceName
        self.BLEDevice = BLEDevice
        self.client = None
        self.writer_characteristic = None
        self.isOpen = False
        self.callback_method = callback_method
        self.deviceData = {}
        # Per-instance buffer: the class-level list would otherwise be shared
        # by every DeviceModel and interleave bytes from different devices.
        self.TempBytes = []

    # region device data access
    def set(self, key, value):
        """Store ``value`` under ``key`` in the device data dictionary."""
        self.deviceData[key] = value

    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self.deviceData.get(key)

    def remove(self, key):
        """Delete ``key`` from the device data dictionary.

        Raises:
            KeyError: If ``key`` is not present.
        """
        del self.deviceData[key]
    # endregion

    async def openDevice(self):
        """Connect, subscribe to notifications and stay open until closed.

        The coroutine only returns after ``closeDevice()`` clears ``isOpen``
        (polled once per second) or the task is cancelled; the connection is
        released when the ``BleakClient`` context exits.

        Raises:
            RuntimeError: If the expected service/notify characteristic is
                not found on the device.
        """
        start_ts = time.perf_counter()
        self.logger.info("正在打开蓝牙IMU设备...")
        connect_start = time.perf_counter()
        try:
            async with bleak.BleakClient(self.BLEDevice, timeout=15) as client:
                self.client = client
                self.logger.info(f"蓝牙连接建立完成(耗时: {(time.perf_counter() - connect_start)*1000:.1f}ms")
                self.isOpen = True
                self.logger.info("正在匹配服务...")
                services = await self._discover_services(client)
                notify_characteristic = self._match_characteristics(services)
                if not notify_characteristic:
                    self.logger.warning("未找到匹配的服务或特征")
                    raise RuntimeError("未找到匹配的服务或特征")
                self.logger.info(f"匹配到特征: {notify_characteristic}")
                # Subscribe so incoming data is routed to onDataReceived.
                await client.start_notify(notify_characteristic.uuid, self.onDataReceived)
                self.logger.info("开始接收姿态数据XYZ欧拉角")
                self.logger.info(f"设备打开完成(耗时: {(time.perf_counter() - start_ts)*1000:.1f}ms")
                # Keep the connection open until closeDevice() is called.
                try:
                    while self.isOpen:
                        await asyncio.sleep(1)
                except asyncio.CancelledError:
                    # Cancellation is treated as a normal shutdown request.
                    pass
                finally:
                    # Best-effort unsubscribe: a failure here (for example
                    # when the link is already gone) must not mask the real
                    # reason for leaving the loop.
                    try:
                        await client.stop_notify(notify_characteristic.uuid)
                    except Exception as ex:
                        self.logger.warning(f"停止通知失败: {ex}")
        finally:
            # Never report the device as open once this coroutine has ended,
            # whether it ended normally, by error or by cancellation.
            self.isOpen = False

    async def _discover_services(self, client):
        """Return the client's GATT services, retrying up to three times."""
        services = None
        for _ in range(3):
            get_services = getattr(client, 'get_services', None)
            if callable(get_services):
                services = await get_services()
            else:
                # Fall back to the backend object, then to the plain
                # ``services`` attribute, when the client has no
                # ``get_services`` method.
                backend = getattr(client, "_backend", None)
                backend_get_services = getattr(backend, "get_services", None)
                if callable(backend_get_services):
                    services = await backend_get_services()
                else:
                    services = getattr(client, 'services', None)
            if services:
                break
            await asyncio.sleep(0.2)
        return services if services else []

    def _match_characteristics(self, services):
        """Find the notify characteristic and remember the write one."""
        notify_characteristic = None
        for service in services:
            if service.uuid == self._SERVICE_UUID:
                self.logger.info(f"匹配到服务: {service}")
                self.logger.info("正在匹配特征...")
                for characteristic in service.characteristics:
                    if characteristic.uuid == self._READ_CHARACTERISTIC_UUID:
                        notify_characteristic = characteristic
                    if characteristic.uuid == self._WRITE_CHARACTERISTIC_UUID:
                        self.writer_characteristic = characteristic
                if notify_characteristic:
                    break
        return notify_characteristic

    def closeDevice(self):
        """Request shutdown; ``openDevice`` notices it on its next poll."""
        self.isOpen = False
        self.logger.info("设备已关闭")

    # region data parsing
    def onDataReceived(self, sender, data):
        """Notification handler: reassemble the byte stream into frames.

        Args:
            sender: Notification source supplied by the BLE client (unused).
            data: Bytes-like payload of the notification.
        """
        buffer = self.TempBytes
        for byte in bytes(data):
            buffer.append(byte)
            size = len(buffer)
            if size == 1:
                if buffer[0] != self._FRAME_HEADER:
                    buffer.clear()
            elif size == 2:
                if buffer[1] != self._FRAME_FLAG:
                    # Drop the stale header. The byte just read is kept only
                    # if it can itself start a frame; otherwise a later flag
                    # byte would be paired with a non-header byte and yield a
                    # misaligned frame.
                    del buffer[0]
                    if buffer[0] != self._FRAME_HEADER:
                        buffer.clear()
            elif size == self._FRAME_LENGTH:
                # Snapshot and reset first, so that an exception raised while
                # processing cannot leave a full buffer behind that would
                # then grow forever and block all later frames.
                frame = list(buffer)
                buffer.clear()
                self.processData(frame)

    def processData(self, Bytes):
        """Decode one frame into ``AngX``/``AngY``/``AngZ`` and notify.

        Frames that are shorter than 20 bytes or whose flag byte is not
        ``0x61`` are ignored. Each angle is a little-endian signed 16-bit
        value scaled by ``180 / 32768`` and rounded to three decimals.
        """
        if len(Bytes) < self._FRAME_LENGTH or Bytes[1] != self._FRAME_FLAG:
            return
        AngX = self.getSignInt16(Bytes[15] << 8 | Bytes[14]) / 32768 * 180
        AngY = self.getSignInt16(Bytes[17] << 8 | Bytes[16]) / 32768 * 180
        AngZ = self.getSignInt16(Bytes[19] << 8 | Bytes[18]) / 32768 * 180
        self.set("AngX", round(AngX, 3))
        self.set("AngY", round(AngY, 3))
        self.set("AngZ", round(AngZ, 3))
        self.callback_method(self)

    @staticmethod
    def getSignInt16(num):
        """Reinterpret an unsigned 16-bit value as a signed 16-bit integer."""
        if num >= 0x8000:
            num -= 0x10000
        return num
    # endregion

    async def sendData(self, data):
        """Write ``data`` to the write characteristic, if connected.

        Nothing is sent when there is no connected client or no write
        characteristic. Write failures are logged and not raised.

        Args:
            data: Iterable of integers in ``0..255``.
        """
        try:
            if (
                self.client is not None
                and self.client.is_connected
                and self.writer_characteristic is not None
            ):
                await self.client.write_gatt_char(self.writer_characteristic.uuid, bytes(data))
        except Exception as ex:
            self.logger.warning(f"发送数据失败: {ex}")

    async def readReg(self, regAddr):
        """Send a read request for register ``regAddr``."""
        await self.sendData(self.get_readBytes(regAddr))

    async def writeReg(self, regAddr, sValue):
        """Unlock the device, write ``sValue`` to ``regAddr`` and save.

        The three commands are sent 100 ms apart. The delays use
        ``asyncio.sleep`` so that the event loop keeps running while waiting.
        """
        await self.unlock()
        await asyncio.sleep(0.1)
        await self.sendData(self.get_writeBytes(regAddr, sValue))
        await asyncio.sleep(0.1)
        await self.save()

    @staticmethod
    def get_readBytes(regAddr):
        """Build the 5-byte read command for register ``regAddr``."""
        return [0xff, 0xaa, 0x27, regAddr, 0]

    @staticmethod
    def get_writeBytes(regAddr, rValue):
        """Build the 5-byte write command (value sent low byte first).

        Both value bytes are masked to 8 bits so that negative (two's
        complement) values still produce a valid byte sequence.
        """
        return [0xff, 0xaa, regAddr, rValue & 0xff, (rValue >> 8) & 0xff]

    async def unlock(self):
        """Send the unlock command (register ``0x69``, value ``0xb588``).

        Must be awaited; the command is only sent once the coroutine runs.
        """
        await self.sendData(self.get_writeBytes(0x69, 0xb588))

    async def save(self):
        """Send the save command (register ``0x00``, value ``0x0000``).

        Must be awaited; the command is only sent once the coroutine runs.
        """
        await self.sendData(self.get_writeBytes(0x00, 0x0000))