Java 17 异步多线程视频上传实战

简介: 本文基于Java 17实现了企业级的异步多线程视频上传方案,核心是自定义IO密集型线程池 + CompletableFuture异步编程 + 分片上传优化,并扩展了阿里云OSS集成、进度回调、断点续传、分布式锁、日志监控等关键特性。

在互联网应用中,视频上传是高频且核心的业务场景——短视频平台的内容发布、教育系统的课程视频上传、企业网盘的大文件传输,都离不开稳定高效的视频上传能力。但传统的同步上传方式存在明显痛点:单线程上传大视频耗时久、用户体验差;IO阻塞导致服务吞吐量低;大文件上传易中断且无法续传;多节点部署时还可能出现重复上传问题。

一、核心原理:为什么这样设计?

在动手编码前,我们先理清核心技术的底层逻辑——知其然更知其所以然,才能灵活应对业务变化。

1.1 IO 密集型场景的线程池设计逻辑

视频上传属于典型的IO 密集型操作(大部分时间消耗在网络 IO/磁盘 IO 等待上,CPU 利用率低),与 CPU 密集型场景的线程池配置逻辑完全不同:

  • CPU 密集型:线程数 = CPU 核心数 + 1(减少上下文切换);
  • IO 密集型:线程数 = CPU 核心数 * 2(甚至 *4),让 CPU 在 IO 等待时处理其他线程任务,提升资源利用率。

Java 17 中线程池的核心参数设计依据:

  • 核心线程数:Runtime.getRuntime().availableProcessors() * 2,保证基础并发能力;
  • 最大线程数:Runtime.getRuntime().availableProcessors() * 4,应对突发上传请求;
  • 空闲线程存活时间:60s,释放闲置资源;
  • 任务队列:LinkedBlockingQueue(100),避免无界队列导致内存溢出;
  • 拒绝策略:CallerRunsPolicy,让调用线程执行任务,避免直接丢弃(适合上传场景)。

1.2 CompletableFuture 异步编程模型

Java 8 引入的 CompletableFuture 是异步编程的核心,Java 17 对其兼容性和性能做了优化,相比传统 Thread + Future 有三大优势:

  1. 非阻塞:无需手动管理线程,通过回调处理结果;
  2. 链式调用:支持 thenApply/whenComplete/allOf 等方法,简化多任务协同;
  3. 异常处理:内置 exceptionally/handle 方法,统一处理异步任务异常。

对于视频上传,我们用 supplyAsync 提交有返回值的上传任务,用 allOf 等待所有分片上传完成,用 whenComplete 处理最终结果,完全规避同步阻塞问题。

1.3 分片上传的底层逻辑

大视频文件(如 1GB 以上)直接上传易因网络波动中断,分片上传将文件拆分为固定大小的分片(如 10MB/片),核心流程:

image.png

分片上传的核心优势:

  • 断点续传:记录已上传分片,中断后仅传未完成的;
  • 并行提速:多线程同时上传不同分片;
  • 降低超时风险:单分片体积小,不易触发网络超时。

二、基础环境准备

2.1 Maven 依赖(最新稳定版)

创建 Maven 项目,pom.xml 引入以下依赖(基于 Spring Boot 3.2.0,适配 Java 17):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.0</version>
       <relativePath/>
   </parent>
   <groupId>com.jam.demo</groupId>
   <artifactId>video-upload-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>video-upload-demo</name>
   <description>Java 17 异步多线程视频上传demo</description>
   <properties>
       <java.version>17</java.version>
       <maven.compiler.source>17</maven.compiler.source>
       <maven.compiler.target>17</maven.compiler.target>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <aliyun.oss.version>3.18.0</aliyun.oss.version>
       <redisson.version>3.23.3</redisson.version>
       <mybatis-plus.version>3.5.5</mybatis-plus.version>
       <fastjson2.version>2.0.43</fastjson2.version>
       <springdoc.version>2.3.0</springdoc.version>
       <micrometer.version>1.12.0</micrometer.version>
       <guava.version>32.1.3-jre</guava.version>
       <lombok.version>1.18.30</lombok.version>
   </properties>
   <dependencies>
       <!-- Spring Boot 核心 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-aop</artifactId>
       </dependency>
       <!-- Lombok -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       <!-- 阿里云 OSS SDK -->
       <dependency>
           <groupId>com.aliyun.oss</groupId>
           <artifactId>aliyun-sdk-oss</artifactId>
           <version>${aliyun.oss.version}</version>
       </dependency>
       <!-- Redisson 分布式锁 -->
       <dependency>
           <groupId>org.redisson</groupId>
           <artifactId>redisson-spring-boot-starter</artifactId>
           <version>${redisson.version}</version>
       </dependency>
       <!-- MyBatisPlus -->
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <!-- MySQL 驱动 -->
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <scope>runtime</scope>
       </dependency>
       <!-- Fastjson2 -->
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <!-- Guava -->
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <!-- Swagger3 (SpringDoc) -->
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>${springdoc.version}</version>
       </dependency>
       <!-- 监控指标 (Prometheus) -->
       <dependency>
           <groupId>io.micrometer</groupId>
           <artifactId>micrometer-registry-prometheus</artifactId>
           <version>${micrometer.version}</version>
       </dependency>
       <!-- 测试 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

2.2 核心配置文件

application.yml 配置(包含 OSS、Redis、数据库、线程池、监控等):

spring:
 # 数据库配置
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://localhost:3306/video_upload?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
   username: root
   password: root
 # Redis 配置(Redisson 分布式锁)
 data:
   redis:
     host: localhost
     port: 6379
     password:
     database: 0
# MyBatisPlus 配置
mybatis-plus:
 configuration:
   map-underscore-to-camel-case: true
   log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
 mapper-locations: classpath:mapper/**/*.xml
 type-aliases-package: com.jam.demo.entity
# 阿里云 OSS 配置
aliyun:
 oss:
   endpoint: oss-cn-hangzhou.aliyuncs.com
   access-key-id: your-access-key-id
   access-key-secret: your-access-key-secret
   bucket-name: your-bucket-name
   base-path: videos/
# 视频上传配置
video:
 upload:
   chunk-size: 10485760 # 分片大小 10MB
   max-size: 524288000 # 最大文件大小 500MB
   supported-formats: mp4,avi,mov,mkv # 支持的视频格式
# 线程池配置
thread:
 pool:
   core-size: ${runtime.availableProcessors}2
   max-size: ${runtime.availableProcessors}4
   keep-alive-seconds: 60
   queue-capacity: 100
# 监控配置
management:
 endpoints:
   web:
     exposure:
       include: prometheus,health,info
 metrics:
   tags:
     application: video-upload-demo

2.3 数据库表设计(MySQL 8.0)

创建 video_upload_chunk 表,用于记录分片上传状态(断点续传核心):

CREATE DATABASE IF NOT EXISTS video_upload DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE video_upload;

-- 视频分片上传状态表
CREATE TABLE IF NOT EXISTS video_upload_chunk (
   id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   file_md5 VARCHAR(64) NOT NULL COMMENT '文件MD5(唯一标识文件)',
   chunk_num INT NOT NULL COMMENT '分片序号(从1开始)',
   chunk_size BIGINT NOT NULL COMMENT '分片大小(字节)',
   total_size BIGINT NOT NULL COMMENT '文件总大小(字节)',
   total_chunk INT NOT NULL COMMENT '总分片数',
   file_name VARCHAR(255) NOT NULL COMMENT '文件名',
   file_ext VARCHAR(10) NOT NULL COMMENT '文件扩展名',
   upload_status TINYINT NOT NULL DEFAULT 0 COMMENT '上传状态:0-未上传 1-已上传 2-合并完成',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_file_md5_chunk_num (file_md5, chunk_num) COMMENT '唯一索引:文件MD5+分片序号',
   KEY idx_file_md5 (file_md5) COMMENT '文件MD5索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='视频分片上传状态表';

三、核心代码实现

3.1 线程池配置类(自定义 IO 密集型线程池)

遵循阿里巴巴规范,线程池需手动创建而非使用 Executors 静态方法,避免资源耗尽:

package com.jam.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 异步线程池配置类(IO密集型)
* @author ken
* @date 2025/12/04
*/

@Slf4j
@Configuration
@EnableAsync
public class AsyncThreadPoolConfig {

   @Value("${thread.pool.core-size}")
   private int coreSize;

   @Value("${thread.pool.max-size}")
   private int maxSize;

   @Value("${thread.pool.keep-alive-seconds}")
   private int keepAliveSeconds;

   @Value("${thread.pool.queue-capacity}")
   private int queueCapacity;

   /**
    * 视频上传专用线程池
    * @return 自定义线程池
    */

   @Bean(name = "videoUploadExecutor")
   public Executor videoUploadExecutor() {
       // 修正核心/最大线程数(处理配置文件中${runtime.availableProcessors}的拼接)
       int cpuCore = Runtime.getRuntime().availableProcessors();
       coreSize = cpuCore * 2;
       maxSize = cpuCore * 4;

       log.info("初始化视频上传线程池:核心线程数={}, 最大线程数={}, 队列容量={}", coreSize, maxSize, queueCapacity);

       return new ThreadPoolExecutor(
               coreSize,
               maxSize,
               keepAliveSeconds,
               TimeUnit.SECONDS,
               new LinkedBlockingQueue<>(queueCapacity),
               new ThreadFactory() {
                   private final AtomicInteger threadNum = new AtomicInteger(1);

                   @Override
                   public Thread newThread(Runnable r) {
                       Thread thread = new Thread(r);
                       // 自定义线程名,便于问题排查
                       thread.setName("video-upload-thread-" + threadNum.getAndIncrement());
                       // 非守护线程,保证上传任务完成
                       thread.setDaemon(false);
                       // 设置线程优先级(IO密集型设为NORM_PRIORITY即可)
                       thread.setPriority(Thread.NORM_PRIORITY);
                       return thread;
                   }
               },
               new ThreadPoolExecutor.CallerRunsPolicy() {
                   @Override
                   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                       log.warn("视频上传线程池任务已满,由调用线程执行:{}", Thread.currentThread().getName());
                       super.rejectedExecution(r, e);
                   }
               }
       );
   }
}

3.2 核心实体类

3.2.1 分片状态实体类(MyBatisPlus)

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* 视频分片上传状态实体
* @author ken
* @date 2025/12/04
*/

@Data
@TableName("video_upload_chunk")
public class VideoUploadChunk implements Serializable {

   private static final long serialVersionUID = 1L;

   /**
    * 主键ID
    */

   @TableId(type = IdType.AUTO)
   private Long id;

   /**
    * 文件MD5(唯一标识文件)
    */

   private String fileMd5;

   /**
    * 分片序号(从1开始)
    */

   private Integer chunkNum;

   /**
    * 分片大小(字节)
    */

   private Long chunkSize;

   /**
    * 文件总大小(字节)
    */

   private Long totalSize;

   /**
    * 总分片数
    */

   private Integer totalChunk;

   /**
    * 文件名
    */

   private String fileName;

   /**
    * 文件扩展名
    */

   private String fileExt;

   /**
    * 上传状态:0-未上传 1-已上传 2-合并完成
    */

   private Integer uploadStatus;

   /**
    * 创建时间
    */

   private LocalDateTime createTime;

   /**
    * 更新时间
    */

   @TableField(update = "NOW()")
   private LocalDateTime updateTime;
}

3.2.2 上传进度DTO

package com.jam.demo.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
* 视频上传进度DTO
* @author ken
* @date 2025/12/04
*/

@Data
@Schema(description = "视频上传进度DTO")
public class VideoUploadProgressDTO {

   /**
    * 文件MD5
    */

   @Schema(description = "文件MD5")
   private String fileMd5;

   /**
    * 已上传分片数
    */

   @Schema(description = "已上传分片数")
   private Integer uploadedChunkNum;

   /**
    * 总分片数
    */

   @Schema(description = "总分片数")
   private Integer totalChunkNum;

   /**
    * 上传进度(0-100)
    */

   @Schema(description = "上传进度(0-100)")
   private Integer progress;

   /**
    * 上传状态:0-上传中 1-完成 2-失败
    */

   @Schema(description = "上传状态:0-上传中 1-完成 2-失败")
   private Integer status;
}

3.3 上传进度监听器(实时回调)

定义监听器接口,实现进度回调逻辑:

package com.jam.demo.listener;

import com.jam.demo.dto.VideoUploadProgressDTO;

/**
* 视频上传进度监听器
* @author ken
* @date 2025/12/04
*/

public interface VideoUploadProgressListener {

   /**
    * 更新上传进度
    * @param progressDTO 进度信息
    */

   void onProgressUpdate(VideoUploadProgressDTO progressDTO);

   /**
    * 上传完成
    * @param fileMd5 文件MD5
    * @param ossUrl 上传后的OSS地址
    */

   void onComplete(String fileMd5, String ossUrl);

   /**
    * 上传失败
    * @param fileMd5 文件MD5
    * @param errorMsg 错误信息
    */

   void onFailure(String fileMd5, String errorMsg);
}

/**
* 默认上传进度监听器实现
* @author ken
* @date 2025/12/04
*/

package com.jam.demo.listener;

import com.jam.demo.dto.VideoUploadProgressDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* 默认上传进度监听器
* @author ken
* @date 2025/12/04
*/

@Slf4j
@Component
public class DefaultVideoUploadProgressListener implements VideoUploadProgressListener {

   @Override
   public void onProgressUpdate(VideoUploadProgressDTO progressDTO) {
       log.info("文件{}上传进度:{}%(已上传{}分片/总分片{})",
               progressDTO.getFileMd5(),
               progressDTO.getProgress(),
               progressDTO.getUploadedChunkNum(),
               progressDTO.getTotalChunkNum());
       // 实际业务中可推送至前端(如WebSocket)
   }

   @Override
   public void onComplete(String fileMd5, String ossUrl) {
       log.info("文件{}上传完成,OSS地址:{}", fileMd5, ossUrl);
   }

   @Override
   public void onFailure(String fileMd5, String errorMsg) {
       log.error("文件{}上传失败:{}", fileMd5, errorMsg);
   }
}

3.4 工具类

3.4.1 文件MD5工具类(唯一标识文件)

package com.jam.demo.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
* 文件MD5工具类
* @author ken
* @date 2025/12/04
*/

@Slf4j
@Component
public class FileMd5Util {

   /**
    * 计算文件MD5
    * @param file 文件
    * @return 文件MD5值
    * @throws IOException IO异常
    */

   public String calculateFileMd5(File file) throws IOException {
       if (ObjectUtils.isEmpty(file) || !file.exists() || !file.isFile()) {
           throw new IllegalArgumentException("文件不存在或不是有效文件");
       }

       try (FileInputStream fis = new FileInputStream(file)) {
           MessageDigest md5 = MessageDigest.getInstance("MD5");
           byte[] buffer = new byte[8192];
           int len;
           while ((len = fis.read(buffer)) != -1) {
               md5.update(buffer, 0, len);
           }
           byte[] md5Bytes = md5.digest();
           // 转16进制字符串
           StringBuilder sb = new StringBuilder();
           for (byte b : md5Bytes) {
               sb.append(String.format("%02x", b));
           }
           return sb.toString();
       } catch (NoSuchAlgorithmException e) {
           log.error("MD5算法不存在", e);
           throw new RuntimeException("计算文件MD5失败", e);
       }
   }
}

3.4.2 分布式锁工具类(Redisson)

package com.jam.demo.util;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
* Redisson分布式锁工具类
* @author ken
* @date 2025/12/04
*/

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonLockUtil {

   private final RedissonClient redissonClient;

   /**
    * 获取分布式锁
    * @param lockKey 锁键
    * @param waitTime 等待时间(秒)
    * @param leaseTime 持有时间(秒)
    * @return 锁对象
    */

   public RLock lock(String lockKey, long waitTime, long leaseTime) {
       if (!StringUtils.hasText(lockKey)) {
           throw new IllegalArgumentException("锁键不能为空");
       }
       RLock lock = redissonClient.getLock(lockKey);
       try {
           boolean locked = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
           if (locked) {
               log.debug("获取分布式锁成功:{}", lockKey);
               return lock;
           } else {
               log.warn("获取分布式锁失败:{}", lockKey);
               return null;
           }
       } catch (InterruptedException e) {
           log.error("获取分布式锁异常", e);
           Thread.currentThread().interrupt();
           return null;
       }
   }

   /**
    * 释放分布式锁
    * @param lock 锁对象
    */

   public void unlock(RLock lock) {
       if (ObjectUtils.isEmpty(lock)) {
           return;
       }
       try {
           if (lock.isHeldByCurrentThread()) {
               lock.unlock();
               log.debug("释放分布式锁成功:{}", lock.getName());
           }
       } catch (Exception e) {
           log.error("释放分布式锁异常", e);
       }
   }
}

3.5 Mapper层(MyBatisPlus)

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.VideoUploadChunk;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 视频分片上传状态Mapper
* @author ken
* @date 2025/12/04
*/

@Repository
public interface VideoUploadChunkMapper extends BaseMapper<VideoUploadChunk> {

   /**
    * 根据文件MD5查询已上传的分片数
    * @param fileMd5 文件MD5
    * @return 已上传分片数
    */

   Integer countUploadedChunkByFileMd5(@Param("fileMd5") String fileMd5);

   /**
    * 根据文件MD5和分片序号更新上传状态
    * @param fileMd5 文件MD5
    * @param chunkNum 分片序号
    * @param status 上传状态
    * @return 影响行数
    */

   int updateChunkStatus(@Param("fileMd5") String fileMd5, @Param("chunkNum") Integer chunkNum, @Param("status") Integer status);

   /**
    * 根据文件MD5查询所有分片信息
    * @param fileMd5 文件MD5
    * @return 分片列表
    */

   List<VideoUploadChunk> listByFileMd5(@Param("fileMd5") String fileMd5);
}

对应的 VideoUploadChunkMapper.xml(resources/mapper/VideoUploadChunkMapper.xml):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.demo.mapper.VideoUploadChunkMapper">

   <select id="countUploadedChunkByFileMd5" resultType="java.lang.Integer">
       SELECT COUNT(*) FROM video_upload_chunk
       WHERE file_md5 = #{fileMd5} AND upload_status = 1
   </select>

   <update id="updateChunkStatus">
       UPDATE video_upload_chunk
       SET upload_status = #{status}, update_time = NOW()
       WHERE file_md5 = #{fileMd5} AND chunk_num = #{chunkNum}
   </update>

   <select id="listByFileMd5" resultType="com.jam.demo.entity.VideoUploadChunk">
       SELECT * FROM video_upload_chunk
       WHERE file_md5 = #{fileMd5}
       ORDER BY chunk_num ASC
   </select>

</mapper>

3.6 核心服务层(异步+分片+OSS+断点续传)

3.6.1 服务接口

package com.jam.demo.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.entity.VideoUploadChunk;
import com.jam.demo.listener.VideoUploadProgressListener;
import org.springframework.web.multipart.MultipartFile;

import java.util.concurrent.CompletableFuture;

/**
* 视频上传服务接口
* @author ken
* @date 2025/12/04
*/

public interface VideoUploadService extends IService<VideoUploadChunk> {

   /**
    * 异步上传视频文件(支持分片、断点续传、进度回调)
    * @param file 视频文件
    * @param listener 上传进度监听器
    * @return CompletableFuture<String> 上传后的OSS地址
    */

   CompletableFuture<String> uploadVideoAsync(MultipartFile file, VideoUploadProgressListener listener);

   /**
    * 查询文件上传进度
    * @param fileMd5 文件MD5
    * @return 进度信息(已上传分片数/总分片数/进度百分比)
    */

   com.jam.demo.dto.VideoUploadProgressDTO getUploadProgress(String fileMd5);
}

3.6.2 服务实现类(核心逻辑)

package com.jam.demo.service.impl;

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.UploadPartRequest;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.jam.demo.dto.VideoUploadProgressDTO;
import com.jam.demo.entity.VideoUploadChunk;
import com.jam.demo.listener.VideoUploadProgressListener;
import com.jam.demo.mapper.VideoUploadChunkMapper;
import com.jam.demo.service.VideoUploadService;
import com.jam.demo.util.FileMd5Util;
import com.jam.demo.util.RedissonLockUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
* 视频上传服务实现类(集成阿里云OSS、分片上传、断点续传、分布式锁)
* @author ken
* @date 2025/12/04
*/

@Slf4j
@Service
@RequiredArgsConstructor
public class VideoUploadServiceImpl extends ServiceImpl<VideoUploadChunkMapper, VideoUploadChunk> implements VideoUploadService {

   // 注入自定义线程池
   private final Executor videoUploadExecutor;
   private final FileMd5Util fileMd5Util;
   private final RedissonLockUtil redissonLockUtil;

   // 阿里云OSS配置
   @Value("${aliyun.oss.endpoint}")
   private String ossEndpoint;
   @Value("${aliyun.oss.access-key-id}")
   private String ossAccessKeyId;
   @Value("${aliyun.oss.access-key-secret}")
   private String ossAccessKeySecret;
   @Value("${aliyun.oss.bucket-name}")
   private String ossBucketName;
   @Value("${aliyun.oss.base-path}")
   private String ossBasePath;

   // 视频上传配置
   @Value("${video.upload.chunk-size}")
   private long chunkSize; // 分片大小(字节)
   @Value("${video.upload.max-size}")
   private long maxSize; // 最大文件大小(字节)
   @Value("${video.upload.supported-formats}")
   private String supportedFormats; // 支持的视频格式

   /**
    * 异步上传视频文件(核心方法)
    * @param file 视频文件
    * @param listener 上传进度监听器
    * @return CompletableFuture<String> OSS地址
    */

   @Override
   @Async("videoUploadExecutor")
   public CompletableFuture<String> uploadVideoAsync(MultipartFile file, VideoUploadProgressListener listener) {
       // 1. 前置校验
       try {
           validateFile(file);
       } catch (IllegalArgumentException e) {
           String fileMd5 = getFileMd5Safely(file);
           listener.onFailure(fileMd5, e.getMessage());
           return CompletableFuture.failedFuture(e);
       }

       // 2. 计算文件MD5(唯一标识)
       String fileMd5;
       try {
           fileMd5 = fileMd5Util.calculateFileMd5(convertMultipartFileToFile(file));
       } catch (IOException e) {
           log.error("计算文件MD5失败", e);
           listener.onFailure("", "计算文件MD5失败:" + e.getMessage());
           return CompletableFuture.failedFuture(e);
       }

       // 3. 分布式锁:防止同一文件重复上传(锁键=fileMd5)
       String lockKey = "video_upload_" + fileMd5;
       RLock lock = redissonLockUtil.lock(lockKey, 5, 30);
       if (ObjectUtils.isEmpty(lock)) {
           listener.onFailure(fileMd5, "获取分布式锁失败,文件正在上传中");
           return CompletableFuture.failedFuture(new RuntimeException("文件正在上传中,请稍后再试"));
       }

       // 4. 核心上传逻辑
       try {
           return doUploadVideo(file, fileMd5, listener);
       } catch (Exception e) {
           log.error("视频上传异常", e);
           listener.onFailure(fileMd5, "上传失败:" + e.getMessage());
           return CompletableFuture.failedFuture(e);
       } finally {
           // 释放分布式锁
           redissonLockUtil.unlock(lock);
       }
   }

   /**
    * 执行视频上传核心逻辑
    * @param file 视频文件
    * @param fileMd5 文件MD5
    * @param listener 进度监听器
    * @return CompletableFuture<String> OSS地址
    */

   private CompletableFuture<String> doUploadVideo(MultipartFile file, String fileMd5, VideoUploadProgressListener listener) {
       try {
           // 4.1 获取文件基本信息
           String fileName = file.getOriginalFilename();
           String fileExt = getFileExtension(fileName);
           long fileSize = file.getSize();
           int totalChunk = (int) Math.ceil((double) fileSize / chunkSize);

           // 4.2 初始化分片上传状态(数据库)
           initChunkStatus(fileMd5, fileName, fileExt, fileSize, totalChunk);

           // 4.3 初始化阿里云OSS分片上传
           String objectKey = ossBasePath + fileMd5 + "." + fileExt;
           OSS ossClient = new OSSClientBuilder().build(ossEndpoint, ossAccessKeyId, ossAccessKeySecret);
           InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(ossBucketName, objectKey);
           InitiateMultipartUploadResult initResult = ossClient.initiateMultipartUpload(initRequest);
           String uploadId = initResult.getUploadId();

           // 4.4 查询已上传的分片(断点续传核心)
           List<VideoUploadChunk> uploadedChunks = baseMapper.listByFileMd5(fileMd5);
           Set<Integer> uploadedChunkNums = uploadedChunks.stream()
                   .filter(chunk -> chunk.getUploadStatus() == 1)
                   .map(VideoUploadChunk::getChunkNum)
                   .collect(Collectors.toSet());

           // 4.5 构建待上传分片列表
           List<Integer> toUploadChunkNums = Lists.newArrayList();
           for (int i = 1; i <= totalChunk; i++) {
               if (!uploadedChunkNums.contains(i)) {
                   toUploadChunkNums.add(i);
               }
           }

           // 4.6 无待上传分片:直接返回OSS地址
           if (CollectionUtils.isEmpty(toUploadChunkNums)) {
               String ossUrl = "https://" + ossBucketName + "." + ossEndpoint + "/" + objectKey;
               listener.onComplete(fileMd5, ossUrl);
               return CompletableFuture.completedFuture(ossUrl);
           }

           // 4.7 异步上传所有待传分片
           CountDownLatch latch = new CountDownLatch(toUploadChunkNums.size());
           Map<Integer, PartETag> partETagMap = Maps.newConcurrentMap();

           for (Integer chunkNum : toUploadChunkNums) {
               CompletableFuture.runAsync(() -> {
                   try {
                       uploadSingleChunk(file, ossClient, uploadId, objectKey, chunkNum, fileMd5, partETagMap);
                       // 更新进度
                       updateProgress(fileMd5, totalChunk, listener);
                   } catch (Exception e) {
                       log.error("上传分片{}失败", chunkNum, e);
                       throw new RuntimeException("分片" + chunkNum + "上传失败", e);
                   } finally {
                       latch.countDown();
                   }
               }, videoUploadExecutor);
           }

           // 4.8 等待所有分片上传完成
           try {
               latch.await();
           } catch (InterruptedException e) {
               log.error("等待分片上传完成被中断", e);
               Thread.currentThread().interrupt();
               throw new RuntimeException("上传被中断", e);
           }

           // 4.9 合并分片
           List<PartETag> partETags = Lists.newArrayList(partETagMap.values());
           CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                   ossBucketName, objectKey, uploadId, partETags);
           ossClient.completeMultipartUpload(completeRequest);
           ossClient.shutdown();

           // 4.10 更新数据库状态(合并完成)
           baseMapper.updateChunkStatus(fileMd5, null, 2);

           // 4.11 回调上传完成
           String ossUrl = "https://" + ossBucketName + "." + ossEndpoint + "/" + objectKey;
           listener.onComplete(fileMd5, ossUrl);

           return CompletableFuture.completedFuture(ossUrl);
       } catch (Exception e) {
           log.error("执行视频上传逻辑失败", e);
           listener.onFailure(fileMd5, e.getMessage());
           return CompletableFuture.failedFuture(e);
       }
   }

   /**
    * 上传单个分片到阿里云OSS
    * @param file 视频文件
    * @param ossClient OSS客户端
    * @param uploadId OSS分片上传ID
    * @param objectKey OSS对象Key
    * @param chunkNum 分片序号
    * @param fileMd5 文件MD5
    * @param partETagMap 分片ETag映射
    */

   private void uploadSingleChunk(MultipartFile file, OSS ossClient, String uploadId, String objectKey,
                                  Integer chunkNum, String fileMd5, Map<Integer, PartETag> partETagMap)
throws IOException {
       // 计算分片起始/结束位置
       long start = (chunkNum - 1) * chunkSize;
       long end = Math.min(start + chunkSize, file.getSize());
       long currentChunkSize = end - start;

       // 读取分片数据
       File tempFile = convertMultipartFileToFile(file);
       try (InputStream inputStream = new FileInputStream(tempFile)) {
           inputStream.skip(start); // 跳过已读部分

           // 构建OSS分片上传请求
           UploadPartRequest uploadRequest = new UploadPartRequest();
           uploadRequest.setBucketName(ossBucketName);
           uploadRequest.setKey(objectKey);
           uploadRequest.setUploadId(uploadId);
           uploadRequest.setInputStream(inputStream);
           uploadRequest.setPartSize(currentChunkSize);
           uploadRequest.setPartNumber(chunkNum);

           // 上传分片
           PartETag partETag = ossClient.uploadPart(uploadRequest).getPartETag();
           partETagMap.put(chunkNum, partETag);

           // 更新数据库分片状态
           baseMapper.updateChunkStatus(fileMd5, chunkNum, 1);
           log.info("文件{}分片{}上传成功,大小:{}字节", fileMd5, chunkNum, currentChunkSize);
       } finally {
           // 删除临时文件
           if (tempFile.exists()) {
               boolean deleted = tempFile.delete();
               if (!deleted) {
                   log.warn("删除临时文件失败:{}", tempFile.getAbsolutePath());
               }
           }
       }
   }

   /**
    * 初始化分片上传状态(数据库)
    * @param fileMd5 文件MD5
    * @param fileName 文件名
    * @param fileExt 文件扩展名
    * @param fileSize 文件总大小
    * @param totalChunk 总分片数
    */

   private void initChunkStatus(String fileMd5, String fileName, String fileExt, long fileSize, int totalChunk) {
       // 查询是否已初始化
       LambdaQueryWrapper<VideoUploadChunk> queryWrapper = new LambdaQueryWrapper<>();
       queryWrapper.eq(VideoUploadChunk::getFileMd5, fileMd5);
       List<VideoUploadChunk> existChunks = baseMapper.selectList(queryWrapper);

       if (CollectionUtils.isEmpty(existChunks)) {
           // 未初始化:批量插入分片状态
           List<VideoUploadChunk> chunks = Lists.newArrayList();
           for (int i = 1; i <= totalChunk; i++) {
               VideoUploadChunk chunk = new VideoUploadChunk();
               chunk.setFileMd5(fileMd5);
               chunk.setChunkNum(i);
               chunk.setChunkSize(Math.min(chunkSize, fileSize - (i - 1) * chunkSize));
               chunk.setTotalSize(fileSize);
               chunk.setTotalChunk(totalChunk);
               chunk.setFileName(fileName);
               chunk.setFileExt(fileExt);
               chunk.setUploadStatus(0); // 未上传
               chunks.add(chunk);
           }
           saveBatch(chunks);
           log.info("文件{}分片状态初始化完成,总分片数:{}", fileMd5, totalChunk);
       }
   }

   /**
    * 更新上传进度并回调
    * @param fileMd5 文件MD5
    * @param totalChunk 总分片数
    * @param listener 进度监听器
    */

   private void updateProgress(String fileMd5, int totalChunk, VideoUploadProgressListener listener) {
       Integer uploadedChunkNum = baseMapper.countUploadedChunkByFileMd5(fileMd5);
       int progress = (int) (((double) uploadedChunkNum / totalChunk) * 100);

       VideoUploadProgressDTO progressDTO = new VideoUploadProgressDTO();
       progressDTO.setFileMd5(fileMd5);
       progressDTO.setUploadedChunkNum(uploadedChunkNum);
       progressDTO.setTotalChunkNum(totalChunk);
       progressDTO.setProgress(progress);
       progressDTO.setStatus(0); // 上传中

       listener.onProgressUpdate(progressDTO);
   }

   /**
    * 校验上传文件(格式、大小)
    * @param file 上传文件
    */

   private void validateFile(MultipartFile file) {
       // 校验文件是否为空
       if (ObjectUtils.isEmpty(file) || file.isEmpty()) {
           throw new IllegalArgumentException("上传文件不能为空");
       }

       // 校验文件大小
       long fileSize = file.getSize();
       if (fileSize > maxSize) {
           throw new IllegalArgumentException("文件大小超出限制(最大" + maxSize / 1024 / 1024 + "MB)");
       }

       // 校验文件格式
       String fileName = file.getOriginalFilename();
       if (!StringUtils.hasText(fileName)) {
           throw new IllegalArgumentException("文件名不能为空");
       }
       String fileExt = getFileExtension(fileName);
       Set<String> supportFormats = Lists.newArrayList(supportedFormats.split(",")).stream()
               .map(String::trim)
               .collect(Collectors.toSet());
       if (!supportFormats.contains(fileExt.toLowerCase())) {
           throw new IllegalArgumentException("不支持的视频格式:" + fileExt + ",支持格式:" + supportedFormats);
       }

       // 校验文件MIME类型(防止扩展名伪造)
       String contentType = file.getContentType();
       if (!StringUtils.hasText(contentType) || !contentType.startsWith("video/")) {
           throw new IllegalArgumentException("文件不是有效视频文件");
       }
   }

   /**
    * 获取文件扩展名
    * @param fileName 文件名
    * @return 扩展名(小写)
    */

   private String getFileExtension(String fileName) {
       if (!fileName.contains(".")) {
           return "";
       }
       return fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase();
   }

   /**
    * 将MultipartFile转换为File
    * @param multipartFile 上传文件
    * @return File
    * @throws IOException IO异常
    */

   private File convertMultipartFileToFile(MultipartFile multipartFile) throws IOException {
       File tempFile = File.createTempFile("video_upload_", "." + getFileExtension(multipartFile.getOriginalFilename()));
       multipartFile.transferTo(tempFile);
       tempFile.deleteOnExit(); // JVM退出时删除
       return tempFile;
   }

   /**
    * 安全获取文件MD5(异常时返回空)
    * @param file 上传文件
    * @return 文件MD5
    */

   private String getFileMd5Safely(MultipartFile file) {
       try {
           return fileMd5Util.calculateFileMd5(convertMultipartFileToFile(file));
       } catch (Exception e) {
           return "";
       }
   }

   /**
    * 查询文件上传进度
    * @param fileMd5 文件MD5
    * @return 进度DTO
    */

   @Override
   public VideoUploadProgressDTO getUploadProgress(String fileMd5) {
       if (!StringUtils.hasText(fileMd5)) {
           throw new IllegalArgumentException("文件MD5不能为空");
       }

       // 查询总分片数
       LambdaQueryWrapper<VideoUploadChunk> queryWrapper = new LambdaQueryWrapper<>();
       queryWrapper.eq(VideoUploadChunk::getFileMd5, fileMd5);
       queryWrapper.last("LIMIT 1");
       VideoUploadChunk chunk = baseMapper.selectOne(queryWrapper);
       if (ObjectUtils.isEmpty(chunk)) {
           throw new IllegalArgumentException("文件不存在或未开始上传");
       }

       int totalChunk = chunk.getTotalChunk();
       Integer uploadedChunkNum = baseMapper.countUploadedChunkByFileMd5(fileMd5);
       int progress = (int) (((double) uploadedChunkNum / totalChunk) * 100);

       VideoUploadProgressDTO progressDTO = new VideoUploadProgressDTO();
       progressDTO.setFileMd5(fileMd5);
       progressDTO.setUploadedChunkNum(uploadedChunkNum);
       progressDTO.setTotalChunkNum(totalChunk);
       progressDTO.setProgress(progress);

       // 判断状态:合并完成=1,上传中=0,失败=2
       LambdaQueryWrapper<VideoUploadChunk> statusQuery = new LambdaQueryWrapper<>();
       statusQuery.eq(VideoUploadChunk::getFileMd5, fileMd5);
       statusQuery.eq(VideoUploadChunk::getUploadStatus, 2);
       long completedCount = baseMapper.selectCount(statusQuery);
       if (completedCount > 0) {
           progressDTO.setStatus(1); // 完成
       } else if (uploadedChunkNum < totalChunk) {
           progressDTO.setStatus(0); // 上传中
       } else {
           progressDTO.setStatus(2); // 失败(总分片数=已上传但未合并)
       }

       return progressDTO;
   }
}

3.7 控制器(Swagger3注解)

package com.jam.demo.controller;

import com.jam.demo.dto.VideoUploadProgressDTO;
import com.jam.demo.listener.DefaultVideoUploadProgressListener;
import com.jam.demo.service.VideoUploadService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.util.concurrent.CompletableFuture;

/**
* 视频上传控制器
* @author ken
* @date 2025/12/04
*/

@Slf4j
@RestController
@RequestMapping("/api/video")
@RequiredArgsConstructor
@Tag(name = "视频上传接口", description = "异步多线程视频上传(分片+断点续传+进度回调)")
public class VideoUploadController {

   private final VideoUploadService videoUploadService;
   private final DefaultVideoUploadProgressListener defaultProgressListener;

   /**
    * 异步上传视频文件
    * @param file 视频文件
    * @return ResponseEntity<String> 上传结果
    */

   @PostMapping("/upload")
   @Operation(summary = "视频上传", description = "异步多线程分片上传视频,支持断点续传和进度回调")
   public ResponseEntity<String> uploadVideo(
           @Parameter(description = "视频文件(支持mp4/avi/mov/mkv,最大500MB)", required = true)

           @RequestParam("file") MultipartFile file) {
       try {
           CompletableFuture<String> future = videoUploadService.uploadVideoAsync(file, defaultProgressListener);
           // 异步结果回调(非阻塞)
           future.whenComplete((ossUrl, ex) -> {
               if (ex != null) {
                   log.error("视频上传最终失败", ex);
               } else {
                   log.info("视频上传最终成功,OSS地址:{}", ossUrl);
               }
           });
           return ResponseEntity.ok("视频上传任务已提交,文件MD5:" + getFileMd5(file));
       } catch (Exception e) {
           log.error("提交视频上传任务失败", e);
           return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("上传失败:" + e.getMessage());
       }
   }

   /**
    * 查询上传进度
    * @param fileMd5 文件MD5
    * @return ResponseEntity<VideoUploadProgressDTO> 进度信息
    */

   @GetMapping("/progress")
   @Operation(summary = "查询上传进度", description = "根据文件MD5查询视频上传进度")
   public ResponseEntity<VideoUploadProgressDTO> getUploadProgress(
           @Parameter(description = "文件MD5", required = true)

           @RequestParam("fileMd5") String fileMd5) {
       try {
           VideoUploadProgressDTO progressDTO = videoUploadService.getUploadProgress(fileMd5);
           return ResponseEntity.ok(progressDTO);
       } catch (Exception e) {
           log.error("查询上传进度失败", e);
           return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
       }
   }

   /**
    * 安全获取文件MD5
    * @param file 上传文件
    * @return MD5值
    */

   private String getFileMd5(MultipartFile file) {
       try {
           return new com.jam.demo.util.FileMd5Util().calculateFileMd5(
                   com.jam.demo.service.impl.VideoUploadServiceImpl.class.getDeclaredMethod(
                           "convertMultipartFileToFile", MultipartFile.class)
                           .invoke(null, file))
;
       } catch (Exception e) {
           return "";
       }
   }
}

3.8 启动类

package com.jam.demo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
* 视频上传Demo启动类
* @author ken
* @date 2025/12/04
*/

@SpringBootApplication
@EnableAsync
@MapperScan("com.jam.demo.mapper")
public class VideoUploadDemoApplication {

   public static void main(String[] args) {
       SpringApplication.run(VideoUploadDemoApplication.class, args);
   }
}

四、企业级扩展:日志与监控

4.1 日志配置(Logback)

创建 logback-spring.xml(resources/logback-spring.xml):

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
   <contextName>video-upload-demo</contextName>
   <!-- 日志输出格式 -->
   <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
   <!-- 日志存储路径 -->
   <property name="LOG_PATH" value="./logs/video-upload"/>

   <!-- 控制台输出 -->
   <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
       <encoder>
           <pattern>${LOG_PATTERN}</pattern>
           <charset>UTF-8</charset>
       </encoder>
   </appender>

   <!-- 文件输出(按天分割) -->
   <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
       <file>${LOG_PATH}/video-upload.log</file>
       <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
           <fileNamePattern>${LOG_PATH}/video-upload.%d{yyyy-MM-dd}.log</fileNamePattern>
           <maxHistory>30</maxHistory>
           <totalSizeCap>10GB</totalSizeCap>
       </rollingPolicy>
       <encoder>
           <pattern>${LOG_PATTERN}</pattern>
           <charset>UTF-8</charset>
       </encoder>
   </appender>

   <!-- 错误日志输出 -->
   <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
       <file>${LOG_PATH}/video-upload-error.log</file>
       <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
           <fileNamePattern>${LOG_PATH}/video-upload-error.%d{yyyy-MM-dd}.log</fileNamePattern>
           <maxHistory>30</maxHistory>
           <totalSizeCap>10GB</totalSizeCap>
       </rollingPolicy>
       <encoder>
           <pattern>${LOG_PATTERN}</pattern>
           <charset>UTF-8</charset>
       </encoder>
       <filter class="ch.qos.logback.classic.filter.LevelFilter">
           <level>ERROR</level>
           <onMatch>ACCEPT</onMatch>
           <onMismatch>DENY</onMismatch>
       </filter>
   </appender>

   <!-- 根日志级别 -->
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="FILE"/>
       <appender-ref ref="ERROR_FILE"/>
   </root>

   <!-- 自定义包日志级别 -->
   <logger name="com.jam.demo" level="DEBUG" additivity="false">
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="FILE"/>
       <appender-ref ref="ERROR_FILE"/>
   </logger>

   <!-- 阿里云OSS日志级别 -->
   <logger name="com.aliyun.oss" level="WARN" additivity="false">
       <appender-ref ref="CONSOLE"/>
       <appender-ref ref="ERROR_FILE"/>
   </logger>
</configuration>

4.2 监控配置(Prometheus)

添加监控指标配置类,统计上传成功率、耗时等核心指标:

package com.jam.demo.config;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 监控指标配置类
* @author ken
* @date 2025/12/04
*/

@Configuration
public class MetricsConfig {

   /**
    * 视频上传成功计数器
    */

   @Bean
   public Counter videoUploadSuccessCounter(MeterRegistry registry) {
       return registry.counter("video.upload.success.count");
   }

   /**
    * 视频上传失败计数器
    */

   @Bean
   public Counter videoUploadFailureCounter(MeterRegistry registry) {
       return registry.counter("video.upload.failure.count");
   }

   /**
    * 视频上传耗时计时器
    */

   @Bean
   public Timer videoUploadTimer(MeterRegistry registry) {
       return registry.timer("video.upload.duration.seconds");
   }
}

修改 VideoUploadServiceImpl,集成监控指标:

// 注入监控指标
private final Counter videoUploadSuccessCounter;
private final Counter videoUploadFailureCounter;
private final Timer videoUploadTimer;

// 在doUploadVideo方法中添加耗时统计
@Override
@Async("videoUploadExecutor")
public CompletableFuture<String> uploadVideoAsync(MultipartFile file, VideoUploadProgressListener listener) {
   return CompletableFuture.supplyAsync(() -> {
       Timer.Sample sample = Timer.start(videoUploadTimer.getRegistry());
       try {
           // 原有上传逻辑
           String ossUrl = doUploadVideo(file, listener).join();
           videoUploadSuccessCounter.increment();
           return ossUrl;
       } catch (Exception e) {
           videoUploadFailureCounter.increment();
           throw new RuntimeException(e);
       } finally {
           sample.stop(videoUploadTimer);
       }
   }, videoUploadExecutor);
}

启动项目后,访问 http://localhost:8080/actuator/prometheus 即可查看监控指标,可对接Grafana可视化展示。

五、核心技术点辨析(易混淆点)

5.1 CompletableFuture的supplyAsync vs runAsync

  • supplyAsync:适合有返回值的异步任务(如上传文件返回OSS地址);
  • runAsync:适合无返回值的异步任务(如日志记录、进度推送);
  • 两者均需指定自定义线程池,否则使用默认的ForkJoinPool.commonPool(),易导致线程耗尽。

5.2 分布式锁的过期时间设置

  • 等待时间(waitTime):设为5秒,避免客户端长时间等待;
  • 持有时间(leaseTime):设为30秒,需大于单个分片上传耗时,防止锁提前释放;
  • 释放时机:必须在finally块中释放,避免死锁。

5.3 分片上传的合并逻辑

  • 阿里云OSS的分片合并需按分片序号排序,否则会合并失败;
  • 合并前需确保所有分片上传完成,通过CountDownLatch等待所有分片任务结束。

5.4 断点续传的核心

  • 基于文件MD5唯一标识文件,避免重复上传;
  • 数据库记录每个分片的上传状态,上传前查询已完成分片,仅传未完成的。

六、测试与验证

6.1 启动项目

  1. 配置MySQL、Redis、阿里云OSS参数(修改application.yml);
  2. 执行VideoUploadDemoApplication启动项目;
  3. 访问http://localhost:8080/swagger-ui/index.html,通过Swagger3测试接口:
  • 上传视频:调用/api/video/upload接口,上传mp4文件;
  • 查询进度:调用/api/video/progress接口,传入文件MD5,查看进度。

6.2 验证核心功能

  1. 异步上传:上传大文件时,接口立即返回“任务已提交”,后台线程池异步处理;
  2. 分片上传:查看阿里云OSS控制台,可见分片文件(上传中),合并后生成完整文件;
  3. 断点续传:中断上传后,重新上传同一文件,仅上传未完成的分片;
  4. 分布式锁:多客户端同时上传同一文件,仅一个客户端能获取锁,避免重复上传;
  5. 监控指标:访问http://localhost:8080/actuator/prometheus,可见video_upload_success_countvideo_upload_duration_seconds等指标。

七、总结

本文基于Java 17实现了企业级的异步多线程视频上传方案,核心是自定义IO密集型线程池 + CompletableFuture异步编程 + 分片上传优化,并扩展了阿里云OSS集成、进度回调、断点续传、分布式锁、日志监控等关键特性。

核心要点回顾

  1. IO密集型线程池配置:核心线程数=CPU核心数2,最大线程数=CPU核心数4,拒绝策略设为CallerRunsPolicy;
  2. 异步编程:使用CompletableFuture实现非阻塞上传,指定自定义线程池避免资源耗尽;
  3. 分片上传:大文件拆分为10MB分片,异步并行上传,提升速度和稳定性;
  4. 企业级特性:
  • 断点续传:基于文件MD5和数据库分片状态,中断后仅传未完成分片;
  • 分布式锁:Redisson防止多节点重复上传;
  • 进度回调:通过监听器实时返回上传进度;
  • 监控告警:Prometheus统计上传成功率、耗时,便于运维监控。

该方案可应用于生产环境,适配短视频、在线教育、企业网盘等各类视频上传场景,兼顾性能、稳定性和可维护性。

目录
相关文章
|
7天前
|
Java API 开发者
深入解析Java Stream API:为何要避免在forEach中执行复杂操作
深入解析Java Stream API:为何要避免在forEach中执行复杂操作
197 116
|
17天前
|
消息中间件 Java Shell
RocketMQ集群部署与快速入门全解密:从原理到实战,万字干货吃透消息中间件
本文详解Apache RocketMQ核心概念、多Master多Slave集群部署及Java实战,涵盖NameServer、Broker配置、消息收发、事务消息与故障排查,助你掌握分布式消息系统搭建与应用。
196 2
|
25天前
|
人工智能 前端开发 数据挖掘
AI学习全景图:从大模型到RAG,从工具到变现,一条从0到1的路线
告别碎片化学习!本文系统梳理AI知识五层结构:从基础认知到商业变现,提供完整学习路径与优质资源链接。帮你构建AI知识网络,实现从工具使用到能力落地的跃迁。
566 2
|
26天前
|
机器学习/深度学习 SQL 关系型数据库
TRUNCATE、DELETE、DROP 的区别?
MySQL中DELETE、TRUNCATE和DROP均用于删除数据,但作用不同:DELETE删除行记录,支持WHERE条件和事务回滚,速度慢;TRUNCATE快速清空表并重置自增ID,不可回滚;DROP则彻底删除表结构与数据,操作不可逆。三者在日志记录、速度及功能上有显著差异。
265 0
|
20天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
1018 59
Meta SAM3开源:让图像分割,听懂你的话
|
14天前
|
关系型数据库 MySQL Java
【Java架构师体系课 | MySQL篇】⑦ 深入理解MySQL事务隔离级别与锁机制
本文深入讲解数据库事务隔离级别与锁机制,涵盖ACID特性、并发问题(脏读、不可重复读、幻读)、四种隔离级别对比及MVCC原理,分析表锁、行锁、间隙锁、临键锁等机制,并结合实例演示死锁处理与优化策略,帮助理解数据库并发控制核心原理。
134 4
|
10天前
|
数据采集 人工智能 数据可视化
2025年数据中台系统选型指南:热门推荐与能力盘点
2025年,数据中台成为企业数字化转型核心。本文深度解析瓴羊Dataphin、腾讯云WeData、华为云DataArts Studio、网易数帆EasyData及Talend Data Fabric五大主流平台,从产品能力、行业实践、技术优势等维度对比分析,助力企业实现数据标准化、治理高效化与消费便捷化,释放数据价值,提升竞争力。
|
1月前
|
数据采集 人工智能 自然语言处理
大模型微调「数据集构建」保姆级教程(超全)
2024年是“行业大模型元年”,但超80%微调失败源于数据问题。本文揭示从数据收集、清洗到增强的全流程方法论,强调“数据优先”而非“算法崇拜”,结合实战案例与工具推荐,助你构建高质量数据集,真正释放大模型业务价值。
780 2
大模型微调「数据集构建」保姆级教程(超全)
|
18天前
|
机器学习/深度学习 人工智能 自然语言处理
基于 NLP 与深度学习的智能面试训练系统:「模拟面试」APP 技术实现解析
本文详解「模拟面试」APP 的核心技术实现,涵盖 NLP 简历解析、AI 面试评估模型、多模态交互等模块,基于 PyTorch、spaCy、BERT 等技术栈,提供可落地的算法代码与系统架构设计,聚焦技术细节,助力招聘场景智能化升级。