diff --git a/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsFilesServiceImpl.java b/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsFilesServiceImpl.java index 1132d46..39c680e 100644 --- a/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsFilesServiceImpl.java +++ b/java/src/main/java/com/yfd/platform/modules/experimentalData/service/impl/TsFilesServiceImpl.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; @@ -41,7 +42,6 @@ import com.yfd.platform.component.WebSocketServer; import com.yfd.platform.config.ResponseResult; import com.yfd.platform.modules.experimentalData.domain.*; import com.yfd.platform.modules.experimentalData.mapper.TsFilesMapper; -import com.yfd.platform.modules.experimentalData.mapper.TsNodesMapper; import com.yfd.platform.modules.experimentalData.mapper.TsTaskMapper; import com.yfd.platform.modules.experimentalData.service.ITsFilesService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; @@ -89,8 +89,6 @@ import java.util.zip.*; import org.apache.commons.codec.binary.Hex; -import static java.lang.Float.parseFloat; - /** *

@@ -5415,7 +5413,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl * 返回值说明: com.yfd.platform.config.ResponseResult ***********************************/ @Override - public void batchSendNaviOutDataJob(String id, int samTimes, String token, String taskId,String configId) { + public void batchSendNaviOutDataJob(String id, int samTimes, String token, String taskId, String configId) { try { TsTask tsTask = tsTaskMapper.selectById(taskId); @@ -5424,161 +5422,39 @@ public class TsFilesServiceImpl extends ServiceImpl impl //判断文件后缀是.txt还是.csv String fileName = tsFiles.getFileName(); - String suffix = fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase(); - if ("txt".equals(suffix)) { - if (currentTaskFuture != null && !currentTaskFuture.isDone()) { - currentTaskFuture.cancel(true); // 中断之前的任务 - } - currentTaskFuture = executorService.submit(() -> { - StorageSourceConfig config = getStorageSourceConfig("filePath", "local", tsTask.getLocalStorageId()); - - - Path filePath = Paths.get(config.getValue() + tsFiles.getWorkPath() + tsFiles.getFileName()); - - try (BufferedReader reader = new BufferedReader(new FileReader(filePath.toFile()))) { - reader.readLine(); // 跳过标题行 - - - - SimpleNaviData data1 = null; - int result = 0; - // 读取第二行数据 - String secondLine = reader.readLine(); - if (secondLine != null) { - secondLine = secondLine.trim(); - String[] valuesSecond = secondLine.split("\t"); - data1 = parseLine(valuesSecond); - - } - - // 读取第三行数据 - String thirdLine = reader.readLine(); - if (thirdLine != null) { - thirdLine = thirdLine.trim(); - String[] valuesThird = thirdLine.split("\t"); - SimpleNaviData data2 = parseLine(valuesThird); - - // 假设 UtcTime 是 String 类型,首先转换为 double 类型 - double time1 = Double.parseDouble(data1.getUtcTime()); // 如果 getUtcTime() 返回 String 类型 - double time2 = Double.parseDouble(data2.getUtcTime()); // 如果 getUtcTime() 返回 String 类型 - // 保留小数点后六位 - double formattedTime1 = Double.parseDouble(String.format("%.6f", time1)); - double formattedTime2 = Double.parseDouble(String.format("%.6f", time2)); - - // 计算差值 - double diff = formattedTime2 - formattedTime1; - // 确保差值不为零,以避免除以零错误 - if (diff != 0) { - result = (int) Math.floor(1 / diff); - } else { - result = 1; - } - } - - - // 不管 samTimes 如何,先读取第一行数据 - String firstLine = reader.readLine(); - if (firstLine != null) { - firstLine = firstLine.trim(); - String[] values = firstLine.split("\t"); - if (values.length >= 40) { - // 发送第一行数据 - sendData(token, values, 1); - } else { - System.err.println("忽略不完整行: " + firstLine); - } - } - //todo 这个地方该从一下 换成200 - int step = samTimes * result; // 计算行号间隔 - int lineCount = 1; // 从第一行开始 - - - - while (!Thread.currentThread().isInterrupted()) { - lineCount += step; // 直接跳到下一个需要读取的行 - String line = null; - - // 读取指定行 - for (int i = 0; i < step; i++) { - line = reader.readLine(); - if (line == null) { - break; // 文件结束 - } - } - - if (line == null) { - break; // 文件结束 - } - - line = line.trim(); - String[] values = line.split("\t"); - if (values.length < 40) { - LOGGER.info("忽略不完整行: " + line); - continue; - } - - // 发送数据 - sendData(token, values, lineCount); - - // 固定休眠 3 秒 - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - // 如果休眠期间被中断,则退出循环 - Thread.currentThread().interrupt(); - break; - } - } - - // 发送完成信号(如果需要) - if (!Thread.currentThread().isInterrupted()) { - // ServerSendEventServer.sendMessage(token, "COMPLETED"); - } - - } catch (Exception e) { - // 捕获异常并记录日志 - LOGGER.error("任务执行失败: " + e.getMessage()); - e.printStackTrace(); - } - }); - }else{ - - TsFiles tsFilesData = tsFilesMapper.selectById(configId); - if (currentTaskFuture != null && !currentTaskFuture.isDone()) { - currentTaskFuture.cancel(true); - } - - currentTaskFuture = executorService.submit(() -> { - try { - // 1. 获取配置文件路径 - StorageSourceConfig config = getStorageSourceConfig("filePath", "local", tsTask.getLocalStorageId()); - Path basePath = Paths.get(config.getValue() + tsFilesData.getWorkPath()); - - // 2. 读取配置文件(假设配置文件名为 trj_config.txt) - Path configPath = basePath.resolve("trj_config.txt"); - Map columnMapping = parseConfigFile(configPath); - - String timeColumn = columnMapping.get("time"); - String lonColumn = columnMapping.get("lon"); - String latColumn = columnMapping.get("lat"); - String hgtColumn = columnMapping.get("hgt"); - - LOGGER.info("列映射: time={}, lon={}, lat={}, hgt={}", - timeColumn, lonColumn, latColumn, hgtColumn); - - // 3. 获取数据文件路径 - Path dataPath = basePath.resolve("VINS.csv"); - - // 4. 处理数据文件 - processDataFile(dataPath, samTimes, token, timeColumn, lonColumn, latColumn, hgtColumn); - - } catch (Exception e) { - LOGGER.error("任务执行失败: {}", e.getMessage(), e); - } - }); + TsFiles tsFilesData = tsFilesMapper.selectById(configId); + if (currentTaskFuture != null && !currentTaskFuture.isDone()) { + currentTaskFuture.cancel(true); } + currentTaskFuture = executorService.submit(() -> { + try { + // 1. 获取配置文件路径 + StorageSourceConfig config = getStorageSourceConfig("filePath", "local", tsTask.getLocalStorageId()); + Path basePath = Paths.get(config.getValue() + tsFilesData.getWorkPath()); + // 2. 读取配置文件(假设配置文件名为 trj_config.txt) + Path configPath = basePath.resolve(tsFilesData.getFileName()); + Map columnMapping = parseConfigFile(configPath, token); + + String timeColumn = columnMapping.get("time"); + String lonColumn = columnMapping.get("lon"); + String latColumn = columnMapping.get("lat"); + String hgtColumn = columnMapping.get("hgt"); + + LOGGER.info("列映射: time={}, lon={}, lat={}, hgt={}", + timeColumn, lonColumn, latColumn, hgtColumn); + + // 3. 获取数据文件路径 + Path dataPath = basePath.resolve(tsFiles.getFileName()); + + // 4. 处理数据文件 + processDataFile(dataPath, samTimes, token, timeColumn, lonColumn, latColumn, hgtColumn); + + } catch (Exception e) { + LOGGER.error("任务执行失败: {}", e.getMessage(), e); + } + }); } catch (Exception e) { @@ -5590,43 +5466,40 @@ public class TsFilesServiceImpl extends ServiceImpl impl } - // 封装发送数据的逻辑 - private void sendData(String token, String[] values, int lineCount) { - if (Thread.currentThread().isInterrupted()) { - return; - } - - try { - SimpleNaviData data = parseLine(values); - String jsonData = JSONUtil.toJsonStr(data); // 假设 JSONUtil.toJsonStr 不会抛出 IOException - ServerSendEventServer.sendMessageById(token, jsonData); // 假设 sendMessage 不会抛出 IOException - LOGGER.info("Line " + lineCount + " sent at: " + System.currentTimeMillis()); - } catch (Exception e) { - // 捕获所有可能的异常 - LOGGER.error("发送数据失败: " + e.getMessage()); - if (e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); // 重新设置中断状态 - } - } - } - - // 辅助方法:解析行数据 - private SimpleNaviData parseLine(String[] values) { - SimpleNaviData data = new SimpleNaviData(); - data.setUtcTime(getValueSafely(values, 8, "0.0")); // UTC_TIME(索引8) - data.setLat(getValueSafely(values, 17, "0.0")); // LAT(索引17) - data.setLon(getValueSafely(values, 18, "0.0")); // LON(索引18) - data.setAlt(getValueSafely(values, 19, "0.0")); // ALT(索引19) - return data; - } - - // 安全获取数组值的方法 - private String getValueSafely(String[] values, int index, String defaultValue) { - return (index < values.length) ? values[index] : defaultValue; - } - - - +// // 封装发送数据的逻辑 +// private void sendData(String token, String[] values, int lineCount) { +// if (Thread.currentThread().isInterrupted()) { +// return; +// } +// +// try { +// SimpleNaviData data = parseLine(values); +// String jsonData = JSONUtil.toJsonStr(data); // 假设 JSONUtil.toJsonStr 不会抛出 IOException +// ServerSendEventServer.sendMessageById(token, jsonData); // 假设 sendMessage 不会抛出 IOException +// LOGGER.info("Line " + lineCount + " sent at: " + System.currentTimeMillis()); +// } catch (Exception e) { +// // 捕获所有可能的异常 +// LOGGER.error("发送数据失败: " + e.getMessage()); +// if (e.getCause() instanceof InterruptedException) { +// Thread.currentThread().interrupt(); // 重新设置中断状态 +// } +// } +// } +// +// // 辅助方法:解析行数据 +// private SimpleNaviData parseLine(String[] values) { +// SimpleNaviData data = new SimpleNaviData(); +// data.setUtcTime(getValueSafely(values, 8, "0.0")); // UTC_TIME(索引8) +// data.setLat(getValueSafely(values, 17, "0.0")); // LAT(索引17) +// data.setLon(getValueSafely(values, 18, "0.0")); // LON(索引18) +// data.setAlt(getValueSafely(values, 19, "0.0")); // ALT(索引19) +// return data; +// } +// +// // 安全获取数组值的方法 +// private String getValueSafely(String[] values, int index, String defaultValue) { +// return (index < values.length) ? values[index] : defaultValue; +// } /** @@ -5636,7 +5509,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl * @return 列名映射Map(time, lon, lat, hgt) * @throws IOException 文件读取错误 */ - public Map parseConfigFile(Path configFilePath) throws IOException { + public Map parseConfigFile(Path configFilePath, String token) throws IOException { Map columnMapping = new HashMap<>(); try (BufferedReader reader = Files.newBufferedReader(configFilePath, StandardCharsets.UTF_8)) { @@ -5665,7 +5538,13 @@ public class TsFilesServiceImpl extends ServiceImpl impl // 验证是否获取到所有必需的列 if (columnMapping.size() < 4) { - throw new IOException("配置文件缺少必需的列定义"); + JSONObject jsonResponse = new JSONObject(); + jsonResponse.put("message", "配置文件选择错误,请重新选择!"); + + // 将 JSON 数据发送到客户端 + ServerSendEventServer.sendMessageById(token, jsonResponse.toString()); + + //throw new IOException("配置文件选择错误,请重新选择!"); } return columnMapping; @@ -5675,6 +5554,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl /** * 处理数据文件 */ + private void processDataFile(Path dataPath, int samTimes, String token, String timeColumn, String lonColumn, String latColumn, String hgtColumn) @@ -5688,48 +5568,58 @@ public class TsFilesServiceImpl extends ServiceImpl impl return; } - // 2. 解析标题行 - 使用逗号分隔 - String[] headers = headerLine.split(","); // 改为逗号分隔 + // 2. 自动检测分隔符(关键改进) + String delimiter = detectDelimiter(headerLine); + LOGGER.info("检测到分隔符: {}", delimiter.replace("\t", "\\t").replace(" ", "\\s")); - // 添加标题行调试日志 + // 3. 解析标题行 + String[] headers = headerLine.split(delimiter); LOGGER.info("数据文件标题行包含 {} 列", headers.length); LOGGER.debug("完整标题行: {}", String.join("|", headers)); Map columnIndices = new HashMap<>(); - // 3. 查找列索引(修复后的逻辑) + // 4. 查找列索引(改进:大小写不敏感+空格处理) for (int i = 0; i < headers.length; i++) { String header = headers[i].trim(); - // 使用 equalsIgnoreCase 进行不区分大小写的匹配 - if (header.equalsIgnoreCase(timeColumn)) { + // 标准化列名:移除空格和下划线 + String normalizedHeader = header.replaceAll("[_\\s]", "") + .toLowerCase(); + + if (normalizedHeader.equals(timeColumn.replaceAll("[_\\s]", "").toLowerCase())) { columnIndices.put("time", i); - LOGGER.info("找到 time 列 '{}' 在位置 {}", header, i); } - if (header.equalsIgnoreCase(lonColumn)) { + if (normalizedHeader.equals(lonColumn.replaceAll("[_\\s]", "").toLowerCase())) { columnIndices.put("lon", i); - LOGGER.info("找到 lon 列 '{}' 在位置 {}", header, i); } - if (header.equalsIgnoreCase(latColumn)) { + if (normalizedHeader.equals(latColumn.replaceAll("[_\\s]", "").toLowerCase())) { columnIndices.put("lat", i); - LOGGER.info("找到 lat 列 '{}' 在位置 {}", header, i); } - if (header.equalsIgnoreCase(hgtColumn)) { + if (normalizedHeader.equals(hgtColumn.replaceAll("[_\\s]", "").toLowerCase())) { columnIndices.put("hgt", i); - LOGGER.info("找到 hgt 列 '{}' 在位置 {}", header, i); } } - // 4. 验证列是否存在 + // 5. 验证列是否存在(改进:提供更友好的错误信息) List missingColumns = new ArrayList<>(); - if (!columnIndices.containsKey("time")) missingColumns.add(timeColumn); - if (!columnIndices.containsKey("lon")) missingColumns.add(lonColumn); - if (!columnIndices.containsKey("lat")) missingColumns.add(latColumn); - if (!columnIndices.containsKey("hgt")) missingColumns.add(hgtColumn); + if (!columnIndices.containsKey("time")) { + missingColumns.add("时间列(" + timeColumn + ")"); + LOGGER.warn("未找到时间列,尝试的列名: {}", timeColumn); + } + if (!columnIndices.containsKey("lon")) { + missingColumns.add("经度列(" + lonColumn + ")"); + } + if (!columnIndices.containsKey("lat")) { + missingColumns.add("纬度列(" + latColumn + ")"); + } + if (!columnIndices.containsKey("hgt")) { + missingColumns.add("高度列(" + hgtColumn + ")"); + } if (!missingColumns.isEmpty()) { - LOGGER.error("以下列在数据文件中不存在: {}", String.join(", ", missingColumns)); - LOGGER.error("数据文件标题行: {}", headerLine); + LOGGER.error("缺少必需列: {}", String.join(", ", missingColumns)); + LOGGER.error("可用列: {}", String.join(", ", headers)); return; } @@ -5741,15 +5631,19 @@ public class TsFilesServiceImpl extends ServiceImpl impl LOGGER.info("列索引: time={}, lon={}, lat={}, hgt={}", timeIndex, lonIndex, latIndex, hgtIndex); - // 5. 读取并处理数据行 + // 6. 计算实际步长(关键修复) + int step = samTimes * 200; // 与之前逻辑保持一致 + if (step <= 0) step = 1; // 避免除零错误 + + // 7. 读取并处理数据行 String line; int lineCount = 0; + int sendCount = 0; int lineCountData = 0; - float timeValue0 = 0; float timeValue1 = 0; - int result = 0; // + int result = 0; // while (!Thread.currentThread().isInterrupted() && (line = reader.readLine()) != null) { lineCountData++; @@ -5768,7 +5662,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl float data = timeValue1 - timeValue0; // 计算 1 / data if (data != 0) { // 确保避免除以零 - result = (int) Math.floor(1 / data); + result = (int) Math.floor(1 / data); } else { result = 1; } @@ -5776,32 +5670,36 @@ public class TsFilesServiceImpl extends ServiceImpl impl } - - while (!Thread.currentThread().isInterrupted() && (line = reader.readLine()) != null) { lineCount++; - // 使用逗号分隔数据行 - String[] values = line.split(","); // 改为逗号分隔 + // 跳过空行 + if (line.trim().isEmpty()) continue; + + // 使用检测到的分隔符 + String[] values = line.split(delimiter); + + // 跳过不符合步长的行(第一行总是发送) if (lineCount > 1 && (lineCount - 1) % (samTimes * result) != 0) { continue; } - - - // 检查数据完整性 - int maxIndex = getMaxIndex(timeIndex, lonIndex, latIndex, hgtIndex); + // 检查数据完整性(动态计算最大索引) + int maxIndex = Math.max(Math.max(timeIndex, lonIndex), + Math.max(latIndex, hgtIndex)); if (values.length <= maxIndex) { - LOGGER.warn("行 {} 数据不完整: {}", lineCount, line); + LOGGER.warn("行 {} 数据不完整 (需要 {} 列, 实际 {} 列): {}", + lineCount, maxIndex + 1, values.length, line); continue; } // 发送数据 sendDataCsv(token, values, lineCount, timeIndex, lonIndex, latIndex, hgtIndex); + sendCount++; // 每次发送后都休眠3秒 try { - Thread.sleep(3000); + Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -5809,11 +5707,71 @@ public class TsFilesServiceImpl extends ServiceImpl impl } if (!Thread.currentThread().isInterrupted()) { - LOGGER.info("文件处理完成,共处理 {} 行", lineCount); + LOGGER.info("文件处理完成: 总行数={}, 发送行数={}", lineCount, sendCount); } } } + // 自动检测分隔符(支持两种格式) + private String detectDelimiter(String headerLine) { + // 优先检查制表符 + if (headerLine.contains("\t")) { + return "\t"; + } + + // 检查逗号 + if (headerLine.contains(",")) { + return ","; + } + + // 检查空格(多个空格) + if (headerLine.contains(" ")) { + return "\\s+"; + } + + // 默认使用制表符 + return "\t"; + } + + // 安全获取数组值的方法 + private String getValueSafely(String[] values, int index, String defaultValue) { + return (index < values.length) ? values[index] : defaultValue; + } + + + // 改进的发送方法(处理各种数据格式) + private void sendDataCsv(String token, String[] values, int lineCount, + int timeIndex, int lonIndex, + int latIndex, int hgtIndex) { + try { + SimpleNaviData data = new SimpleNaviData(); + + // 处理可能带空格的数值 + data.setUtcTime(values[timeIndex].trim()); + data.setLon(parseDoubleSafe(values[lonIndex].trim(), 0.0)); + data.setLat(parseDoubleSafe(values[latIndex].trim(), 0.0)); + data.setAlt(parseDoubleSafe(values[hgtIndex].trim(), 0.0)); + + String jsonData = JSONUtil.toJsonStr(data); + ServerSendEventServer.sendMessageById(token, jsonData); + LOGGER.info("发送行 {}: UTC={}, Lon={}, Lat={}, Alt={}", + lineCount, data.getUtcTime(), data.getLon(), + data.getLat(), data.getAlt()); + } catch (Exception e) { + LOGGER.error("发送行 {} 失败: {}", lineCount, e.getMessage()); + } + } + + // 安全解析double值 + private String parseDoubleSafe(String value, double defaultValue) { + try { + return String.valueOf(Double.parseDouble(value)); + } catch (NumberFormatException e) { + LOGGER.warn("数值解析失败: '{}', 使用默认值 {}", value, defaultValue); + return String.valueOf(defaultValue); + } + } + // 辅助方法:获取最大索引 private int getMaxIndex(int... indices) { int max = -1; @@ -5824,34 +5782,6 @@ public class TsFilesServiceImpl extends ServiceImpl impl } - private void sendDataCsv(String token, String[] values, int lineCount, - int timeIndex, int lonIndex, int latIndex, int hgtIndex) { - if (Thread.currentThread().isInterrupted()) { - return; - } - - try { - SimpleNaviData data = new SimpleNaviData(); - data.setUtcTime(getValueSafely(values, timeIndex, "0.0")); - data.setLon(getValueSafely(values, lonIndex, "0.0")); - data.setLat(getValueSafely(values, latIndex, "0.0")); - data.setAlt(getValueSafely(values, hgtIndex, "0.0")); - - String jsonData = JSONUtil.toJsonStr(data); - ServerSendEventServer.sendMessageById(token, jsonData); - LOGGER.info("Line {} sent at: {}", lineCount, System.currentTimeMillis()); - } catch (Exception e) { - LOGGER.error("发送数据失败: {}", e.getMessage()); - if (e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } - - - - - /** * 查询文件内容接口 * @@ -6236,7 +6166,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl List listTsFilesPng = tsFilesMapper.selectList(queryWrapperPng); if (listTsFilesPng == null) { trajectoriesDto.setTsFilesListPng(new ArrayList<>()); - }else { + } else { for (TsFiles tsFiles1 : listTsFilesPng) { FileItemResult fileItemResult = new FileItemResult(); //获取图片url @@ -6268,7 +6198,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl List listTsFilesTxt = tsFilesMapper.selectList(queryWrapperTxt); if (listTsFilesTxt == null) { trajectoriesDto.setTsFilesListTxt(new ArrayList<>()); - }else{ + } else { for (TsFiles tsFiles1 : listTsFilesTxt) { //读物文件内容 StorageSourceConfig config = getStorageSourceConfig("filePath", "local", tsTask.getLocalStorageId()); @@ -6303,7 +6233,7 @@ public class TsFilesServiceImpl extends ServiceImpl impl List listTsFilesConfig = tsFilesMapper.selectList(queryWrapperConfig); if (listTsFilesConfig == null) { trajectoriesDto.setTsFilesListConfig(new ArrayList<>()); - }else { + } else { for (TsFiles tsFiles1 : listTsFilesConfig) { tsFilesListConfig.add(tsFiles1); } @@ -6346,8 +6276,6 @@ public class TsFilesServiceImpl extends ServiceImpl impl } - - }