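// JDK, Spring and Lombok imports. Project-local types (BaseService, ResultBean, CompleteRequest,
// FileUtil, FileOperation, FileInfo, Node, PermissionBean) are not shown in the source and must be
// imported from their own packages.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.LongStream;

import javax.annotation.Resource; // assumption: javax namespace (pre-Jakarta Spring)
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; // assumption: StringUtils comes from commons-lang3
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import sun.nio.ch.DirectBuffer; // JDK-internal API; cleaner().clean() only works as written on Java 8

@Service
@Slf4j
public class FileAreaService extends BaseService {

    @Resource(name = "fileSendThreadPool")
    private ThreadPoolExecutor executor;

    @Resource(name = "fileSendToolThreadPool")
    private ThreadPoolExecutor executorTool;

    @Resource(name = "restTemplateIgnoreSSL")
    private RestTemplate restTemplate;

    @Value("${file.accept.addr}")
    private String targetServerUrl;

    // Chunk size in MiB.
    @Value("${file.send.chunkSize:5}")
    private int chunkSize;

    // Permission digits such as 750, stored as a decimal int and passed on as the string "750".
    @Value("${file.send.filePermission:750}")
    private int filePermissionAsOctal;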

    /**
     * Validates the source file, then uploads it to the target server asynchronously in chunks
     * and notifies the user of the outcome.
     */
    public void sendFileToOtherAreas(String userName, String filePath, String token, String targetPath) {
        log.info("File transfer task started | user:{} | path:{} | targetPath:{}", userName, filePath, targetPath);
        File file = new File(filePath);
        if (!file.exists() || file.isDirectory()) {
            log.error("File does not exist or is a directory | user:{} | path:{}", userName, filePath);
            throw new IllegalArgumentException("File does not exist or is a directory");
        }
        executor.execute(() -> {
            Path path = file.toPath();
            try {
                // Session init runs inside the try so that a failure here also notifies the user.
                String uploadId = initUploadSession(path.getFileName().toString(), token);
                try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
                    int size = chunkSize * 1024 * 1024;
                    // Ceiling division: number of chunks needed to cover the file.
                    long totalChunks = (file.length() + size - 1) / size;
                    if (totalChunks == 0) {
                        log.warn("File size is 0; chunked upload does not apply, an empty file will be created | user:{} | path:{} | targetPath:{}",
                                userName, filePath, targetPath);
                    }
                    // Upload all chunks concurrently and wait for every one to finish.
                    CompletableFuture.allOf(LongStream.range(0, totalChunks)
                            .mapToObj(chunkIndex -> CompletableFuture.runAsync(
                                    () -> sendOptimizedChunk(userName, filePath, uploadId, channel, chunkIndex, size, token),
                                    executorTool))
                            .toArray(CompletableFuture[]::new)).join();
                    ResultBean resultBean = completeUpload(uploadId, userName, token, path, targetPath, filePermissionAsOctal);
                    String msg;
                    if (Objects.requireNonNull(resultBean).getCode() == 0) {
                        msg = "File: " + path.getFileName() + " has been synced to the file cabinet at "
                                + resultBean.getData() + ". Please open the file cabinet to view it.";
                        log.info("File transfer completed | uploadId:{} | user:{} | path:{} | targetPath:{}",
                                uploadId, userName, filePath, targetPath);
                        sendMsg(userName, msg);
                    } else {
                        msg = "File: " + file.getName() + " could not be merged after transfer. Please try again later!";
                        log.error("File transfer failed | uploadId:{} | user:{} | path:{} | targetPath:{}",
                                uploadId, userName, filePath, targetPath);
                        sendMsg(userName, msg);
                    }
                }
            } catch (Exception e) {
                log.error("File transfer task failed", e);
                sendMsg(userName, "File: " + file.getName() + " transfer failed. Please try again later!");
            }
        });
    }

    /**
     * Maps one chunk of the file into memory and posts it to the target server as a multipart part.
     */
    private void sendOptimizedChunk(String userName, String filePath, String uploadId, FileChannel channel,
                                    long chunkIndex, int chunkSize, String token) {
        MappedByteBuffer buffer = null;
        try {
            long position = chunkIndex * chunkSize;
            long size = Math.min(chunkSize, channel.size() - position);
            buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, size);
            byte[] bufferArray = new byte[buffer.remaining()];
            buffer.get(bufferArray);

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.MULTIPART_FORM_DATA);
            headers.add("X-Chunk-Offset", String.valueOf(position));
            headers.add("X-Upload-Id", uploadId);
            // Content-Length is not set by hand: the encoded multipart body is larger than the raw
            // chunk, so the value is left to RestTemplate.

            MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
            body.add("chunk", new ByteArrayResource(bufferArray) {
                @Override
                public String getFilename() {
                    return "chunk_" + chunkIndex + ".dat";
                }
            });
            body.add("uploadId", uploadId);
            body.add("chunkIndex", chunkIndex);
            executeWithRetry(() -> {
                ResponseEntity<ResultBean> responseEntity = restTemplate.exchange(
                        targetServerUrl + "/uploadChunk?token=" + token,
                        HttpMethod.POST,
                        new HttpEntity<>(body, headers),
                        ResultBean.class);
                if (responseEntity.getStatusCode() != HttpStatus.OK) {
                    log.error("File transfer failed | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
                    throw new RuntimeException("File transfer failed");
                }
                ResultBean resultBean = responseEntity.getBody();
                if (resultBean != null && resultBean.getCode() != 0) {
                    log.error("File transfer failed | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
                    throw new RuntimeException("File transfer failed");
                } else {
                    log.info("File transfer succeeded | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
                }
                return null;
            }, 3, 1000L);
        } catch (Exception e) {
            // Covers both mapping errors and upload errors that survived the retries.
            throw new RuntimeException("Chunk upload failed", e);
        } finally {
            if (buffer != null) {
                // Release the mapping eagerly instead of waiting for GC (Java 8 internal API).
                ((DirectBuffer) buffer).cleaner().clean();
            }
        }
    }
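
    /**
     * Runs the action, retrying up to maxRetries times with exponential backoff between attempts.
     */
    private void executeWithRetry(Callable<Void> action, int maxRetries, long initialDelay) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                action.call();
                return;
            } catch (Exception e) {
                log.warn("Operation failed, retry count: {}", retryCount, e);
                if (retryCount++ >= maxRetries) {
                    log.error("Operation failed, maximum retry count reached: {}", maxRetries, e);
                    throw e;
                }
                try {
                    // retryCount is already incremented here, so the delays are 2x, 4x, 8x initialDelay.
                    Thread.sleep(initialDelay * (long) Math.pow(2, retryCount));
                } catch (InterruptedException ie) {
                    log.error("Thread interrupted, aborting retries", ie);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Transfer interrupted", ie);
                }
            }
        }
    }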

    /**
     * Asks the target server to merge the uploaded chunks, sending the file hash and size for verification.
     */
    private ResultBean completeUpload(String uploadId, String userName, String token, Path path,
                                      String targetPath, int filePermissionAsOctal) {
        String fileHash;
        try {
            fileHash = FileUtil.getFileHash(path);
        } catch (IOException e) {
            log.error("Failed to compute file hash", e);
            throw new RuntimeException(e);
        }
        String fileName = path.getFileName().toString();
        long fileSize = path.toFile().length();

        String url = targetServerUrl + "/completeUpload?token=" + token;
        CompleteRequest completeRequest = new CompleteRequest(
                uploadId, userName, fileHash, fileSize, fileName, filePermissionAsOctal, targetPath);
        return restTemplate.postForObject(url, completeRequest, ResultBean.class);
    }

    public void sendMsg(String userName, String msg) {
        // The notification logic is not shown in the source.
    }

    /**
     * Opens an upload session on the target server and returns its upload ID.
     */
    private String initUploadSession(String fileName, String token) {
        String url = targetServerUrl + "/initUpload?token=" + token + "&fileName=" + fileName;
        ResultBean resultBean = restTemplate.postForObject(url, null, ResultBean.class);
        // Guard against a null data field as well as a blank one.
        Object data = Objects.requireNonNull(resultBean).getData();
        String uploadId = data == null ? null : data.toString();
        if (StringUtils.isBlank(uploadId)) {
            log.error("File transfer initialization failed | fileName:{}", fileName);
            throw new RuntimeException("File transfer initialization failed | fileName:" + fileName);
        }
        return uploadId;
    }

    /**
     * Downloads each source file from the target server into the target directory,
     * using parallel HTTP Range requests when the server supports them.
     */
    public void sendFileToIDM(FileOperation fileOperation, String token, String loginUser) {
        if (fileOperation == null || fileOperation.getSource() == null || fileOperation.getSource().isEmpty()) {
            log.warn("No files to download");
            return;
        }
        String targetDir = fileOperation.getTarget().getCanonicalPath();
        log.debug("Preparing to download files to directory: {}", targetDir);
        int partSize = chunkSize * 1024 * 1024;
        for (FileInfo fileInfo : fileOperation.getSource()) {
            String filePath = fileInfo.getCanonicalPath();
            String fileName = fileInfo.getName();
            String downloadUrl = targetServerUrl + "/download?token=" + token + "&filePath=" + filePath;
            File targetFile = new File(targetDir, fileName);
            if (targetFile.exists()) {
                // Avoid overwriting: append a timestamp and "_new" before the extension.
                String nowStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
                int lastDotIndex = fileName.lastIndexOf('.');
                if (lastDotIndex != -1) {
                    String baseName = fileName.substring(0, lastDotIndex);
                    String extension = fileName.substring(lastDotIndex);
                    targetFile = Paths.get(targetDir, baseName + "_" + nowStr + "_new" + extension).toFile();
                } else {
                    targetFile = Paths.get(targetDir, fileName + "_" + nowStr + "_new").toFile();
                }
            }
            try {
                String targetFileAbsolutePath = targetFile.getAbsolutePath();
                log.info("Starting file download: source={}, target={}", filePath, targetFileAbsolutePath);

                // Probe with a one-byte Range request to learn the total size from Content-Range.
                HttpHeaders headers = new HttpHeaders();
                headers.set("Range", "bytes=0-0");
                ResponseEntity<byte[]> resp = restTemplate.exchange(
                        downloadUrl, HttpMethod.GET, new HttpEntity<>(headers), byte[].class);
                long fileSize;
                List<String> cr = resp.getHeaders().get("Content-Range");
                if (cr != null && !cr.isEmpty()) {
                    // Header format: "bytes 0-0/<total>".
                    String[] arr = cr.get(0).split("/");
                    fileSize = Long.parseLong(arr[1]);
                    log.info("File size detected: {} bytes", fileSize);
                } else {
                    log.warn("Server does not support Range requests, falling back to single-threaded download: {}", fileName);
                    if (resp.getBody() != null) {
                        Files.write(targetFile.toPath(), resp.getBody());
                        log.info("Single-threaded download completed: {}", targetFileAbsolutePath);
                    }
                    continue;
                }

                long totalParts = (fileSize + partSize - 1) / partSize;
                log.info("Part info - size: {} bytes, part size: {} bytes, total parts: {}", fileSize, partSize, totalParts);
                try (RandomAccessFile raf = new RandomAccessFile(targetFile, "rw")) {
                    raf.setLength(fileSize);
                    List<CompletableFuture<Void>> futures = new ArrayList<>();
                    for (int i = 0; i < totalParts; i++) {
                        final int partIdx = i;
                        futures.add(CompletableFuture.runAsync(() -> {
                            // Widen to long before multiplying; int arithmetic overflows past 2 GiB.
                            long start = (long) partIdx * partSize;
                            long end = Math.min(fileSize - 1, start + partSize - 1);
                            HttpHeaders partHeaders = new HttpHeaders();
                            partHeaders.set("Range", "bytes=" + start + "-" + end);
                            for (int retry = 0; retry < 3; retry++) {
                                try {
                                    ResponseEntity<byte[]> partResp = restTemplate.exchange(
                                            downloadUrl, HttpMethod.GET, new HttpEntity<>(partHeaders), byte[].class);
                                    byte[] data = partResp.getBody();
                                    if (data == null) {
                                        // Treat an empty body as a failure so the part is retried, not silently skipped.
                                        throw new IllegalStateException("Empty response body");
                                    }
                                    synchronized (raf) {
                                        raf.seek(start);
                                        raf.write(data);
                                    }
                                    log.debug("Part downloaded: part {} ({} - {})", partIdx, start, end);
                                    break;
                                } catch (Exception ex) {
                                    if (retry == 2) {
                                        log.error("Part download failed: part {} ({} - {}), error: {}",
                                                partIdx, start, end, ex.getMessage());
                                        throw new RuntimeException("Part download failed", ex);
                                    }
                                    try {
                                        Thread.sleep(500L * (retry + 1));
                                    } catch (InterruptedException ie) {
                                        Thread.currentThread().interrupt();
                                        log.error("Thread interrupted: {}", ie.getMessage());
                                        // Stop retrying once interrupted.
                                        throw new RuntimeException("Part download interrupted", ie);
                                    }
                                }
                            }
                        }, executorTool));
                    }
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
                }
                updatePermission(loginUser, filePermissionAsOctal, targetFile.toPath());
                log.info("Multi-threaded chunked download succeeded: {}", targetFileAbsolutePath);
                // elfinderConfiguration is not declared in this class; presumably it is inherited from BaseService.
                Node node = elfinderConfiguration.getVolumes().get(0);
                String targetPath = targetFile.getAbsolutePath()
                        .replace(node.getPath() + "/", "")
                        .replace(loginUser, node.getAlias());
                sendMsg(loginUser, "File: " + fileName + " has been synced to the file cabinet at " + targetPath
                        + ". Please open the file cabinet to view it.");
            } catch (Exception e) {
                log.error("Chunked file download failed: {}", fileName, e);
                sendMsg(loginUser, "File: " + fileName + " download failed. Please try again later!");
            }
        }
    }
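
    /**
     * Applies the configured permission digits to the downloaded file on behalf of its owner.
     */
    private void updatePermission(String fileOwner, int filePermissionAsOctal, Path outputPath) {
        String permissionStr = String.valueOf(filePermissionAsOctal);
        PermissionBean permissionBean = new PermissionBean();
        permissionBean.setUser(fileOwner);
        permissionBean.setChmod(permissionStr);
        // changePermission is not declared in this class; presumably it is inherited from BaseService.
        changePermission(permissionBean, outputPath.toString(), fileOwner);
    }
}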
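The chunk arithmetic that both the upload and download paths rely on (ceiling division for the chunk count, then an inclusive byte range per chunk) can be sketched in isolation. This is a minimal illustration, not part of `FileAreaService`: the `ChunkPlanner` class, its `Range` type, and the `chunkCount` and `plan` methods are hypothetical names introduced here. It keeps every offset in `long`, since multiplying an `int` index by an `int` chunk size would overflow for files larger than 2 GiB.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class ChunkPlanner {

    /** Inclusive byte range [start, end] covered by one chunk. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Formats the range as an HTTP Range header value. */
        String toHeader() {
            return "bytes=" + start + "-" + end;
        }
    }

    /** Ceiling division: number of chunks needed to cover fileSize bytes. */
    static long chunkCount(long fileSize, long chunkSize) {
        if (fileSize < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and chunkSize > 0");
        }
        return (fileSize + chunkSize - 1) / chunkSize;
    }

    /** Splits [0, fileSize) into consecutive ranges; the last one may be shorter. */
    static List<Range> plan(long fileSize, long chunkSize) {
        long total = chunkCount(fileSize, chunkSize);
        List<Range> ranges = new ArrayList<>();
        for (long i = 0; i < total; i++) {
            long start = i * chunkSize; // long arithmetic, no int overflow
            long end = Math.min(fileSize - 1, start + chunkSize - 1);
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 12-byte file in 5-byte chunks yields three ranges.
        for (Range r : plan(12, 5)) {
            System.out.println(r.toHeader());
        }
    }
}
```

An empty file produces zero ranges, which matches the zero-chunk branch in the upload path, where only the session init and the completion call are made.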