#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 数据库管理模块 负责SQLite数据库的创建、连接和数据操作 """ import sqlite3 import json import uuid from datetime import datetime, timezone, timedelta from typing import List, Dict, Optional, Any import logging logger = logging.getLogger(__name__) class DatabaseManager: """数据库管理器""" def __init__(self, db_path: str): self.db_path = db_path self.connection = None # 设置中国上海时区 (UTC+8) self.china_tz = timezone(timedelta(hours=8)) def get_china_time(self) -> str: """获取中国时区的当前时间字符串""" return datetime.now(self.china_tz).strftime('%Y-%m-%d %H:%M:%S') def generate_patient_id(self) -> str: """生成患者唯一标识(YYYYMMDD0000)年月日+四位序号""" conn = self.get_connection() cursor = conn.cursor() try: # 获取当前日期 china_tz = timezone(timedelta(hours=8)) today = datetime.now(china_tz).strftime('%Y%m%d') # 使用循环确保生成唯一ID,防止并发冲突 for attempt in range(10): # 最多尝试10次 # 查询今天已有的最大序号 cursor.execute(''' SELECT id FROM patients WHERE id LIKE ? ORDER BY id DESC LIMIT 1 ''', (f'{today}%',)) result = cursor.fetchone() if result: # 提取序号部分并加1 last_id = result[0] if len(last_id) >= 12 and last_id[:8] == today: last_sequence = int(last_id[8:12]) sequence = str(last_sequence + 1).zfill(4) else: sequence = '0001' else: # 今天第一个患者 sequence = '0001' patient_id = f'{today}{sequence}' # 检查ID是否已存在 cursor.execute('SELECT COUNT(*) FROM patients WHERE id = ?', (patient_id,)) if cursor.fetchone()[0] == 0: return patient_id # 如果10次尝试都失败,使用UUID作为备用方案 logger.warning('患者ID生成重试次数过多,使用UUID备用方案') return str(uuid.uuid4()) except Exception as e: logger.error(f'生成患者ID失败: {e}') # 如果生成失败,使用UUID作为备用方案 return str(uuid.uuid4()) def generate_user_id(self) -> str: """生成用户唯一标识,格式为六位数字序号(000001)""" conn = self.get_connection() cursor = conn.cursor() try: # 使用循环确保生成唯一ID,防止并发冲突 for attempt in range(10): # 最多尝试10次 # 获取当前最大的用户ID(只考虑六位数字格式的ID) cursor.execute('SELECT MAX(CAST(id AS INTEGER)) FROM users WHERE id GLOB "[0-9][0-9][0-9][0-9][0-9][0-9]"') result = cursor.fetchone()[0] if result is None: # 如果没有用户记录,从000001开始 next_id = 1 else: next_id = result + 1 # 格式化为六位数字,前面补零 user_id = f"{next_id:06d}" # 检查ID是否已存在 cursor.execute('SELECT COUNT(*) FROM users WHERE id = ?', (user_id,)) if cursor.fetchone()[0] == 0: return user_id # 如果10次尝试都失败,使用时间戳+随机数作为备用方案 logger.warning('用户ID生成重试次数过多,使用备用方案') import time import random timestamp_suffix = str(int(time.time()))[-4:] random_suffix = str(random.randint(10, 99)) return f"{timestamp_suffix}{random_suffix}" except Exception as e: logger.error(f'生成用户ID失败: {e}') # 如果出错,使用时间戳作为备用方案 import time return str(int(time.time()))[-6:] def generate_session_id(self) -> str: """生成会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒""" conn = self.get_connection() cursor = conn.cursor() try: # 获取当前时间 china_tz = timezone(timedelta(hours=8)) # 使用循环确保生成唯一ID,防止并发冲突 for attempt in range(10): # 最多尝试10次 current_time = datetime.now(china_tz) session_id = current_time.strftime('%Y%m%d%H%M%S') # 检查ID是否已存在 cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,)) if cursor.fetchone()[0] == 0: return session_id # 如果存在冲突,等待1秒后重试 import time time.sleep(0.001) # 等待1毫秒 # 如果10次尝试都失败,在时间后添加随机后缀 import random current_time = datetime.now(china_tz) base_id = current_time.strftime('%Y%m%d%H%M%S') suffix = str(random.randint(10, 99)) session_id = f'{base_id}{suffix}' # 最后检查一次 cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE id = ?', (session_id,)) if cursor.fetchone()[0] == 0: return session_id # 如果还是冲突,使用UUID作为备用方案 logger.warning('会话ID生成重试次数过多,使用UUID备用方案') return str(uuid.uuid4()) except Exception as e: logger.error(f'生成会话ID失败: {e}') # 如果生成失败,使用UUID作为备用方案 return str(uuid.uuid4()) def generate_detection_data_id(self) -> str: """生成检测数据记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒""" conn = self.get_connection() cursor = conn.cursor() try: # 获取当前时间 china_tz = timezone(timedelta(hours=8)) # 使用循环确保生成唯一ID,防止并发冲突 for attempt in range(10): # 最多尝试10次 current_time = datetime.now(china_tz) data_id = current_time.strftime('%Y%m%d%H%M%S') # 检查ID是否已存在 cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,)) if cursor.fetchone()[0] == 0: return data_id # 如果存在冲突,等待1毫秒后重试 import time time.sleep(0.001) # 如果10次尝试都失败,在时间后添加随机后缀 import random current_time = datetime.now(china_tz) base_id = current_time.strftime('%Y%m%d%H%M%S') suffix = str(random.randint(100, 999)) data_id = f'{base_id}{suffix}' # 最后检查一次 cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,)) if cursor.fetchone()[0] == 0: return data_id # 如果还是冲突,使用UUID作为备用方案 logger.warning('检测数据ID生成重试次数过多,使用UUID备用方案') return str(uuid.uuid4()) except Exception as e: logger.error(f'生成检测数据ID失败: {e}') # 如果生成失败,使用UUID作为备用方案 return str(uuid.uuid4()) def get_connection(self) -> sqlite3.Connection: """获取数据库连接""" if not self.connection: self.connection = sqlite3.connect( self.db_path, check_same_thread=False, timeout=30.0 ) self.connection.row_factory = sqlite3.Row return self.connection def init_database(self): """初始化数据库表结构""" conn = self.get_connection() cursor = conn.cursor() try: # 创建用户表(医生) cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id TEXT PRIMARY KEY, -- 用户唯一标识(0000000)六位序号 name TEXT NOT NULL, -- 用户真实姓名 username TEXT UNIQUE NOT NULL, -- 用户名(登录名) password TEXT NOT NULL, -- 密码 register_date TIMESTAMP, -- 注册日期 is_active BOOLEAN DEFAULT 1, -- 账户是否激活(0=未激活,1=已激活) user_type TEXT DEFAULT 'user', -- 用户类型(user/admin/doctor) phone TEXT DEFAULT '', -- 联系电话 created_at TIMESTAMP, -- 记录创建时间 updated_at TIMESTAMP -- 记录更新时间 ) ''') # 创建患者表 cursor.execute(''' CREATE TABLE IF NOT EXISTS patients ( id TEXT PRIMARY KEY, -- 患者唯一标识(YYYYMMDD0000)年月日+四位序号 name TEXT NOT NULL, -- 患者姓名 gender TEXT, -- 性别 birth_date TIMESTAMP, -- 出生日期 nationality TEXT, -- 民族 residence TEXT, -- 居住地 height REAL, -- 身高(cm) weight REAL, -- 体重(kg) shoe_size TEXT, -- 鞋码 phone TEXT, -- 电话号码 email TEXT, -- 电子邮箱 occupation TEXT, -- 职业 workplace TEXT, -- 工作单位 medical_history TEXT, -- 病史 notes TEXT, -- 备注信息 created_at TIMESTAMP, -- 记录创建时间 updated_at TIMESTAMP -- 记录更新时间 ) ''') # 创建检测会话表 cursor.execute(''' CREATE TABLE IF NOT EXISTS detection_sessions ( id TEXT PRIMARY KEY, -- 会话唯一标识(YYYYMMDDHHMMSS)年月日时分秒 patient_id TEXT NOT NULL, -- 患者ID(外键) creator_id TEXT, -- 创建人ID(医生ID,外键) start_time TIMESTAMP, -- 检测开始时间 end_time TIMESTAMP, -- 检测结束时间 duration INTEGER, -- 检测持续时间(秒) settings TEXT, -- 检测设置(JSON格式) normal_video_path TEXT, -- 足部检测视频文件路径 femtobolt_video_path TEXT, -- 深度相机视频文件路径 screen_video_path TEXT, -- 屏幕录制视频路径 diagnosis_info TEXT, -- 诊断信息 treatment_info TEXT, -- 处理信息 suggestion_info TEXT, -- 建议信息 status TEXT DEFAULT 'created', -- 会话状态(created/running/completed/failed) created_at TIMESTAMP, -- 记录创建时间 FOREIGN KEY (patient_id) REFERENCES patients (id), -- 患者表外键约束 FOREIGN KEY (creator_id) REFERENCES users (id) -- 用户表外键约束 ) ''') # 创建检测数据表 cursor.execute(''' CREATE TABLE IF NOT EXISTS detection_data ( id TEXT PRIMARY KEY, -- 记录唯一标识(YYYYMMDDHHMMSS)年月日时分秒 session_id TEXT NOT NULL, -- 检测会话ID(外键) head_pose TEXT , -- 头部姿态数据(JSON格式) body_pose TEXT , -- 身体姿态数据(JSON格式) body_image TEXT, -- 身体视频截图存储路径 foot_data TEXT , -- 足部姿态数据(JSON格式) foot_image TEXT, -- 足部监测视频截图存储路径 foot_data_image TEXT, -- 足底压力数据图存储路径 screen_image TEXT, -- 屏幕录制视频截图存储路径 timestamp TIMESTAMP, -- 数据记录时间戳 FOREIGN KEY (session_id) REFERENCES detection_sessions (id) -- 检测会话表外键约束 ) ''') # 创建系统设置表 cursor.execute(''' CREATE TABLE IF NOT EXISTS system_settings ( key TEXT PRIMARY KEY, -- 设置项键名(唯一标识) value TEXT NOT NULL, -- 设置项值 description TEXT, -- 设置项描述说明 updated_at TIMESTAMP -- 最后更新时间 ) ''') # 创建索引以提高查询性能 # 患者表索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_name ON patients (name)') # 患者姓名索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_phone ON patients (phone)') # 患者电话索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_patients_email ON patients (email)') # 患者邮箱索引 # 检测会话表索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_patient ON detection_sessions (patient_id)') # 患者ID索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_creator ON detection_sessions (creator_id)') # 创建人ID索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_time ON detection_sessions (start_time)') # 开始时间索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_status ON detection_sessions (status)') # 会话状态索引 # 检测数据表索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_session ON detection_data (session_id)') # 会话ID索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_timestamp ON detection_data (timestamp)') # 时间戳索引 # 用户表索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)') # 用户名索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_type ON users (user_type)') # 用户类型索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_phone ON users (phone)') # 用户电话索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_active ON users (is_active)') # 激活状态索引 # 插入默认管理员账户(如果不存在) cursor.execute('SELECT COUNT(*) FROM users WHERE username = ?', ('admin',)) admin_exists = cursor.fetchone()[0] if admin_exists == 0: admin_id = self.generate_user_id() # 默认密码为 admin123,明文存储 admin_password = 'admin123' # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' INSERT INTO users (id, name, username, password, is_active, user_type, register_date, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (admin_id, '系统管理员', 'admin', admin_password, 1, 'admin', china_time, china_time, china_time)) logger.info('创建默认管理员账户: admin/admin123') # 插入默认系统设置 china_time = self.get_china_time() # 检查并插入默认摄像头设备索引配置 cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', ('monitor_device_index',)) monitor_config_exists = cursor.fetchone()[0] if monitor_config_exists == 0: cursor.execute(''' INSERT INTO system_settings (key, value, description, updated_at) VALUES (?, ?, ?, ?) ''', ('monitor_device_index', '1', '足部监视摄像头设备索引号', china_time)) logger.info('创建默认摄像头设备索引配置: 1') conn.commit() logger.info('数据库初始化完成') except Exception as e: conn.rollback() logger.error(f'数据库初始化失败: {e}') raise # ==================== 患者管理 ==================== def create_patient(self, patient_data: Dict[str, Any]) -> str: """创建患者记录""" # 验证必填字段 if not patient_data.get('name'): raise ValueError('患者姓名不能为空') conn = self.get_connection() cursor = conn.cursor() try: patient_id = self.generate_patient_id() # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' INSERT INTO patients ( id, name, gender, birth_date, nationality, residence, height, weight, shoe_size, phone, email, occupation, workplace, medical_history, notes, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( patient_id, patient_data.get('name'), patient_data.get('gender'), patient_data.get('birth_date'), patient_data.get('nationality'), patient_data.get('residence'), patient_data.get('height'), patient_data.get('weight'), patient_data.get('shoe_size'), patient_data.get('phone'), patient_data.get('email'), patient_data.get('occupation'), patient_data.get('workplace'), patient_data.get('medical_history'), patient_data.get('notes'), china_time, china_time )) conn.commit() logger.info(f'创建患者记录: {patient_id}') return patient_id except Exception as e: conn.rollback() logger.error(f'创建患者记录失败: {e}') raise def get_patients(self, page: int = 1, size: int = 10, keyword: str = '') -> List[Dict]: """获取患者列表""" # 验证分页参数 if page < 1: page = 1 if size < 1 or size > 100: size = 10 conn = self.get_connection() cursor = conn.cursor() try: offset = (page - 1) * size if keyword: cursor.execute(''' SELECT * FROM patients WHERE name LIKE ? OR phone LIKE ? OR email LIKE ? ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (f'%{keyword}%', f'%{keyword}%', f'%{keyword}%', size, offset)) else: cursor.execute(''' SELECT * FROM patients ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (size, offset)) rows = cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f'获取患者列表失败: {e}') raise def get_patients_count(self, keyword: str = '') -> int: """获取患者总数""" conn = self.get_connection() cursor = conn.cursor() try: if keyword: cursor.execute(''' SELECT COUNT(*) FROM patients WHERE name LIKE ? OR phone LIKE ? ''', (f'%{keyword}%', f'%{keyword}%')) else: cursor.execute('SELECT COUNT(*) FROM patients') return cursor.fetchone()[0] except Exception as e: logger.error(f'获取患者总数失败: {e}') return 0 def get_patient(self, patient_id: str) -> Optional[Dict]: """获取单个患者信息""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute('SELECT * FROM patients WHERE id = ?', (patient_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f'获取患者信息失败: {e}') return None def update_patient(self, patient_id: str, patient_data: Dict[str, Any]): """更新患者信息""" # 验证必填字段 if not patient_data.get('name'): raise ValueError('患者姓名不能为空') # 验证患者是否存在 if not self.get_patient(patient_id): raise ValueError(f'患者不存在: {patient_id}') conn = self.get_connection() cursor = conn.cursor() try: # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' UPDATE patients SET name = ?, gender = ?, birth_date = ?, nationality = ?, residence = ?, height = ?, weight = ?, shoe_size = ?, phone = ?, email = ?, occupation = ?, workplace = ?, medical_history = ?, notes = ?, updated_at = ? WHERE id = ? ''', ( patient_data.get('name'), patient_data.get('gender'), patient_data.get('birth_date'), patient_data.get('nationality'), patient_data.get('residence'), patient_data.get('height'), patient_data.get('weight'), patient_data.get('shoe_size'), patient_data.get('phone'), patient_data.get('email'), patient_data.get('occupation'), patient_data.get('workplace'), patient_data.get('medical_history'), patient_data.get('notes'), china_time, patient_id )) conn.commit() logger.info(f'更新患者信息: {patient_id}') except Exception as e: conn.rollback() logger.error(f'更新患者信息失败: {e}') raise def delete_patient(self, patient_id: str): """删除患者记录""" # 验证患者是否存在 if not self.get_patient(patient_id): raise ValueError(f'患者不存在: {patient_id}') conn = self.get_connection() cursor = conn.cursor() try: # 删除相关的检测数据 cursor.execute(''' DELETE FROM detection_data WHERE session_id IN ( SELECT id FROM detection_sessions WHERE patient_id = ? ) ''', (patient_id,)) # 删除检测会话 cursor.execute('DELETE FROM detection_sessions WHERE patient_id = ?', (patient_id,)) # 删除患者记录 cursor.execute('DELETE FROM patients WHERE id = ?', (patient_id,)) conn.commit() logger.info(f'删除患者记录: {patient_id}') except Exception as e: conn.rollback() logger.error(f'删除患者记录失败: {e}') raise def batch_delete_patients(self, patient_ids: List[str]): """批量删除患者记录""" if not patient_ids: return conn = self.get_connection() cursor = conn.cursor() try: # 验证所有患者是否存在 placeholders = ','.join(['?' for _ in patient_ids]) cursor.execute(f'SELECT COUNT(*) FROM patients WHERE id IN ({placeholders})', patient_ids) existing_count = cursor.fetchone()[0] if existing_count != len(patient_ids): raise ValueError('部分患者记录不存在') # 批量删除相关的检测数据 cursor.execute(f''' DELETE FROM detection_data WHERE session_id IN ( SELECT id FROM detection_sessions WHERE patient_id IN ({placeholders}) ) ''', patient_ids) # 批量删除检测会话 cursor.execute(f'DELETE FROM detection_sessions WHERE patient_id IN ({placeholders})', patient_ids) # 批量删除患者记录 cursor.execute(f'DELETE FROM patients WHERE id IN ({placeholders})', patient_ids) conn.commit() logger.info(f'批量删除患者记录: {len(patient_ids)}条') except Exception as e: conn.rollback() logger.error(f'批量删除患者记录失败: {e}') raise # ==================== 检测会话管理 ==================== def create_detection_session(self, patient_id: str, settings: Dict[str, Any], creator_id: str = None) -> str: """创建检测会话""" conn = self.get_connection() cursor = conn.cursor() try: session_id = self.generate_session_id() # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' INSERT INTO detection_sessions ( id, patient_id, creator_id, duration, frequency, settings, status, diagnosis_info, treatment_info, suggestion_info, notes, start_time, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( session_id, patient_id, creator_id, settings.get('duration', 60), settings.get('frequency', 60), json.dumps(settings), 'created', settings.get('diagnosis_info', ''), settings.get('treatment_info', ''), settings.get('suggestion_info', ''), settings.get('notes', ''), china_time, china_time )) conn.commit() logger.info(f'创建检测会话: {session_id}') return session_id except Exception as e: conn.rollback() logger.error(f'创建检测会话失败: {e}') raise def update_session_status(self, session_id: str, status: str, data_points: int = 0): """更新会话状态""" conn = self.get_connection() cursor = conn.cursor() try: if status in ['completed', 'stopped', 'error']: # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' UPDATE detection_sessions SET status = ?, data_points = ?, end_time = ? WHERE id = ? ''', (status, data_points, china_time, session_id)) else: cursor.execute(''' UPDATE detection_sessions SET status = ?, data_points = ? WHERE id = ? ''', (status, data_points, session_id)) conn.commit() logger.info(f'更新会话状态: {session_id} -> {status}') except Exception as e: conn.rollback() logger.error(f'更新会话状态失败: {e}') raise def update_session_duration(self, session_id: str, duration: int): """更新会话持续时间""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET duration = ? WHERE id = ? ''', (duration, session_id)) conn.commit() logger.info(f'更新会话持续时间: {session_id} -> {duration}秒') except Exception as e: conn.rollback() logger.error(f'更新会话持续时间失败: {e}') raise def update_session_normal_video_path(self, session_id: str, video_path: str): """更新会话足部检测视频路径""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET normal_video_path = ? WHERE id = ? ''', (video_path, session_id)) conn.commit() logger.info(f'更新会话足部检测视频路径: {session_id} -> {video_path}') except Exception as e: conn.rollback() logger.error(f'更新会话足部检测视频路径失败: {e}') raise def update_session_femtobolt_video_path(self, session_id: str, video_path: str): """更新会话深度相机视频路径""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET femtobolt_video_path = ? WHERE id = ? ''', (video_path, session_id)) conn.commit() logger.info(f'更新会话深度相机视频路径: {session_id} -> {video_path}') except Exception as e: conn.rollback() logger.error(f'更新会话深度相机视频路径失败: {e}') raise def update_session_screen_video_path(self, session_id: str, video_path: str): """更新会话屏幕录制视频路径""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET screen_video_path = ? WHERE id = ? ''', (video_path, session_id)) conn.commit() logger.info(f'更新会话屏幕录制视频路径: {session_id} -> {video_path}') except Exception as e: conn.rollback() logger.error(f'更新会话屏幕录制视频路径失败: {e}') raise def update_session_diagnosis_info(self, session_id: str, diagnosis_info: str): """更新会话诊断信息""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET diagnosis_info = ? WHERE id = ? ''', (diagnosis_info, session_id)) conn.commit() logger.info(f'更新会话诊断信息: {session_id}') except Exception as e: conn.rollback() logger.error(f'更新会话诊断信息失败: {e}') raise def update_session_treatment_info(self, session_id: str, treatment_info: str): """更新会话处理信息""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET treatment_info = ? WHERE id = ? ''', (treatment_info, session_id)) conn.commit() logger.info(f'更新会话处理信息: {session_id}') except Exception as e: conn.rollback() logger.error(f'更新会话处理信息失败: {e}') raise def update_session_suggestion_info(self, session_id: str, suggestion_info: str): """更新会话建议信息""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' UPDATE detection_sessions SET suggestion_info = ? WHERE id = ? ''', (suggestion_info, session_id)) conn.commit() logger.info(f'更新会话建议信息: {session_id}') except Exception as e: conn.rollback() logger.error(f'更新会话建议信息失败: {e}') raise def get_detection_sessions(self, page: int = 1, size: int = 10, patient_id: str = None) -> List[Dict]: """获取检测会话列表""" conn = self.get_connection() cursor = conn.cursor() try: offset = (page - 1) * size if patient_id: cursor.execute(''' SELECT s.*, p.name as patient_name, u.name as creator_name FROM detection_sessions s LEFT JOIN patients p ON s.patient_id = p.id LEFT JOIN users u ON s.creator_id = u.id WHERE s.patient_id = ? ORDER BY s.start_time DESC LIMIT ? OFFSET ? ''', (patient_id, size, offset)) else: cursor.execute(''' SELECT s.*, p.name as patient_name, u.name as creator_name FROM detection_sessions s LEFT JOIN patients p ON s.patient_id = p.id LEFT JOIN users u ON s.creator_id = u.id ORDER BY s.start_time DESC LIMIT ? OFFSET ? ''', (size, offset)) rows = cursor.fetchall() sessions = [] for row in rows: session = dict(row) # 解析设置JSON if session['settings']: try: session['settings'] = json.loads(session['settings']) except: session['settings'] = {} sessions.append(session) return sessions except Exception as e: logger.error(f'获取检测会话列表失败: {e}') raise def get_sessions_count(self, patient_id: str = None) -> int: """获取会话总数""" conn = self.get_connection() cursor = conn.cursor() try: if patient_id: cursor.execute('SELECT COUNT(*) FROM detection_sessions WHERE patient_id = ?', (patient_id,)) else: cursor.execute('SELECT COUNT(*) FROM detection_sessions') return cursor.fetchone()[0] except Exception as e: logger.error(f'获取会话总数失败: {e}') return 0 def get_session_data(self, session_id: str) -> Optional[Dict]: """获取会话详细数据""" conn = self.get_connection() cursor = conn.cursor() try: # 获取会话基本信息 cursor.execute(''' SELECT s.*, p.name as patient_name, u.name as creator_name FROM detection_sessions s LEFT JOIN patients p ON s.patient_id = p.id LEFT JOIN users u ON s.creator_id = u.id WHERE s.id = ? ''', (session_id,)) session_row = cursor.fetchone() if not session_row: return None session = dict(session_row) # 解析设置JSON if session['settings']: try: session['settings'] = json.loads(session['settings']) except: session['settings'] = {} # 获取检测数据 cursor.execute(''' SELECT * FROM detection_data WHERE session_id = ? ORDER BY timestamp ''', (session_id,)) data_rows = cursor.fetchall() session['data'] = [] for row in data_rows: data_point = dict(row) # 解析数据JSON try: data_point['data_value'] = json.loads(data_point['data_value']) except: pass session['data'].append(data_point) return session except Exception as e: logger.error(f'获取会话数据失败: {e}') return None # ==================== 检测记录数据管理 ==================== def save_detection_data(self, session_id: str, data: Dict[str, Any]): """保存检测数据""" conn = self.get_connection() cursor = conn.cursor() try: # 使用中国时区时间 china_time = self.get_china_time() # 生成检测数据记录ID data_id = self.generate_detection_data_id() # 根据新的表结构保存数据 cursor.execute(''' INSERT INTO detection_data ( id, session_id, head_pose, body_pose, body_image, foot_data, foot_image, foot_data_image, screen_image, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( data_id, session_id, json.dumps(data.get('head_pose')) if data.get('head_pose') else None, json.dumps(data.get('body_pose')) if data.get('body_pose') else None, data.get('body_image'), json.dumps(data.get('foot_data')) if data.get('foot_data') else None, data.get('foot_image'), data.get('foot_data_image'), data.get('screen_image'), china_time )) conn.commit() logger.info(f'保存检测数据: {data_id}') except Exception as e: conn.rollback() logger.error(f'保存检测数据失败: {e}') raise def get_latest_detection_data(self, session_id: str, limit: int = 10) -> List[Dict]: """获取最新的检测数据""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT * FROM detection_data WHERE session_id = ? ORDER BY timestamp DESC LIMIT ? ''', (session_id, limit)) rows = cursor.fetchall() data_points = [] for row in rows: data_point = dict(row) # 解析JSON字段 try: if data_point.get('head_pose'): data_point['head_pose'] = json.loads(data_point['head_pose']) except: pass try: if data_point.get('body_pose'): data_point['body_pose'] = json.loads(data_point['body_pose']) except: pass try: if data_point.get('foot_data'): data_point['foot_data'] = json.loads(data_point['foot_data']) except: pass data_points.append(data_point) return data_points except Exception as e: logger.error(f'获取最新检测数据失败: {e}') return [] def delete_detection_data(self, data_id: str): """按主键删除检测数据记录""" conn = self.get_connection() cursor = conn.cursor() try: # 验证记录是否存在 cursor.execute('SELECT COUNT(*) FROM detection_data WHERE id = ?', (data_id,)) if cursor.fetchone()[0] == 0: raise ValueError(f'检测数据记录不存在: {data_id}') # 删除检测数据记录 cursor.execute('DELETE FROM detection_data WHERE id = ?', (data_id,)) conn.commit() logger.info(f'删除检测数据记录: {data_id}') except Exception as e: conn.rollback() logger.error(f'删除检测数据记录失败: {e}') raise # ==================== 系统设置管理 ==================== def get_setting(self, key: str, default_value: Any = None) -> Any: """获取系统设置""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,)) row = cursor.fetchone() if row: try: return json.loads(row['value']) except: return row['value'] return default_value except Exception as e: logger.error(f'获取系统设置失败: {e}') return default_value def set_setting(self, key: str, value: Any, description: str = ''): """设置系统设置""" conn = self.get_connection() cursor = conn.cursor() try: value_str = json.dumps(value) if not isinstance(value, str) else value # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' INSERT OR REPLACE INTO system_settings (key, value, description, updated_at) VALUES (?, ?, ?, ?) ''', (key, value_str, description, china_time)) conn.commit() logger.info(f'设置系统设置: {key}') except Exception as e: conn.rollback() logger.error(f'设置系统设置失败: {e}') raise # ==================== 用户管理 ==================== def register_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]: """用户注册""" conn = self.get_connection() cursor = conn.cursor() try: # 检查手机号是否已存在(如果提供了手机号) if user_data.get('phone'): cursor.execute('SELECT COUNT(*) FROM users WHERE phone = ?', (user_data['phone'],)) if cursor.fetchone()[0] > 0: return { 'success': False, 'message': '手机号已存在' } user_id = self.generate_user_id() # 密码明文存储 password = user_data['password'] # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' INSERT INTO users (id, name, username, password, phone, is_active, user_type, register_date, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( user_id, user_data['name'], user_data['username'], password, user_data.get('phone'), # 手机号可选 1, # 新注册用户默认激活 'user', china_time, china_time, china_time )) conn.commit() logger.info(f'用户注册成功: {user_data["username"]}') return { 'success': True, 'user_id': user_id, 'message': '注册成功' } except sqlite3.IntegrityError: conn.rollback() logger.error(f'用户名已存在: {user_data["username"]}') return { 'success': False, 'message': '用户名已存在' } except Exception as e: conn.rollback() logger.error(f'用户注册失败: {e}') return { 'success': False, 'message': '注册失败' } def authenticate_user(self, username: str, password: str) -> Optional[Dict]: """用户登录验证""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT * FROM users WHERE username = ? AND password = ? AND is_active = 1 ''', (username, password)) row = cursor.fetchone() if row: user = dict(row) # 不返回密码 del user['password'] logger.info(f'用户登录成功: {username}') return user else: logger.warning(f'用户登录失败: {username}') return None except Exception as e: logger.error(f'用户验证失败: {e}') return None def get_user_by_phone(self, phone: str) -> Optional[Dict]: """根据手机号查询用户""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute('SELECT * FROM users WHERE phone = ?', (phone,)) row = cursor.fetchone() if row: user = dict(row) # 不返回密码 del user['password'] return user return None except Exception as e: logger.error(f'根据手机号查询用户失败: {e}') return None def get_users(self, page: int = 1, size: int = 10, status: str = 'all') -> List[Dict]: """获取用户列表""" conn = self.get_connection() cursor = conn.cursor() try: offset = (page - 1) * size if status == 'pending': cursor.execute(''' SELECT id, name, username, phone, register_date, is_active, user_type, created_at FROM users WHERE is_active = 0 AND user_type = 'user' ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (size, offset)) elif status == 'active': cursor.execute(''' SELECT id, name, username, phone, register_date, is_active, user_type, created_at FROM users WHERE is_active = 1 ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (size, offset)) else: cursor.execute(''' SELECT id, name, username, phone, register_date, is_active, user_type, created_at FROM users ORDER BY created_at DESC LIMIT ? OFFSET ? ''', (size, offset)) rows = cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f'获取用户列表失败: {e}') return [] def get_users_count(self, status: str = 'all') -> int: """获取用户总数""" conn = self.get_connection() cursor = conn.cursor() try: if status == 'pending': cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 0 AND user_type = "user"') elif status == 'active': cursor.execute('SELECT COUNT(*) FROM users WHERE is_active = 1') else: cursor.execute('SELECT COUNT(*) FROM users') return cursor.fetchone()[0] except Exception as e: logger.error(f'获取用户总数失败: {e}') return 0 def approve_user(self, user_id: str, approved: bool = True): """审核用户""" conn = self.get_connection() cursor = conn.cursor() try: # 使用中国时区时间 china_time = self.get_china_time() cursor.execute(''' UPDATE users SET is_active = ?, updated_at = ? WHERE id = ? ''', (1 if approved else 0, china_time, user_id)) conn.commit() status = '通过' if approved else '拒绝' logger.info(f'用户审核{status}: {user_id}') except Exception as e: conn.rollback() logger.error(f'用户审核失败: {e}') raise def get_user(self, user_id: str) -> Optional[Dict]: """获取单个用户信息""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT id, name, username, phone, register_date, is_active, user_type, created_at, updated_at FROM users WHERE id = ? ''', (user_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f'获取用户信息失败: {e}') return None def update_user(self, user_id: str, user_data: Dict[str, Any]): """更新用户信息""" conn = self.get_connection() cursor = conn.cursor() try: # 密码明文存储,无需加密处理 # 构建更新语句 fields = [] values = [] for key, value in user_data.items(): if key in ['name', 'username', 'password', 'is_active', 'user_type','phone']: fields.append(f'{key} = ?') values.append(value) if fields: # 使用中国时区时间 china_time = self.get_china_time() fields.append('updated_at = ?') values.append(china_time) values.append(user_id) sql = f"UPDATE users SET {', '.join(fields)} WHERE id = ?" cursor.execute(sql, values) conn.commit() logger.info(f'更新用户信息: {user_id}') except Exception as e: conn.rollback() logger.error(f'更新用户信息失败: {e}') raise def delete_user(self, user_id: str): """删除用户""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute('DELETE FROM users WHERE id = ?', (user_id,)) conn.commit() logger.info(f'删除用户: {user_id}') except Exception as e: conn.rollback() logger.error(f'删除用户失败: {e}') raise def get_system_setting(self, key: str, default_value: str = None) -> Optional[str]: """获取系统设置值""" conn = self.get_connection() cursor = conn.cursor() try: cursor.execute('SELECT value FROM system_settings WHERE key = ?', (key,)) result = cursor.fetchone() if result: return result[0] else: logger.warning(f'系统设置项不存在: {key}') return default_value except Exception as e: logger.error(f'获取系统设置失败: {e}') return default_value def set_system_setting(self, key: str, value: str, description: str = None): """设置系统设置值""" conn = self.get_connection() cursor = conn.cursor() try: china_time = self.get_china_time() # 检查设置项是否存在 cursor.execute('SELECT COUNT(*) FROM system_settings WHERE key = ?', (key,)) exists = cursor.fetchone()[0] if exists: # 更新现有设置 cursor.execute(''' UPDATE system_settings SET value = ?, updated_at = ? WHERE key = ? ''', (value, china_time, key)) else: # 插入新设置 cursor.execute(''' INSERT INTO system_settings (key, value, description, updated_at) VALUES (?, ?, ?, ?) ''', (key, value, description, china_time)) conn.commit() logger.info(f'设置系统配置: {key} = {value}') except Exception as e: conn.rollback() logger.error(f'设置系统配置失败: {e}') raise def close(self): """关闭数据库连接""" if self.connection: self.connection.close() self.connection = None logger.info('数据库连接已关闭')