diff --git a/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsTaskServiceImpl.java b/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsTaskServiceImpl.java
index 9689437..3162f0e 100644
--- a/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsTaskServiceImpl.java
+++ b/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsTaskServiceImpl.java
@@ -5,11 +5,13 @@ import cn.hutool.core.date.DateTime;
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.core.util.StrUtil;
+import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.yfd.platform.config.ResponseResult;
+import com.yfd.platform.modules.common.exception.BizException;
 import com.yfd.platform.modules.experimentalData.domain.TsFiles;
 import com.yfd.platform.modules.experimentalData.domain.TsNodes;
 import com.yfd.platform.modules.experimentalData.domain.TsTask;
@@ -112,7 +114,7 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
     private DataSource dataSource;
     @Autowired
     private JdbcTemplate jdbcTemplate;
-
+    private static final int FILE_INSERT_BATCH_SIZE = 500;
     private static final String INITIAL_CODE = "00001";
     private static final int MAX_CODE_VALUE = 99999;
@@ -929,6 +931,13 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
             if (field.getName().equals("serialVersionUID")) {
                 continue;
             }
+
+            // Check the TableField annotation and skip fields that do not exist in the database
+            TableField tableFieldAnnotation = field.getAnnotation(TableField.class);
+            if (tableFieldAnnotation != null && !tableFieldAnnotation.exist()) {
+                continue;
+            }
+
             // Skip non-database fields (adjust this condition as needed)
             if (field.getName().equals("key")) {
                 continue;
             }
@@ -1001,74 +1010,12 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
         return value.toString();
     }
-
-
-
-
     // Generates the CREATE TABLE SQL statement
     private String generateCreateTableSql(String tableName) {
         // Use LIKE to create a new table with the same structure as ts_files
         return "CREATE TABLE IF NOT EXISTS `" + tableName + "` LIKE `ts_files`;";
     }
 
-    /**
-     * Import task SQL data from a ZIP file
-     *
-     * @param file            ZIP file containing the SQL files
-     * @param taskCode        target task code that replaces the original task code in the SQL
-     * @param localStorageId  local storage ID
-     * @param backupStorageId backup storage ID
-     * @return true if the import succeeds, false otherwise
-     * @throws IOException if an IO error occurs while reading or extracting the file
-     */
-    @Override
-    public boolean importTaskSqlFromZip(MultipartFile file, String taskCode, int localStorageId, int backupStorageId) throws IOException {
-
-        // 1. Extract the ZIP file
-        byte[] zipBytes = file.getBytes();
-        // Derive the original task code from the file name, e.g. 00002_data.sql.zip
-        String originalFilename = file.getOriginalFilename();
-        String originalTaskCode = extractTaskCodeFromFileName(originalFilename);
-        List<SqlFileContent> sqlFiles = extractSqlFromZip(zipBytes);
-
-        // 2. Replace the task code and storage space IDs in the SQL files
-        List<String> allSqlStatements = new ArrayList<>();
-        for (SqlFileContent sqlFile : sqlFiles) {
-            LOGGER.debug("Original SQL content: {}", sqlFile.getContent().substring(0, Math.min(200, sqlFile.getContent().length())));
-            String modifiedContent = replaceTaskCodeInSql(sqlFile.getContent(), originalTaskCode, taskCode, localStorageId, backupStorageId);
-            LOGGER.debug("SQL content after replacement: {}", modifiedContent.substring(0, Math.min(200, modifiedContent.length())));
-            allSqlStatements.addAll(parseSqlStatements(modifiedContent));
-        }
-
-        // 3. Split DDL and INSERT statements, merging INSERTs on the same table for faster execution
-        List<String> ddlSqlList = new ArrayList<>();
-        List<String> dmlSqlList = new ArrayList<>();
-
-        for (String sql : allSqlStatements) {
-            String upper = sql.trim().toUpperCase();
-            if (upper.startsWith("CREATE") || upper.startsWith("DROP") || upper.startsWith("ALTER") || upper.startsWith("SET")) {
-                ddlSqlList.add(sql);
-            } else if (upper.startsWith("INSERT")) {
-                dmlSqlList.add(sql);
-            }
-        }
-
-        // Execute the DDL first
-        LOGGER.info("Executing DDL statements, {} in total", ddlSqlList.size());
-        executeSqlImport(ddlSqlList);
-
-        // Then merge and execute the DML
-        List<String> mergedSqlStatements = mergeInsertStatements(dmlSqlList);
-        LOGGER.info("Starting data import: {} INSERT statements, {} after merging", dmlSqlList.size(), mergedSqlStatements.size());
-        long startTime = System.currentTimeMillis();
-
-        // 4. Execute the SQL batch import
-        boolean result = executeSqlImport(mergedSqlStatements);
-
-        long endTime = System.currentTimeMillis();
-        LOGGER.info("SQL import finished, took {} ms", (endTime - startTime));
-        return result;
-    }
 
     /**
      * Extract the original task code from the file name
@@ -1123,91 +1070,51 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
         return sqlFiles;
     }
 
-    private String replaceTaskCodeInSql(String sqlContent, String originalTaskCode, String newTaskCode, int localStorageId, int backupStorageId) {
-        if (StrUtil.isBlank(originalTaskCode)) {
+
+    private String replaceTaskCodeInSql(
+            String sqlContent,
+            String originalTaskCode,
+            String newTaskCode,
+            int localStorageId,
+            int backupStorageId
+    ) {
+        if (StrUtil.isBlank(originalTaskCode) || StrUtil.isBlank(sqlContent)) {
             return sqlContent;
         }
-        // Replace the task_code value in the ts_task table together with the storage space ID values
-        Pattern taskPattern = Pattern.compile(
-                "(INSERT\\s+INTO\\s+`?ts_task`?\\s*\\([^)]*?task_code[^)]*?\\)\\s*VALUES\\s*\\([^)]*?)('" + Pattern.quote(originalTaskCode) + "')([^)]*)(?
 
     private List<String> parseSqlStatements(String sqlContent) {
         List<String> sqlList = new ArrayList<>();
@@ -1332,62 +1239,6 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
         return failedCount;
     }
 
-    // =========================== INSERT merge optimization ===========================
-    private List<String> mergeInsertStatements(List<String> sqlStatements) {
-        Map<String, List<String>> groupedStatements = new LinkedHashMap<>();
-
-        Pattern pattern = Pattern.compile(
-                "INSERT\\s+INTO\\s+`?(\\w+)`?\\s*\\(([^)]+)\\)\\s*VALUES\\s*\\((.*)\\)",
-                Pattern.CASE_INSENSITIVE
-        );
-
-        for (String sql : sqlStatements) {
-            Matcher matcher = pattern.matcher(sql);
-            if (matcher.find()) {
-                String table = matcher.group(1);
-                String columns = matcher.group(2);
-                String values = matcher.group(3);
-
-                // Group by table name + column list
-                String key = table + ":" + columns;
-
-                groupedStatements.computeIfAbsent(key, k -> new ArrayList<>()).add(values);
-            } else {
-                // Statements that cannot be parsed are handled separately
-                groupedStatements.computeIfAbsent("_others", k -> new ArrayList<>()).add(sql);
-            }
-        }
-
-        List<String> mergedSql = new ArrayList<>();
-        for (Map.Entry<String, List<String>> entry : groupedStatements.entrySet()) {
-            String key = entry.getKey();
-            List<String> valuesList = entry.getValue();
-
-            if ("_others".equals(key)) {
-                mergedSql.addAll(valuesList);
-            } else {
-                String[] parts = key.split(":", 2);
-                String table = parts[0];
-                String columns = parts[1];
-
-                StringBuilder sb = new StringBuilder();
-                sb.append("INSERT INTO `").append(table).append("` (").append(columns).append(") VALUES ");
-
-                for (int i = 0; i < valuesList.size(); i++) {
-                    if (i > 0) {
-                        sb.append(",");
-                    }
-                    sb.append("(").append(valuesList.get(i)).append(")");
-                }
-                sb.append(";");
-
-                mergedSql.add(sb.toString());
-            }
-        }
-
-        return mergedSql;
-    }
-
     // =========================== Inner class definitions ===========================
@@ -1437,6 +1288,154 @@ public class TsTaskServiceImpl extends ServiceImpl impleme
         }
     }
 
+    @Transactional
+    public boolean importTaskSqlFromZip(MultipartFile file,
+                                        String taskCode,
+                                        int localStorageId,
+                                        int backupStorageId) throws IOException {
+        if (file == null || file.isEmpty()) {
+            throw new IllegalArgumentException("The ZIP file must not be empty");
+        }
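+        // Overall flow: extract the ZIP, rewrite task codes and storage IDs in the
+        // SQL, then execute statements in dependency order (DDL -> ts_task ->
+        // ts_nodes -> batched ts_files) so parent records exist before file rows.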
+        long startTime = System.currentTimeMillis();
+        LOGGER.info("Starting task SQL import, taskCode={}, fileName={}", taskCode, file.getOriginalFilename());
+        // 1. Get the original task code
+        String originalFilename = file.getOriginalFilename();
+        String originalTaskCode = extractTaskCodeFromFileName(originalFilename); // e.g. 00002
+        LOGGER.debug("Original task code: {}", originalTaskCode);
+        // 2. Extract the ZIP and read the SQL file contents
+        byte[] zipBytes = file.getBytes();
+        List<SqlFileContent> sqlFiles = extractSqlFromZip(zipBytes);
+        LOGGER.info("Extraction finished, found {} SQL files", sqlFiles.size());
+
+        // 3. Replace the task code and storage space IDs in the SQL
+        List<String> allSqlStatements = new ArrayList<>();
+        for (SqlFileContent sqlFile : sqlFiles) {
+            String modifiedContent = replaceTaskCodeInSql(sqlFile.getContent(), originalTaskCode, taskCode, localStorageId, backupStorageId);
+            allSqlStatements.addAll(parseSqlStatements(modifiedContent));
+            LOGGER.debug("Processed SQL file, {} characters", modifiedContent.length());
+        }
+        LOGGER.info("SQL replacement finished, total statements: {}", allSqlStatements.size());
+
+        // 4. Split into DDL, TS_TASK, TS_NODES and TS_FILES statements
+        List<String> ddlSqlList = new ArrayList<>();
+        String taskInsertSql = null;
+        List<String> nodeInsertSqls = new ArrayList<>();
+        List<String> fileInsertSqls = new ArrayList<>();
+
+        for (String sql : allSqlStatements) {
+            String upper = sql.trim().toUpperCase();
+            if (upper.startsWith("CREATE") || upper.startsWith("DROP") || upper.startsWith("ALTER") || upper.startsWith("SET")) {
+                ddlSqlList.add(sql);
+            } else if (upper.startsWith("INSERT INTO TS_TASK")) {
+                taskInsertSql = sql;
+            } else if (upper.startsWith("INSERT INTO TS_NODES")) {
+                nodeInsertSqls.add(sql);
+            } else if (upper.startsWith("INSERT INTO TS_FILES_")) {
+                fileInsertSqls.add(sql);
+            }
+        }
+
+        LOGGER.info("SQL split finished: {} DDL, {} TS_TASK, {} TS_NODES, {} TS_FILES",
+                ddlSqlList.size(),
+                taskInsertSql != null ? 1 : 0,
+                nodeInsertSqls.size(),
+                fileInsertSqls.size());
+
+        // 5. Execute the DDL
+        if (!ddlSqlList.isEmpty()) {
+            LOGGER.info("Executing {} DDL statements", ddlSqlList.size());
+            executeSqlImport(ddlSqlList);
+            LOGGER.info("DDL execution finished");
+        }
+
+        // 6. Insert the TS_TASK record
+        if (taskInsertSql != null) {
+            LOGGER.info("Inserting the TS_TASK record");
+            jdbcTemplate.update(taskInsertSql);
+            LOGGER.info("TS_TASK insert finished");
+        }
+
+        // 7. Insert the TS_NODES records
+        if (!nodeInsertSqls.isEmpty()) {
+            LOGGER.info("Inserting {} TS_NODES records", nodeInsertSqls.size());
+            for (String nodeSql : nodeInsertSqls) {
+                jdbcTemplate.update(nodeSql);
+            }
+            LOGGER.info("TS_NODES insert finished");
+        }
+
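+        // Merging many single-row INSERTs into one multi-row statement per batch
+        // cuts network round-trips and SQL parse overhead, while the batch size
+        // bounds each statement so it stays within the server's packet limit.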
+        // 8. Insert TS_FILES in streamed batches
+        if (!fileInsertSqls.isEmpty()) {
+            LOGGER.info("Starting batched TS_FILES insert, {} statements in total", fileInsertSqls.size());
+            int batchSize = FILE_INSERT_BATCH_SIZE; // tunable
+            List<String> batch = new ArrayList<>(batchSize);
+            int batchCount = 0;
+            for (String fileSql : fileInsertSqls) {
+                batch.add(fileSql);
+                if (batch.size() >= batchSize) {
+                    batchCount++;
+                    String mergedSql = mergeFileInsert(batch);
+                    jdbcTemplate.update(mergedSql);
+                    LOGGER.info("Inserted TS_FILES batch {} ({} records)", batchCount, batch.size());
+                    batch.clear();
+                }
+            }
+            if (!batch.isEmpty()) {
+                batchCount++;
+                String mergedSql = mergeFileInsert(batch);
+                jdbcTemplate.update(mergedSql);
+                LOGGER.info("Inserted TS_FILES batch {} ({} records)", batchCount, batch.size());
+            }
+            LOGGER.info("TS_FILES insert finished, {} batches in total", batchCount);
+        }
+
+        long endTime = System.currentTimeMillis();
+        LOGGER.info("Task SQL import finished, taskCode={}, took {} ms", taskCode, (endTime - startTime));
+
+        return true;
+    }
+
+    /**
+     * Merge the INSERT VALUES of statements targeting the same table
+     */
+    private String mergeFileInsert(List<String> insertSqls) {
+        if (insertSqls.isEmpty()) {
+            return "";
+        }
+
+        // Compile once and reuse for every statement in the batch
+        Pattern insertPattern = Pattern.compile(
+                "INSERT\\s+INTO\\s+`?(\\w+)`?\\s*\\(([^)]+)\\)\\s*VALUES\\s*\\((.*)\\)",
+                Pattern.CASE_INSENSITIVE
+        );
+
+        Matcher matcher = insertPattern.matcher(insertSqls.get(0));
+        if (!matcher.find()) {
+            return String.join(";", insertSqls); // cannot parse, return as-is
+        }
+
+        String table = matcher.group(1);
+        String columns = matcher.group(2);
+
+        StringBuilder sb = new StringBuilder("INSERT IGNORE INTO `").append(table).append("` (")
+                .append(columns).append(") VALUES ");
+
+        // Count appended tuples instead of using the loop index, so a statement
+        // that fails to parse does not leave a dangling comma in the output
+        int appended = 0;
+        for (String insertSql : insertSqls) {
+            Matcher m = insertPattern.matcher(insertSql);
+            if (m.find()) {
+                if (appended > 0) {
+                    sb.append(",");
+                }
+                sb.append("(").append(m.group(3)).append(")");
+                appended++;
+            }
+        }
+        sb.append(";");
+        return sb.toString();
+    }
+
 }
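
Note: a minimal sketch of what the merge step produces, assuming a hypothetical
per-task table ts_files_00001 with columns `id` and `name` (names illustrative only):

    // Two single-row statements from the dump...
    List<String> batch = Arrays.asList(
            "INSERT INTO ts_files_00001 (`id`, `name`) VALUES ('1', 'a.dat')",
            "INSERT INTO ts_files_00001 (`id`, `name`) VALUES ('2', 'b.dat')");
    // ...are merged by mergeFileInsert(batch) into a single multi-row statement:
    // INSERT IGNORE INTO `ts_files_00001` (`id`, `name`) VALUES ('1', 'a.dat'),('2', 'b.dat');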