任务导出导入优化

This commit is contained in:
wanxiaoli 2026-01-12 20:13:45 +08:00
parent 3382188eca
commit da930d6adc

View File

@ -5,11 +5,13 @@ import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yfd.platform.config.ResponseResult;
import com.yfd.platform.modules.common.exception.BizException;
import com.yfd.platform.modules.experimentalData.domain.TsFiles;
import com.yfd.platform.modules.experimentalData.domain.TsNodes;
import com.yfd.platform.modules.experimentalData.domain.TsTask;
@ -112,7 +114,7 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
private DataSource dataSource;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final int FILE_INSERT_BATCH_SIZE = 500;
private static final String INITIAL_CODE = "00001";
private static final int MAX_CODE_VALUE = 99999;
@ -929,6 +931,13 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
if (field.getName().equals("serialVersionUID")) {
continue;
}
// 检查 TableField 注解跳过不存在于数据库的字段
TableField tableFieldAnnotation = field.getAnnotation(TableField.class);
if (tableFieldAnnotation != null && tableFieldAnnotation.exist() == false) {
continue;
}
// 跳过非数据库字段可以根据实际需要调整条件
if (field.getName().equals("key")) {
continue;
@ -1001,74 +1010,12 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
return value.toString();
}
// 生成CREATE TABLE SQL语句的方法
private String generateCreateTableSql(String tableName) {
// 使用 LIKE 语句创建与 ts_files 结构相同的新表
return "CREATE TABLE IF NOT EXISTS `" + tableName + "` LIKE `ts_files`;";
}
/**
* 从ZIP文件中导入任务SQL数据
*
* @param file ZIP文件包含SQL文件
* @param taskCode 目标任务编号用于替换SQL中的原始任务编号
* @param localStorageId 本地存储ID
* @param backupStorageId 备份存储ID
* @return 导入成功返回true失败返回false
* @throws IOException 文件读取或解压过程中发生IO异常
*/
@Override
public boolean importTaskSqlFromZip(MultipartFile file, String taskCode, int localStorageId, int backupStorageId) throws IOException {
// 1. 解压ZIP文件
byte[] zipBytes = file.getBytes();
//根据文件名称获取原始任务编号00002_data.sql.zip
String originalFilename = file.getOriginalFilename();
String originalTaskCode = extractTaskCodeFromFileName(originalFilename);
List<SqlFileContent> sqlFiles = extractSqlFromZip(zipBytes);
// 2. 替换 SQL 文件中的任务编号存储空间编号
List<String> allSqlStatements = new ArrayList<>();
for (SqlFileContent sqlFile : sqlFiles) {
LOGGER.debug("原始SQL内容: {}", sqlFile.getContent().substring(0, Math.min(200, sqlFile.getContent().length())));
String modifiedContent = replaceTaskCodeInSql(sqlFile.getContent(), originalTaskCode, taskCode, localStorageId, backupStorageId);
LOGGER.debug("替换后SQL内容: {}", modifiedContent.substring(0, Math.min(200, modifiedContent.length())));
allSqlStatements.addAll(parseSqlStatements(modifiedContent));
}
// 3. 拆分 DDL INSERT 语句合并相同表的 INSERT提高执行效率
List<String> ddlSqlList = new ArrayList<>();
List<String> dmlSqlList = new ArrayList<>();
for (String sql : allSqlStatements) {
String upper = sql.trim().toUpperCase();
if (upper.startsWith("CREATE") || upper.startsWith("DROP") || upper.startsWith("ALTER") || upper.startsWith("SET")) {
ddlSqlList.add(sql);
} else if (upper.startsWith("INSERT")) {
dmlSqlList.add(sql);
}
}
// 先执行 DDL
LOGGER.info("开始执行DDL语句共 {} 条", ddlSqlList.size());
executeSqlImport(ddlSqlList);
// 再合并并执行 DML
List<String> mergedSqlStatements = mergeInsertStatements(dmlSqlList);
LOGGER.info("开始导入数据,共 {} 条INSERT语句合并后 {} 条", dmlSqlList.size(), mergedSqlStatements.size());
long startTime = System.currentTimeMillis();
// 4. 执行 SQL 批量导入
boolean result = executeSqlImport(mergedSqlStatements);
long endTime = System.currentTimeMillis();
LOGGER.info("SQL导入完成耗时 {} ms", (endTime - startTime));
return result;
}
/**
* 从文件名中提取原始任务编号
@ -1123,91 +1070,51 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
return sqlFiles;
}
private String replaceTaskCodeInSql(String sqlContent, String originalTaskCode, String newTaskCode, int localStorageId, int backupStorageId) {
if (StrUtil.isBlank(originalTaskCode)) {
private String replaceTaskCodeInSql(
String sqlContent,
String originalTaskCode,
String newTaskCode,
int localStorageId,
int backupStorageId
) {
if (StrUtil.isBlank(originalTaskCode) || StrUtil.isBlank(sqlContent)) {
return sqlContent;
}
// 同时替换 ts_task 表中的 task_code 值和存储空间ID值
Pattern taskPattern = Pattern.compile(
"(INSERT\\s+INTO\\s+`?ts_task`?\\s*\\([^)]*?task_code[^)]*?\\)\\s*VALUES\\s*\\([^)]*?)('"+Pattern.quote(originalTaskCode)+"')([^)]*)(?<!\\\\)\\s*,\\s*(-?\\d+)\\s*,\\s*(-?\\d+)\\s*\\)",
Pattern.CASE_INSENSITIVE
String result = sqlContent;
/* =========================
* 1. 替换 ts_files 表名CREATE + INSERT 全覆盖
* ========================= */
result = result.replaceAll(
"(?i)\\bts_files_" + Pattern.quote(originalTaskCode) + "\\b",
"ts_files_" + newTaskCode
);
Matcher taskMatcher = taskPattern.matcher(sqlContent);
StringBuffer taskResult = new StringBuffer();
while (taskMatcher.find()) {
String prefix = taskMatcher.group(1);
String suffix = taskMatcher.group(3);
String replacement = prefix + "'" + newTaskCode + "'" + suffix + "," + localStorageId + "," + backupStorageId + ")";
taskMatcher.appendReplacement(taskResult, Matcher.quoteReplacement(replacement));
}
taskMatcher.appendTail(taskResult);
String result = taskResult.toString();
// 替换 ts_files_* 表名
Pattern tableNamePattern = Pattern.compile(
"INSERT\\s+INTO\\s+`?(ts_files_" + Pattern.quote(originalTaskCode) + ")`?",
Pattern.CASE_INSENSITIVE
/* =========================
* 2. 替换 ts_task 中的 task_code
* 只改 VALUES 中的第 2 个字段
* ========================= */
result = result.replaceAll(
"(?i)(INSERT\\s+INTO\\s+`?ts_task`?\\s*\\([^)]*\\)\\s*VALUES\\s*\\(\\s*'[^']+'\\s*,\\s*)'[^']+'",
"$1'" + newTaskCode + "'"
);
Matcher tableNameMatcher = tableNamePattern.matcher(result);
StringBuffer tableNameResult = new StringBuffer();
/* =========================
* 3. 替换 local_storage_id / backup_storage_id
* 假设在 VALUES 末尾两个
* ========================= */
result = result.replaceAll(
"(?i)(INSERT\\s+INTO\\s+`?ts_task`?\\s*\\([^)]*\\)\\s*VALUES\\s*\\(.*?,)(-?\\d+)\\s*,\\s*(-?\\d+)(\\s*\\))",
"$1" + localStorageId + "," + backupStorageId + "$4"
);
while (tableNameMatcher.find()) {
String replacement = "INSERT INTO `ts_files_" + newTaskCode + "`";
tableNameMatcher.appendReplacement(tableNameResult, Matcher.quoteReplacement(replacement));
}
tableNameMatcher.appendTail(tableNameResult);
return tableNameResult.toString();
return result;
}
// private String replaceTaskCodeInSql(String sqlContent, String originalTaskCode, String newTaskCode, int localStorageId, int backupStorageId) {
// if (StrUtil.isBlank(originalTaskCode)) {
// return sqlContent;
// }
//
// // 1. 任务编号替换 - 使用精确匹配
// sqlContent = sqlContent.replace("'" + originalTaskCode + "'", "'" + newTaskCode + "'");
//
// // 2. 表名替换
// sqlContent = sqlContent.replace("ts_files_" + originalTaskCode, "ts_files_" + newTaskCode);
//
// // 3. 存储空间ID替换 - 针对INSERT语句进行精确替换
// // 使用更复杂的正则表达式来匹配完整的INSERT语句并替换对应的值
// sqlContent = replaceStorageIdsInInsertStatement(sqlContent, localStorageId, backupStorageId);
//
// return sqlContent;
// }
private String replaceStorageIdsInInsertStatement(String sqlContent, int localStorageId, int backupStorageId) {
// 只匹配 ts_task 表的 INSERT 语句并替换最后两个字段值
Pattern insertPattern = Pattern.compile(
"(INSERT\\s+INTO\\s+`?ts_task`?\\s*\\([^)]*\\)\\s*VALUES\\s*\\([^)]*)(?<!\\\\)\\s*,\\s*(-?\\d+)\\s*,\\s*(-?\\d+)\\s*\\)",
Pattern.CASE_INSENSITIVE
);
Matcher matcher = insertPattern.matcher(sqlContent);
StringBuffer result = new StringBuffer();
while (matcher.find()) {
String prefix = matcher.group(1);
// 使用新的存储空间ID替换原有的值
String modifiedInsert = prefix + "," + localStorageId + "," + backupStorageId + ")";
matcher.appendReplacement(result, Matcher.quoteReplacement(modifiedInsert));
}
matcher.appendTail(result);
return result.toString();
}
// 解析SQL语句
private List<String> parseSqlStatements(String sqlContent) {
List<String> sqlList = new ArrayList<>();
@ -1332,62 +1239,6 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
return failedCount;
}
// =========================== INSERT 合并优化 ===========================
private List<String> mergeInsertStatements(List<String> sqlStatements) {
Map<String, List<String>> groupedStatements = new LinkedHashMap<>();
Pattern pattern = Pattern.compile(
"INSERT\\s+INTO\\s+`?(\\w+)`?\\s*\\(([^)]+)\\)\\s*VALUES\\s*\\((.*)\\)",
Pattern.CASE_INSENSITIVE
);
for (String sql : sqlStatements) {
Matcher matcher = pattern.matcher(sql);
if (matcher.find()) {
String table = matcher.group(1);
String columns = matcher.group(2);
String values = matcher.group(3);
// 使用表名+列名作为分组键
String key = table + ":" + columns;
groupedStatements.computeIfAbsent(key, k -> new ArrayList<>()).add(values);
} else {
// 无法匹配的语句单独处理
groupedStatements.computeIfAbsent("_others", k -> new ArrayList<>()).add(sql);
}
}
List<String> mergedSql = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : groupedStatements.entrySet()) {
String key = entry.getKey();
List<String> valuesList = entry.getValue();
if ("_others".equals(key)) {
mergedSql.addAll(valuesList);
} else {
String[] parts = key.split(":", 2);
String table = parts[0];
String columns = parts[1];
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO `").append(table).append("` (").append(columns).append(") VALUES ");
for (int i = 0; i < valuesList.size(); i++) {
if (i > 0) {
sb.append(",");
}
sb.append("(").append(valuesList.get(i)).append(")");
}
sb.append(";");
mergedSql.add(sb.toString());
}
}
return mergedSql;
}
// =========================== 内部类定义 ===========================
@ -1437,6 +1288,154 @@ public class TsTaskServiceImpl extends ServiceImpl<TsTaskMapper, TsTask> impleme
}
}
@Transactional
public boolean importTaskSqlFromZip(MultipartFile file,
String taskCode,
int localStorageId,
int backupStorageId) throws IOException {
if ( file == null || file.isEmpty()) {
throw new IllegalArgumentException("ZIP 文件不能为空");
}
long startTime = System.currentTimeMillis();
LOGGER.info("开始导入任务 SQLtaskCode={}, 文件名={}", taskCode, file.getOriginalFilename());
// 1. 获取原始任务编号
String originalFilename = file.getOriginalFilename();
String originalTaskCode = extractTaskCodeFromFileName(originalFilename); // 例如 00002
LOGGER.debug("原始任务编号: {}", originalTaskCode);
// 2. 解压 ZIP 并获取 SQL 文件内容
byte[] zipBytes = file.getBytes();
List<SqlFileContent> sqlFiles = extractSqlFromZip(zipBytes);
LOGGER.info("解压完成,共找到 {} 个 SQL 文件", sqlFiles.size());
// 3. 替换 SQL 中的任务编号存储空间
List<String> allSqlStatements = new ArrayList<>();
for (SqlFileContent sqlFile : sqlFiles) {
String modifiedContent = replaceTaskCodeInSql(sqlFile.getContent(), originalTaskCode, taskCode, localStorageId, backupStorageId);
allSqlStatements.addAll(parseSqlStatements(modifiedContent));
LOGGER.debug("处理 SQL 文件 长度 {} 字符", modifiedContent.length());
}
LOGGER.info("SQL 替换完成,总语句数: {}", allSqlStatements.size());
// 4. 拆分 DDLTS_TASKTS_NODESTS_FILES
List<String> ddlSqlList = new ArrayList<>();
String taskInsertSql = null;
List<String> nodeInsertSqls = new ArrayList<>();
List<String> fileInsertSqls = new ArrayList<>();
for (String sql : allSqlStatements) {
String upper = sql.trim().toUpperCase();
if (upper.startsWith("CREATE") || upper.startsWith("DROP") || upper.startsWith("ALTER") || upper.startsWith("SET")) {
ddlSqlList.add(sql);
} else if (upper.startsWith("INSERT INTO TS_TASK")) {
taskInsertSql = sql;
} else if (upper.startsWith("INSERT INTO TS_NODES")) {
nodeInsertSqls.add(sql);
} else if (upper.startsWith("INSERT INTO TS_FILES_")) {
fileInsertSqls.add(sql);
}
}
LOGGER.info("拆分 SQL 完成DDL {} 条TS_TASK {} 条TS_NODES {} 条TS_FILES {} 条",
ddlSqlList.size(),
taskInsertSql != null ? 1 : 0,
nodeInsertSqls.size(),
fileInsertSqls.size());
// 5. 执行 DDL
if (!ddlSqlList.isEmpty()) {
LOGGER.info("开始执行 {} 条 DDL 语句", ddlSqlList.size());
executeSqlImport(ddlSqlList);
LOGGER.info("DDL 执行完成");
}
// 6. 执行 TS_TASK
if (taskInsertSql != null) {
LOGGER.info("插入 TS_TASK 记录");
jdbcTemplate.update(taskInsertSql);
LOGGER.info("TS_TASK 插入完成");
}
// 7. 执行 TS_NODES
if(!nodeInsertSqls.isEmpty()) {
LOGGER.info("插入 TS_NODES {} 条", nodeInsertSqls.size());
for (String nodeSql : nodeInsertSqls) {
jdbcTemplate.update(nodeSql);
}
LOGGER.info("TS_NODES 插入完成");
}
// 8. 流式分批插入 TS_FILES
if (!fileInsertSqls.isEmpty()) {
LOGGER.info("开始分批插入 TS_FILES共 {} 条", fileInsertSqls.size());
int batchSize = 2000; // 可调
List<String> batch = new ArrayList<>(batchSize);
int batchCount = 0;
for (String fileSql : fileInsertSqls) {
batch.add(fileSql);
if (batch.size() >= batchSize) {
batchCount++;
String mergedSql = mergeFileInsert(batch);
jdbcTemplate.update(mergedSql);
LOGGER.info("已插入 TS_FILES 第 {} 批 ({} 条记录)", batchCount, batch.size());
batch.clear();
}
}
if (!batch.isEmpty()) {
batchCount++;
String mergedSql = mergeFileInsert(batch);
jdbcTemplate.update(mergedSql);
LOGGER.info("已插入 TS_FILES 第 {} 批 ({} 条记录)", batchCount, batch.size());
}
LOGGER.info("TS_FILES 插入完成,共 {} 批", batchCount);
}
long endTime = System.currentTimeMillis();
LOGGER.info("任务 SQL 导入完成taskCode={},耗时 {} ms", taskCode, (endTime - startTime));
return true;
}
/**
* 合并同一表的 INSERT VALUES
*/
private String mergeFileInsert(List<String> insertSqls) {
if (insertSqls.isEmpty()) return "";
String first = insertSqls.get(0);
Matcher matcher = Pattern.compile(
"INSERT\\s+INTO\\s+`?(\\w+)`?\\s*\\(([^)]+)\\)\\s*VALUES\\s*\\((.*)\\)",
Pattern.CASE_INSENSITIVE
).matcher(first);
if (!matcher.find()) {
return String.join(";", insertSqls); // 无法解析原样返回
}
String table = matcher.group(1);
String columns = matcher.group(2);
StringBuilder sb = new StringBuilder("INSERT IGNORE INTO `").append(table).append("` (")
.append(columns).append(") VALUES ");
for (int i = 0; i < insertSqls.size(); i++) {
Matcher m = Pattern.compile(
"INSERT\\s+INTO\\s+`?\\w+`?\\s*\\([^)]*\\)\\s*VALUES\\s*\\((.*)\\)",
Pattern.CASE_INSENSITIVE
).matcher(insertSqls.get(i));
if (m.find()) {
if (i > 0) sb.append(",");
sb.append("(").append(m.group(1)).append(")");
}
}
sb.append(";");
return sb.toString();
}
}