7

CompletableFuture之批量上传 - 按时睡好觉

 1 year ago
source link: https://www.cnblogs.com/-tang/p/17517164.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

最近接到一个需求,批量上传图片到服务器及实时更新上传进度。当处理大量文件上传任务时,效率是一个关键因素。传统的串行方式会导致任务耗时较长,而使用并发处理可以极大地提高上传效率。想到很久之前用CompletableFuture优化过一些多统计的业务场景,效果都还不错,因此在这里也使用它来优化一下上传的效率。

CompletableFuture简介

CompletableFuture类是Java 8引入的,它实现了FutureCompletionStage接口,提供了更强大和灵活的异步编程功能。CompletableFuture除了具有Future的特性外,还提供了更多的操作和组合方式来处理异步任务。它可以更方便地处理异步任务,实现并发编程,并提供更好的异常处理和结果转换机制。在进行异步编程时,CompletableFuture是一个更为强大和推荐的选择。

主要特点:

  1. 异步执行:允许将任务提交给后台线程,在任务执行期间不会阻塞主线程。这样可以提高应用程序的响应性能,特别是在处理I/O密集型操作时,如网络请求或数据库查询。

  2. 链式调用和组合操作:支持链式调用,可以将多个异步任务按照顺序连接起来形成一个任务流水线。每个任务的执行依赖于前一个任务的结果,这种串行的处理方式可以简化异步任务的编写和管理。

  3. 异常处理:提供了异常处理的机制,可以通过异常回调方法来捕获和处理任务执行过程中的异常情况。这样可以更好地控制和处理任务执行过程中的异常,提供更健壮的代码。

  4. 转换和合并结果:提供了一系列的转换和合并操作,可以对任务的结果进行映射、转换和合并。这样可以方便地对任务的结果进行处理和转换,得到最终期望的结果。

  5. 多任务并行执行:支持等待多个任务并行执行,并等待它们全部完成或任意一个完成。这种能力使得在处理并发任务时可以更好地利用系统资源,提高任务执行的效率。

串行和并行的效率对比

测试批量上传了1000张图片,每张图片在579KB,一共564MB。使用串行方式上传,总时长为501秒,使用并行方式上传,总时长是108秒,通过对比优化前后的代码,可以明显看出使用CompletableFuture并发处理方式的效率更高。由于任务是并行执行的,多核处理器的能力得到了充分的利用,从而大大提高了批量上传的速度。

串行处理方式

/**
* describe: 批量上传图片
*
* @param files 图片文件集合
* @param fileId 文件夹id
* @param scheduleKey 上传进度key
* @date 2023年06月28日 11:42:03
* @author Tang
*/
@Override
public BatchUploadVO batchUpload2(MultipartFile[] files, Long fileId, String scheduleKey) {
//取上传配置
String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));
List<String> errorNames = Lists.newCopyOnWriteArrayList();
String userName = SecurityAuthorHolder.getSecurityUser().getUsername();
for(MultipartFile file : files){
try {
RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
String suffix = Objects.requireNonNull(file.getOriginalFilename()).substring(file.getOriginalFilename().lastIndexOf(".") + 1);
ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
ServerException.Assert(file.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
//上传
ImgResourceEntity saveData = upload(file, config);
saveData.setFileId(fileId);
saveData.setCreator(userName);
baseMapper.insert(saveData);
//缓存自增  供轮询查询实时进度
RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
} catch (Exception e) {
errorNames.add(file.getOriginalFilename());
RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
}
}
BatchUploadVO vo = schedule(scheduleKey);
vo.setErrFileNames(errorNames);
return vo;
}

串行处理调用时间

2067462-20230630161642501-627890934.png

并行处理方式

/**
* describe: 批量上传图片
*
* @param files 图片文件集合
* @param fileId 文件夹id
* @param scheduleKey 上传进度key
* @date 2023年06月28日 11:42:03
* @author Tang
*/
@Override
public BatchUploadVO batchUpload(MultipartFile[] files, Long fileId, String scheduleKey) {
ExecutorService executor = Executors.newFixedThreadPool(10);
//取上传配置
String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));
List<String> errorNames = Lists.newCopyOnWriteArrayList();
String userName = SecurityAuthorHolder.getSecurityUser().getUsername();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
Arrays.stream(files).map(v ->
CompletableFuture.runAsync(
() -> {
try {
RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
String suffix = Objects.requireNonNull(v.getOriginalFilename()).substring(v.getOriginalFilename().lastIndexOf(".") + 1);
ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
ServerException.Assert(v.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
//上传
ImgResourceEntity saveData = upload(v, config);
saveData.setFileId(fileId);
saveData.setCreator(userName);
baseMapper.insert(saveData);
//缓存自增  供轮询查询实时进度
RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
} catch (Exception e) {
errorNames.add(v.getOriginalFilename());
RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
}
}, executor)
).toArray(CompletableFuture[]::new)
);
// 等待所有 CompletableFuture 完成
allFutures.join();
// 关闭线程池
executor.shutdown();
BatchUploadVO vo = schedule(scheduleKey);
vo.setErrImgFileNames(errorNames);
return vo;
}

 并行调用处理时间

2067462-20230630161904044-264043790.png

实现过程中的注意事项

  1. 线程池的使用:为了实现并发处理,可以使用线程池来管理并执行异步任务。通过合理设置线程池的大小和参数,可以控制并发线程的数量和资源的利用率。

  2. 异常处理:在并发处理中,每个任务都是独立执行的,因此需要适当处理任务中可能出现的异常情况,避免异常的影响扩散。

  3. 进度更新:为了实时更新上传进度,可以将每个任务的进度信息保存到Redis中,并在前端通过轮询查询的方式获取最新的进度信息。

  4. 线程安全:确保上传逻辑的线程安全性,避免多线程环境下的竞态条件和数据一致性问题。

使用CompletableFuture来优化批量上传任务是一种高效且灵活的方式。通过并发处理,我们可以充分利用多核处理器的能力,提高任务的执行效率。同时,通过实时更新上传进度并返回总体的上传结果,可以给用户更好的体验。
在实现过程中,我们需要合理使用线程池、处理异常、保证数据同步和线程安全,以确保上传任务的稳定性和性能。同时,我们还可以利用CompletableFuture提供的方法来处理任务的结果、异常和其他相关操作,以满足具体的业务需求。
通过使用CompletableFuture进行批量上传任务的优化,可以显著提高系统的性能和用户体验,适用于需要处理大量并发任务的场景。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK