Просмотр исходного кода

1、视频切片加入异步处理:使用线程池管理转码任务,避免阻塞请求线程

liyanbo 4 месяцев назад
Родитель
Сommit
85a14c1e83

+ 106 - 42
byzs-module-infra/src/main/java/cn/iocoder/byzs/module/infra/framework/file/core/client/local/LocalFileClient.java

@@ -1,14 +1,13 @@
 package cn.iocoder.byzs.module.infra.framework.file.core.client.local;
 
 import cn.hutool.core.io.FileUtil;
-import cn.hutool.core.io.IoUtil;
 import cn.iocoder.byzs.module.infra.framework.file.core.client.AbstractFileClient;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 本地文件客户端
@@ -17,6 +16,24 @@ import java.util.List;
  */
 public class LocalFileClient extends AbstractFileClient<LocalFileClientConfig> {
 
+    // 线程池用于异步处理视频转码
+    private static final ExecutorService TRANSCODE_EXECUTOR = new ThreadPoolExecutor(
+            1, // 核心线程数,根据服务器CPU核心数调整
+            Runtime.getRuntime().availableProcessors() / 2, // 最大线程数,使用一半的CPU核心
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(10), // 任务队列,限制等待任务数
+            new ThreadFactory() {
+                private final AtomicInteger counter = new AtomicInteger(0);
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread thread = new Thread(r, "video-transcode-" + counter.incrementAndGet());
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:当队列满时,由调用者执行
+    );
+
     public LocalFileClient(Long id, LocalFileClientConfig config) {
         super(id, config);
     }
@@ -27,52 +44,99 @@ public class LocalFileClient extends AbstractFileClient<LocalFileClientConfig> {
 
     @Override
     public String ffmpegHts(String path) {
-        String pathMp4= getFilePath(path);
+        // 将在lambda表达式中使用的变量声明为final
+        final String finalPath = path;
+        final String pathMp4 = getFilePath(path);
         String filePathName = pathMp4.substring(0, pathMp4.lastIndexOf("."));
         String fileName = filePathName.substring(filePathName.lastIndexOf("/"));
-        String pathM3u8 = filePathName + fileName;
-        String pathM3u8Name = pathM3u8 + ".m3u8";
+        final String pathM3u8 = filePathName + fileName;
+        final String pathM3u8Name = pathM3u8 + ".m3u8";
+
+        // 创建目录
         new File(filePathName).mkdirs();
-        try {
-            System.out.println("PATH环境变量: " + System.getenv("PATH"));
-            // 构建FFmpeg命令进行HLS切片
-            ProcessBuilder processBuilder = new ProcessBuilder(
-                    "ffmpeg", "-i", pathMp4,
-                    "-c:v", "h264", "-c:a", "aac",
-                    "-hls_time", "10", "-hls_list_size", "0",
-                    "-hls_segment_filename",  pathM3u8 + "_%03d.ts",
-                    pathM3u8Name
-            );
-
-            // 重定向错误流到输入流(便于捕获FFmpeg输出信息)
-            processBuilder.redirectErrorStream(true);
-
-            // 启动进程
-            Process process = processBuilder.start();
-
-            // 读取FFmpeg输出日志
-            try (BufferedReader reader = new BufferedReader(
-                    new InputStreamReader(process.getInputStream()))) {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    System.out.println("FFmpeg输出: " + line);
+
+        // 提交转码任务到线程池
+        Future<String> future = TRANSCODE_EXECUTOR.submit(() -> {
+            try {
+                // 构建FFmpeg命令,添加资源限制参数
+                ProcessBuilder processBuilder = new ProcessBuilder(
+                        "ffmpeg", "-i", pathMp4,
+                        "-c:v", "h264", "-c:a", "aac",
+                        "-threads", "2", // 限制使用2个线程,根据需要调整
+                        "-preset", "medium", // 编码预设
+                        "-crf", "24", // 视频质量控制
+                        "-hls_time", "10", "-hls_list_size", "0",
+                        "-hls_segment_filename", pathM3u8 + "_%03d.ts",
+                        pathM3u8Name
+                );
+
+                // 设置进程优先级(Linux/Mac)
+                try {
+                    if (System.getProperty("os.name").toLowerCase().contains("win")) {
+                        // Windows系统设置优先级的方式不同
+                    } else {
+                        // Unix/Linux系统设置进程组优先级
+                        processBuilder.environment().put("NICE", "10");
+                    }
+                } catch (Exception e) {
+                    System.err.println("设置进程优先级失败: " + e.getMessage());
                 }
-            }
 
-            // 等待进程执行完成
-            int exitCode = process.waitFor();
-            if (exitCode == 0) {
-                String dateStr = path.substring(0, 8);
-                pathM3u8Name = pathM3u8Name.substring(pathM3u8Name.lastIndexOf(dateStr));
-                System.out.println("视频切片成功,m3u8路径: " + pathM3u8Name);
-                return super.formatFileUrl(config.getDomain(), pathM3u8Name);
-            } else {
-                System.err.println("FFmpeg执行失败,退出码: " + exitCode);
-                return null;
+                // 重定向错误流到输入流
+                processBuilder.redirectErrorStream(true);
+
+                // 启动进程
+                final Process process = processBuilder.start();
+
+                // Windows系统设置进程优先级
+                if (System.getProperty("os.name").toLowerCase().contains("win")) {
+                    try {
+                        new ProcessBuilder("wmic", "process", "where", "ProcessId=" + process.pid(), "CALL", "setpriority", "below normal").start();
+                    } catch (Exception e) {
+                        System.err.println("设置Windows进程优先级失败: " + e.getMessage());
+                    }
+                }
+
+                // 读取FFmpeg输出日志
+                StringBuilder output = new StringBuilder();
+                try (BufferedReader reader = new BufferedReader(
+                        new InputStreamReader(process.getInputStream()))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        output.append(line).append("\n");
+                    }
+                }
+
+                // 设置超时时间,避免无限等待
+                boolean completed = process.waitFor(30, TimeUnit.MINUTES); // 30分钟超时
+                if (!completed) {
+                    process.destroyForcibly(); // 强制终止进程
+                    throw new TimeoutException("视频转码超时");
+                }
+
+                int exitCode = process.exitValue();
+                if (exitCode == 0) {
+                    String dateStr = finalPath.substring(0, 8);
+                    String finalPathM3u8Name = pathM3u8Name.substring(pathM3u8Name.lastIndexOf(dateStr));
+                    System.out.println("视频切片成功,m3u8路径: " + finalPathM3u8Name);
+                    return super.formatFileUrl(config.getDomain(), finalPathM3u8Name);
+                } else {
+                    System.err.println("FFmpeg执行失败,退出码: " + exitCode + ",输出: " + output.toString());
+                    return null;
+                }
+
+            } catch (Exception e) {
+                System.err.println("视频处理失败: " + e.getMessage());
+                throw new RuntimeException("视频处理失败,请检查视频格式是否支持:", e);
             }
+        });
 
-        }catch (Exception e) {
-            throw new RuntimeException("视频处理失败,请检查视频格式是否支持:", e);
+        // 获取结果
+        try {
+            return future.get(35, TimeUnit.MINUTES); // 设置获取结果的超时时间
+        } catch (Exception e) {
+            future.cancel(true); // 取消任务
+            throw new RuntimeException("视频转码处理超时或被取消", e);
         }
     }