多线程分片上传与下载文件功能说明

本文档详细说明了项目中多线程分片上传与下载文件的整体流程、关键实现、配置要点及注意事项。


一、功能概述

  • 分片上传:将大文件切分为多个分片,采用多线程并发上传,提升传输效率与稳定性。
  • 分片下载:支持多线程并发下载文件分片,边下载边写入本地指定位置,提升下载速度。
  • 断点续传:通过分片索引和唯一上传ID实现断点续传与失败重试。
  • 文件校验:上传合并后进行MD5校验,确保文件完整性。

二、分片上传流程

1. 客户端上传流程

  1. 初始化上传会话
    • 调用 /initUpload 接口,传入文件名和认证信息,服务端返回唯一 uploadId
  2. 分片并发上传
    • 客户端将文件按设定分片大小(如5MB)切分。
    • 多线程并发调用 /uploadChunk,每次上传一个分片,参数包含 uploadIdchunkIndex、分片内容等。
    • 支持失败重试机制。
  3. 合并分片
    • 所有分片上传完成后,调用 /completeUpload,服务端按顺序合并分片,校验MD5和文件大小,合并成功后返回目标路径。

2. 服务端关键接口说明

  • POST /initUpload
    • 生成唯一 uploadId,初始化分片缓存。
  • POST /uploadChunk
    • 接收单个分片,保存到临时目录 /tmp/chunks/{uploadId}/{chunkIndex}
  • POST /completeUpload
    • 按分片索引顺序合并所有分片,校验完整性,设置文件权限,清理临时分片。

3. 主要代码流程(参考 FileAreaService.javaFileAreaController.java

1
2
3
4
5
6
7
8
9
10
11
// 1. 初始化上传
String uploadId = initUploadSession(fileName, authInfo);

// 2. 多线程分片上传
CompletableFuture.allOf(LongStream.range(0, totalChunks)
.parallel()
.mapToObj(chunkIndex -> CompletableFuture.runAsync(() ->
sendOptimizedChunk(...), executorTool)).toArray(CompletableFuture[]::new)).join();

// 3. 合并分片
ResultBean resultBean = completeUpload(uploadId, userId, authInfo, path, targetPath, filePermissionAsOctal);

三、分片下载流程

1. 客户端下载流程

  1. 获取文件大小
    • 发送带 Range: bytes=0-0 的请求,获取 Content-Range 响应头,解析文件总大小。
  2. 多线程分片下载
    • 按分片大小计算总分片数。
    • 多线程并发请求带 Range: bytes=start-end 的接口,下载各自分片。
    • 每个线程写入本地文件指定位置(RandomAccessFile)。
    • 支持失败重试。
  3. 权限设置与通知
    • 下载完成后设置文件权限,发送通知消息。

2. 服务端接口说明

  • GET /download
    • 支持 Range 请求,返回指定字节区间的数据流,支持断点续传。

3. 主要代码流程(参考 FileAreaService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 获取文件大小
HttpHeaders headers = new HttpHeaders();
headers.set("Range", "bytes=0-0");
ResponseEntity<byte[]> resp = restTemplate.exchange(downloadUrl, HttpMethod.GET, new HttpEntity<>(headers), byte[].class);
long fileSize = ...;

// 2. 多线程分片下载
for (int i = 0; i < totalParts; i++) {
CompletableFuture.runAsync(() -> {
// 请求分片并写入文件
}, executorTool);
}
CompletableFuture.allOf(...).join();

四、线程池配置

  • 发送和工具线程池分别配置,详见 FileSendThreadPoolConfig.java
  • 可根据CPU核心数和业务量调整线程池参数。

五、关键参数说明

  • chunkSize:分片大小(单位MB),默认5MB,可通过配置调整。
  • uploadId:每次上传的唯一标识,防止分片混淆。
  • filePermissionAsOctal:文件权限,上传/下载后设置。

六、异常处理与重试

  • 上传/下载分片均内置重试机制,支持指数退避。
  • 合并失败、校验失败等会自动清理临时分片并通知用户。

七、注意事项

  • /tmp/chunks 目录需有足够空间和权限,定期清理过期分片。
  • 文件名冲突时自动重命名,避免覆盖。
  • 上传/下载接口需支持大文件和高并发。

八、参考类与方法

  • FileAreaService.sendFileToOtherAreassendFileToRemote
  • FileAreaController.initUploaduploadChunkcompleteUploadfileDownload
  • FileSendThreadPoolConfig 线程池配置

九、常见问题

  • Q: 分片上传/下载失败如何排查?
    • 检查网络、分片大小、线程池配置、磁盘空间、服务端日志。
  • Q: 如何调整并发数?
    • 修改 fileSendToolThreadPool 相关配置。
  • Q: 如何支持断点续传?
    • 依赖 uploadId 和分片索引,客户端可记录已完成分片,失败后重试未完成部分。

十、核心代码与类说明

1. FileAreaController 主要分片上传/下载接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// FileAreaController.java 关键方法片段

@RestController()
@RequestMapping("/v1/area")
@Slf4j
@RequiredArgsConstructor
public class FileAreaController extends BaseController {

private final FileAreaService fileAreaService;

private final ElfinderController elfinderController;

private final Map<String, ChunkStorage> uploadSessions = new ConcurrentHashMap<>();


@PostMapping("/sendFile")
public ResultBean sendFile(@RequestBody FileOperation fileOperation) {
ResultBean response = new ResultBean();
try {
String authInfo = (String) request.getSession().getAttribute("auth_token");
String userId = getResfulLoginUser(authInfo);
String secureToken = SecureUtil.getEncryptStr(System.currentTimeMillis() + "&" + userId + "&" + request.getRemoteAddr());
List<FileInfo> source = fileOperation.getSource();
String type = fileOperation.getType();

if (StringUtils.equals(type, "remote")) {
fileAreaService.sendFileToRemote(fileOperation, secureToken, userId);
} else {
String targetPath = fileOperation.getTarget().getCanonicalPath();
for (FileInfo fileInfo : source) {
fileAreaService.sendFileToOtherAreas(userId, fileInfo.getCanonicalPath(), secureToken, targetPath);
}
}
response.setMessage("操作成功,后台正在发送中,发送成功后系统会通过消息提醒");
} catch (Exception e) {
response.setCode(500);
response.setMessage("服务器内部错误: " + e.getMessage());
}
return response;
}


@PostMapping("/initUpload")
public ResultBean initUpload(@RequestParam String fileName, @RequestParam String authInfo) {
String uploadId = UUID.randomUUID().toString();
uploadSessions.put(uploadId, new ChunkStorage(uploadId, fileName));
log.info("文件传输初始化 | uploadId:{} | user:{} | path:{}", uploadId, authInfo, fileName);
return ResultBean.ok(uploadId);
}

// 定时清理uploadSessions中过期的缓存
@Scheduled(cron = "0 0 0 * * ?")
public void clearExpiredUploadSessions() {
LocalDateTime now = LocalDateTime.now();
log.warn("清理过期的分片缓存begin | 当前时间:{} | 清理前:{}", now, uploadSessions.size());
Set<String> expiredKeys = uploadSessions.entrySet()
.stream()
.filter(entry -> now.isAfter(entry.getValue().getCreateTime().plusWeeks(1)))
.map(entry -> {
ChunkStorage storage = entry.getValue();
deleteChunkDirectory(storage.getUploadId());
return entry.getKey();
})
.collect(Collectors.toSet());
expiredKeys.forEach(uploadSessions::remove);
log.warn("清理过期的分片缓存end | 当前时间:{} | 清理后:{}", now, uploadSessions.size());
}

public void deleteChunkDirectory(String uploadId) {
Path chunkDir = Paths.get("/tmp/chunks").resolve(uploadId); // 安全构造路径
if (!Files.exists(chunkDir)) return;
try (Stream<Path> pathStream = Files.walk(chunkDir)) {
pathStream.sorted(Comparator.reverseOrder()) // 更清晰的逆序写法
.forEach(path -> {
try {
Files.deleteIfExists(path);
log.debug("已删除分片文件: {}", path);
} catch (IOException e) {
log.error("分片清理失败: {}", path, e);
}
});
} catch (IOException e) { // 限定捕获IO异常
log.error("分片目录遍历失败: {}", chunkDir, e);
}
}

@PostMapping(value = "/uploadChunk", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public ResultBean uploadChunk(
@RequestPart("chunk") MultipartFile chunk,
@RequestParam String uploadId,
@RequestParam long chunkIndex,
@RequestParam String authInfo) {
try {
ChunkStorage storage = uploadSessions.get(uploadId);
if (storage == null) {
return ResultBean.fail("无效的uploadId");
}
// 直接按规则生成分片路径,无需存储到 Map
String chunkDir = "/tmp/chunks/" + uploadId;
Files.createDirectories(Paths.get(chunkDir)); // 确保目录存在
String chunkPath = chunkDir + "/" + chunkIndex;
chunk.transferTo(Paths.get(chunkPath));
log.info("文件上传分片 | uploadId:{} | user:{} | path:{} | chunkIndex:{}", uploadId,
getResfulLoginUser(authInfo), chunkPath, chunkIndex);
return ResultBean.ok();
} catch (IOException e) {
return ResultBean.fail("分片保存失败");
}
}

@PostMapping("/completeUpload")
public ResultBean completeUpload(@RequestBody CompleteRequest completeRequest, @RequestParam String authInfo) {
try {
ChunkStorage storage = uploadSessions.get(completeRequest.getUploadId());
// 按顺序合并分片
String finalPath = mergeChunks(storage, completeRequest);
log.info("文件合并完成 | uploadId:{} | user:{} | path:{}", completeRequest.getUploadId(), completeRequest.getUserName(), finalPath);
String homeBase = elfinderConfiguration.getVolumes().get(0).getPath();
return ResultBean.ok(finalPath.replace(homeBase + "/", ""));
} catch (Exception e) {
log.error("文件合并失败", e);
return ResultBean.fail("文件合并失败");
} finally {
Path chunkDir = Paths.get("/tmp/chunks/" + completeRequest.getUploadId());
try (Stream<Path> pathStream = Files.walk(chunkDir)) {
pathStream.sorted((a, b) -> b.compareTo(a)) // 先删文件再删目录
.forEach(path -> {
try {
log.debug("已删除分片文件: {}", path);
Files.deleteIfExists(path);
} catch (IOException e) {
log.error("分片清理失败: {}", path, e);
}
});
} catch (IOException e) {
log.error("分片清理失败", e);
}
uploadSessions.remove(completeRequest.getUploadId());
}
}

private String mergeChunks(ChunkStorage storage, CompleteRequest completeRequest) throws Exception {
String targetPath = completeRequest.getTargetPath();
// 如果是以/结尾,则去掉最后的/
if (targetPath.endsWith("/")) {
targetPath = targetPath.substring(0, targetPath.length() - 1);
}
Path outputPath = Paths.get(targetPath + File.separator + storage.getFileName());
File file = outputPath.toFile();
if (file.exists()) {
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
String nowStr = now.format(formatter);
// 提取文件名和后缀
String fileName = storage.getFileName();
int lastDotIndex = fileName.indexOf('.');
String replaced = file.getParentFile().getAbsolutePath();
if (lastDotIndex != -1) {
String baseName = fileName.substring(0, lastDotIndex);
String extension = fileName.substring(lastDotIndex);
outputPath = Paths.get(replaced + File.separator + baseName + "_" + nowStr + "_new" + extension);
} else {
outputPath = Paths.get(replaced + File.separator + fileName + "_" + nowStr + "_new");
}
} else {
Files.createFile(outputPath);
}

// 若果文件大小为0,则直接创建一个文件
Long totalSize = completeRequest.getTotalSize();

if (totalSize == 0) {
log.info("文件大小为0,创建一个空文件");
if (!Files.exists(outputPath)) {
Files.createFile(outputPath);
}
} else {
mergeFileFromTmp(completeRequest, outputPath);
// 校验文件
String md5Checksum = completeRequest.getMd5Checksum();
long length = file.length();
String fileHash = FileUtil.getFileHash(outputPath);
if (totalSize != length) {
log.error("文件大小不匹配,实际大小:{},预期大小:{}", length, totalSize);
// 删除文件
file.deleteOnExit();
throw new IllegalArgumentException("文件大小不匹配");
}
if (!md5Checksum.equals(fileHash)) {
log.error("文件校验失败,实际哈希值:{},预期哈希值:{}", fileHash, md5Checksum);
file.deleteOnExit();
throw new IllegalArgumentException("文件哈希值校验失败");
}
}

updatePermission(completeRequest.getUserName(), completeRequest.getFilePermissionAsOctal(), outputPath);
return outputPath.toString();
}

private void updatePermission(String fileOwner, int filePermissionAsOctal, Path outputPath) {
// 设置文件权限
String permissionStr = String.valueOf(filePermissionAsOctal);
PermissionBean permissionBean = new PermissionBean();
permissionBean.setUser(fileOwner);
permissionBean.setChmod(permissionStr);
changePermission(permissionBean, outputPath.toString(), fileOwner);
}

private static void mergeFileFromTmp(CompleteRequest completeRequest, Path outputPath) throws IOException {
// 获取 uploadId 对应的分片目录
Path chunkDir = Paths.get("/tmp/chunks/" + completeRequest.getUploadId());

try (FileChannel outChannel = FileChannel.open(outputPath,
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
// 修改后的mergeChunks方法片段
try (Stream<Path> pathStream = Files.list(chunkDir)) {
pathStream.sorted((p1, p2) -> {
int idx1 = Integer.parseInt(p1.getFileName().toString());
int idx2 = Integer.parseInt(p2.getFileName().toString());
return Integer.compare(idx1, idx2);
})
.forEach(chunkPath -> {
try (FileChannel inChannel = FileChannel.open(chunkPath)) {
inChannel.transferTo(0, inChannel.size(), outChannel);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try {
Files.deleteIfExists(chunkPath);
log.debug("已删除分片文件: {}", chunkPath);
} catch (IOException e) {
log.error("分片文件删除失败: {}", chunkPath, e);
}
}
});
}
}
}


@RequestMapping(value = {"/download"}, method = {RequestMethod.GET})
@ResponseBody
public void fileDownload(@RequestParam("token") String token,
@RequestParam("filePath") String filePath,
HttpServletRequest request,
HttpServletResponse response) throws IOException {
Path path = Paths.get(filePath);
if (!Files.exists(path)) {
response.setContentType("application/json; charset=UTF-8");
response.getWriter().write("{\"message\":\"文件未找到\",\"code\":1001}");
return;
}
String fileName = path.getFileName().toString();
long fileLength = Files.size(path);
response.setCharacterEncoding("utf-8");
response.setHeader("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
response.setHeader("Accept-Ranges", "bytes");

String range = request.getHeader("Range");
long start = 0, end = fileLength - 1;
if (range != null && range.startsWith("bytes=")) {
String[] parts = range.replace("bytes=", "").split("-");
start = Long.parseLong(parts[0]);
if (parts.length > 1 && !parts[1].isEmpty()) {
end = Long.parseLong(parts[1]);
}
response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
}
long contentLength = end - start + 1;
response.setContentLengthLong(contentLength);
response.setHeader("Content-Range", "bytes " + start + "-" + end + "/" + fileLength);

try (InputStream is = Files.newInputStream(path)) {
is.skip(start);
byte[] buffer = new byte[64 * 1024]; // 64KB缓冲区
long toRead = contentLength;
OutputStream out = response.getOutputStream();
while (toRead > 0) {
int len = is.read(buffer, 0, (int) Math.min(buffer.length, toRead));
if (len == -1) break;
out.write(buffer, 0, len);
toRead -= len;
}
out.flush();
}
}
}

2. FileAreaService 分片上传/下载核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// FileAreaService.java 关键方法片段

@Service
@Slf4j
public class FileAreaService extends BaseService {

@Resource(name = "fileSendThreadPool")
private ThreadPoolExecutor executor;

@Resource(name = "fileSendToolThreadPool")
private ThreadPoolExecutor executorTool;

@Resource(name = "restTemplateIgnoreSSL")
RestTemplate restTemplate;

@Value("${file.accept.addr}")
private String targetServerUrl;

@Value("${file.send.chunkSize:5}")
private int chunkSize;

@Value("${file.send.filePermission:750}")
private int filePermissionAsOctal;

public void sendFileToOtherAreas(String userName, String filePath, String token, String targetPath) {
log.info("文件传输任务开始 | user:{} | path:{} | targetPath:{}", userName, filePath, targetPath);
// 判断文件是否存在,以及是否是文件夹,不支持文件夹
File file = new File(filePath);
if (!file.exists() || file.isDirectory()) {
log.error("文件不存在或为文件夹 | user:{} | path:{}", userName, filePath);
throw new IllegalArgumentException("文件不存在或为文件夹");
}
executor.execute(() -> {
Path path = file.toPath();
String uploadId = initUploadSession(path.getFileName().toString(), token);
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
int size = chunkSize * 1024 * 1024;
long totalChunks = (file.length() + size - 1) / size;

// 并行分片上传(使用工作窃取线程池)
if (totalChunks == 0) {
log.warn("文件大小为0,不支持分片上传,自动创一个空文件 | user:{} | path:{} | targetPath:{}", userName, filePath, targetPath);
}
CompletableFuture.allOf(LongStream.range(0, totalChunks)
.parallel()
.mapToObj(chunkIndex -> CompletableFuture.runAsync(() ->
sendOptimizedChunk(userName, filePath, uploadId, channel, chunkIndex, size, token),
executorTool)).toArray(CompletableFuture[]::new)).join();
ResultBean resultBean = completeUpload(uploadId, userName, token, path, targetPath, filePermissionAsOctal);
String msg;
if (Objects.requireNonNull(resultBean).getCode() == 0) {
msg = "文件:" + path.getFileName() + "已同步至文件柜" + resultBean.getData() + ",请您打开文件柜查看。";
log.info("文件传输完成 | uploadId:{} | user:{} | path:{} | targetPath:{}", uploadId, userName, filePath, targetPath);
sendMsg(userName, msg);
} else {
msg = "文件:" + file.getName() + "传输合并失败,请稍后重试!";
log.info("文件传输失败 | uploadId:{} | user:{} | path:{} | targetPath:{}", uploadId, userName, filePath, targetPath);
sendMsg(userName, msg);
}
} catch (Exception e) {
log.error("文件传输任务执行失败", e);
sendMsg(userName, "文件:" + file.getName() + "传输失败,请稍后重试!");
}
});
}

private void sendOptimizedChunk(String userName, String filePath, String uploadId,
FileChannel channel, long chunkIndex, int chunkSize, String token) {
MappedByteBuffer buffer = null;
byte[] bufferArray = null;
try {
long position = chunkIndex * chunkSize;
long size = Math.min(chunkSize, channel.size() - position);

// 内存映射优化(只读模式)
buffer = channel.map(
FileChannel.MapMode.READ_ONLY,
position,
size
);
bufferArray = new byte[buffer.remaining()];
buffer.get(bufferArray); // 将缓冲区内容复制到字节数组

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
headers.add("X-Chunk-Offset", String.valueOf(position));
headers.setContentLength(buffer.limit()); // 显式设置内容长度
headers.add("X-Upload-Id", uploadId); // 可选:增加上传ID头

MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("chunk", new ByteArrayResource(bufferArray) {
@Override
public String getFilename() {
return "chunk_" + chunkIndex + ".dat";
}
});
body.add("uploadId", uploadId);
body.add("chunkIndex", chunkIndex);
executeWithRetry(() -> {
ResponseEntity<ResultBean> responseEntity = restTemplate.exchange(
targetServerUrl + "/uploadChunk?token=" + token,
HttpMethod.POST,
new HttpEntity<>(body, headers),
ResultBean.class
);
if (responseEntity.getStatusCode() != HttpStatus.OK) {
log.error("文件传输失败 | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
throw new RuntimeException("文件传输失败");
}
ResultBean resultBean = responseEntity.getBody();
if (resultBean != null && resultBean.getCode() != 0) {
log.error("文件传输失败 | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
throw new RuntimeException("文件传输失败");
} else {
log.info("文件传输成功 | uploadId:{} | user:{} | path:{}", uploadId, userName, filePath);
}
return null; // 明确返回null以匹配Callable<Void>
}, 3, 1000L);
} catch (Exception e) {
throw new RuntimeException("内存映射失败", e);
} finally {
if (buffer != null) {
((DirectBuffer) buffer).cleaner().clean();
}
}
}

// 带指数退避的重试方法
private void executeWithRetry(Callable<Void> action, int maxRetries, long initialDelay) throws Exception {
int retryCount = 0;
while (true) {
try {
action.call();
return;
} catch (Exception e) {
log.warn("操作失败,重试次数: {}", retryCount, e);
if (retryCount++ >= maxRetries) {
log.error("操作失败,达到最大重试次数: {}", maxRetries, e);
throw e;
}
try {
Thread.sleep(initialDelay * (long) Math.pow(2, retryCount));
} catch (InterruptedException ie) {
log.error("线程中断,终止重试", ie);
Thread.currentThread().interrupt();
throw new RuntimeException("传输中断", ie);
}
}
}
}


private ResultBean completeUpload(String uploadId, String userName, String token, Path path, String targetPath, int filePermissionAsOctal) {
String fileHash;
try {
fileHash = FileUtil.getFileHash(path);
} catch (IOException e) {
log.error("获取文件哈希失败", e);
throw new RuntimeException(e);
}
String fileName = path.getFileName().toString();
long fileSize = path.toFile().length();

String url = targetServerUrl + "/completeUpload?token=" + token;
CompleteRequest completeRequest = new CompleteRequest(uploadId, userName, fileHash, fileSize, fileName, filePermissionAsOctal, targetPath);
return restTemplate.postForObject(url, completeRequest, ResultBean.class);
}

public void sendMsg(String userName, String mgs) {

}

private String initUploadSession(String fileName, String token) {
String url = targetServerUrl + "/initUpload?token=" + token + "&fileName=" + fileName;
ResultBean resultBean = restTemplate.postForObject(url, null, ResultBean.class);
String uploadIds = Objects.requireNonNull(resultBean).getData().toString();
if (StringUtils.isBlank(uploadIds)) {
log.error("文件传输初始化失败 | fileName:{}", fileName);
throw new RuntimeException("文件传输初始化失败 | fileName:" + fileName);
}
return uploadIds;
}


// 需要从微系统文件柜去下载指定的文件到本机的目标位置
public void sendFileToIDM(FileOperation fileOperation, String token, String loginUser) {
if (fileOperation == null || fileOperation.getSource() == null || fileOperation.getSource().isEmpty()) {
log.warn("没有需要下载的文件");
return;
}
String targetDir = fileOperation.getTarget().getCanonicalPath();
log.debug("准备下载文件到目录: {}", targetDir);
int partSize = chunkSize * 1024 * 1024; // 5MB分片
for (FileInfo fileInfo : fileOperation.getSource()) {
String filePath = fileInfo.getCanonicalPath();
String fileName = fileInfo.getName();
String downloadUrl = targetServerUrl + "/download?token=" + token + "&filePath=" + filePath;
File targetFile = new File(targetDir, fileName);
// 文件存在则重命名
if (targetFile.exists()) {
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
String nowStr = now.format(formatter);
int lastDotIndex = fileName.lastIndexOf('.');
String baseName, extension;
if (lastDotIndex != -1) {
baseName = fileName.substring(0, lastDotIndex);
extension = fileName.substring(lastDotIndex);
targetFile = Paths.get(targetDir, baseName + "_" + nowStr + "_new" + extension).toFile();
} else {
targetFile = Paths.get(targetDir, fileName + "_" + nowStr + "_new").toFile();
}
}
try {
String targetFileAbsolutePath = targetFile.getAbsolutePath();
log.info("开始处理文件下载: 源文件={}, 目标文件={}", filePath, targetFileAbsolutePath);

// 1. 获取文件总大小
HttpHeaders headers = new HttpHeaders();
headers.set("Range", "bytes=0-0");
ResponseEntity<byte[]> resp = restTemplate.exchange(downloadUrl, HttpMethod.GET, new HttpEntity<>(headers), byte[].class);
long fileSize;
List<String> cr = resp.getHeaders().get("Content-Range");
if (cr != null && !cr.isEmpty()) {
String[] arr = cr.get(0).split("/");
fileSize = Long.parseLong(arr[1]);
log.info("文件大小检测成功: {} bytes", fileSize);
} else {
log.warn("服务器不支持Range请求,将使用单线程下载: {}", fileName);
// 不支持Range,直接单线程下载
if (resp.getBody() != null) {
Files.write(targetFile.toPath(), resp.getBody());
log.info("文件单线程下载完成: {}", targetFileAbsolutePath);
}
continue;
}

long totalParts = (fileSize + partSize - 1) / partSize;
log.info("文件分片信息 - 大小: {} bytes, 分片大小: {} bytes, 总分片数: {}", fileSize, partSize, totalParts);
// 2. 多线程分片下载并写入文件
try (RandomAccessFile raf = new RandomAccessFile(targetFile, "rw")) {
raf.setLength(fileSize);
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < totalParts; i++) {
final int partIdx = i;
futures.add(CompletableFuture.runAsync(() -> {
long start = partIdx * partSize;
long end = Math.min(fileSize - 1, (partIdx + 1) * partSize - 1);
HttpHeaders partHeaders = new HttpHeaders();
partHeaders.set("Range", "bytes=" + start + "-" + end);
for (int retry = 0; retry < 3; retry++) {
try {
ResponseEntity<byte[]> partResp = restTemplate.exchange(downloadUrl, HttpMethod.GET, new HttpEntity<>(partHeaders), byte[].class);
byte[] data = partResp.getBody();
if (data != null) {
synchronized (raf) {
raf.seek(start);
raf.write(data);
}
log.debug("分片下载成功: 分片{} ({} - {})", partIdx, start, end);
break;
}
} catch (Exception ex) {
if (retry == 2) {
log.error("分片下载失败: 分片{} ({} - {}), 错误: {}", partIdx, start, end, ex.getMessage());
throw new RuntimeException("分片下载失败", ex);
}
try {
Thread.sleep(500L * (retry + 1));
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
log.error("线程被中断: {}", ignored.getMessage());
}
}
}
}, executorTool));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
// 3. 设置文件权限
updatePermission(loginUser, filePermissionAsOctal, targetFile.toPath());
log.info("文件多线程分片下载成功: {}", targetFileAbsolutePath);
Node node = elfinderConfiguration.getVolumes().get(0);
String targetPath = targetFile.getAbsolutePath().replace(node.getPath() + "/", "").replace(loginUser, node.getAlias());
sendMsg(loginUser, "文件:" + fileName + "已同步至文件柜" + targetPath + ",请您打开文件柜查看。");
} catch (Exception e) {
log.error("文件分片下载异常: {}", fileName, e);
sendMsg(loginUser, "文件:" + fileName + "下载失败,请稍后重试!");
}
}
}

private void updatePermission(String fileOwner, int filePermissionAsOctal, Path outputPath) {
// 设置文件权限
String permissionStr = String.valueOf(filePermissionAsOctal);
PermissionBean permissionBean = new PermissionBean();
permissionBean.setUser(fileOwner);
permissionBean.setChmod(permissionStr);
changePermission(permissionBean, outputPath.toString(), fileOwner);
}
}

3. FileSendThreadPoolConfig 线程池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// FileSendThreadPoolConfig.java 关键配置片段

@Bean(name = "fileSendThreadPool", destroyMethod = "shutdown")
public ThreadPoolExecutor threadPoolExecutor() {
return new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new CustomThreadFactory("file-send-pool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

@Bean(name = "fileSendToolThreadPool", destroyMethod = "shutdown")
public ThreadPoolExecutor fileSendToolThreadPool() {
int availableProcessors = 4;
int corePool = availableProcessors;
int maxPool = availableProcessors * 4;
int queueCapacity = availableProcessors * 10;
return new ThreadPoolExecutor(
corePool,
maxPool,
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new CustomThreadFactory("file-send-pool-tool"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}

4. 相关Bean说明

  • CompleteRequest:分片合并请求体,包含uploadId、userName、md5Checksum、totalSize、fileName、filePermissionAsOctal、targetPath等字段。
  • ResultBean:统一返回结果对象,包含code、message、data等。
  • ChunkStorage:服务端用于缓存分片上传会话信息的对象。