liyanbo пре 2 недеља
родитељ
комит
ffb39a1da6

+ 42 - 29
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/chat/AiChatMessageServiceImpl.java

@@ -229,7 +229,6 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 
 //        Flux<ChatResponse> streamResponse = chatModel.stream(prompt);
 
-        // 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
         // 4.3 初始化TTS服务 - 统一处理阿里云和豆包TTS
         StreamingAliyunTtsService streamingAliyunTtsService;
         ScheduledExecutorService scheduler;
@@ -254,6 +253,8 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                 // 为豆包TTS创建音频流,用于发送同步合成的音频数据
                 // 重置豆包TTS的sink引用
                 this.douBaoSinkRef = new AtomicReference<>();
+                // 重置首次音频标志
+                this.isFirstDouBaoAudio.set(true);
                 
                 audioStream = Flux.create(sink2 -> {
                     this.douBaoSinkRef.set(sink2);
@@ -369,25 +370,42 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                 boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
                 
                 if (isDouBaoTts) {
-                    processRemainingText(finalAiTtsDO, contentTTSBuffer); // 处理豆包TTS剩余文本
+                    // 处理豆包TTS剩余文本
+                    processRemainingText(finalAiTtsDO, contentTTSBuffer);
+                    
+                    // 等待一段时间,确保所有TTS任务都已完成
+                    try {
+                        // 等待所有TTS任务完成
+                        while (douBaoTtsTaskCount.get() > 0) {
+                            Thread.sleep(100);
+                        }
+                        // 再等待一段时间,确保所有音频数据都已发送
+                        Thread.sleep(500);
+                        // 关闭豆包TTS音频流
+                        if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
+                            this.douBaoSinkRef.get().complete();
+                            log.info("豆包TTS音频流已关闭");
+                            // 停止豆包TTS服务
+                            streamingDouBaoTtsService.stopTts();
+                            log.info("豆包TTS服务已停止");
+                        }
+                    } catch (Exception e) {
+                        log.error("关闭豆包TTS音频流失败", e);
+                    }
                 } else {
                     processRemainingText(streamingAliyunTtsService, contentTTSBuffer); // 处理阿里云TTS剩余文本
+                    // 停止阿里云TTS服务
+                    try {
+                        streamingAliyunTtsService.stopTts();
+                    } catch (Exception e) {
+                        log.error("停止TTS服务失败", e);
+                    }
                 }
 
                 if (ttsTask.get() != null) {
                     ttsTask.get().cancel(false);
                 }
                 scheduler.shutdown(); // 关闭调度器
-
-                // 通知TTS服务文本发送完成
-                try {
-                    if (!isDouBaoTts) {
-                        streamingAliyunTtsService.stopTts();
-                    }
-                    // 豆包TTS服务在任务完成时统一停止
-                } catch (Exception e) {
-                    log.error("停止TTS服务失败", e);
-                }
             }
 
             // 忽略租户,因为 Flux 异步无法透传租户
@@ -516,7 +534,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         log.info("豆包TTS合成长文本: {}", sentence);
     }
 
-    // 处理剩余文本
+    // 处理剩余文本 - 阿里云TTS
     private void processRemainingText(StreamingAliyunTtsService streamingAliyunTtsService, StringBuffer buffer) {
         if (streamingAliyunTtsService == null || buffer == null || buffer.isEmpty()) {
             return;
@@ -538,6 +556,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 
         // 增加任务计数
         douBaoTtsTaskCount.incrementAndGet();
+        log.info("豆包TTS任务计数增加,当前: {}", douBaoTtsTaskCount.get());
 
         // 在单独的线程中处理豆包TTS,避免阻塞主线程
         new Thread(() -> {
@@ -566,45 +585,39 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                         audioResp.setEventType("AUDIO");
                         audioResp.setAudioData(base64Audio);
 
-                        log.info("豆包TTS合成成功");
+                        log.info("豆包TTS合成成功,准备发送音频数据");
 
                         // 将音频数据发送到前端
                         if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
                             try {
                                 this.douBaoSinkRef.get().next(success(audioResp));
+                                log.info("豆包TTS音频数据发送成功");
                             } catch (Exception e) {
                                 log.error("发送豆包TTS音频数据失败", e);
                             }
+                        } else {
+                            log.warn("豆包TTS sink引用为空,无法发送音频数据");
                         }
                     }
                 });
+                log.info("豆包TTS文本处理完成: {}", text);
             } catch (Exception e) {
                 log.error("豆包TTS合成失败", e);
             } finally {
-                // 减少任务计数,当所有任务完成时关闭音频流
-                if (douBaoTtsTaskCount.decrementAndGet() == 0) {
-                    log.info("所有豆包TTS任务已完成,关闭音频流");
-                    if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
-                        try {
-                            this.douBaoSinkRef.get().complete();
-                            // 停止豆包TTS服务
-                            streamingDouBaoTtsService.stopTts();
-                        } catch (Exception e) {
-                            log.error("关闭豆包TTS音频流失败", e);
-                        }
-                    }
-                }
+                // 减少任务计数,但不关闭音频流
+                int remainingTasks = douBaoTtsTaskCount.decrementAndGet();
+                log.info("豆包TTS任务计数减少,剩余: {}", remainingTasks);
             }
         }).start();
     }
 
-    // 处理豆包TTS的剩余文本
+    // 处理剩余文本 - 豆包TTS
     private void processRemainingText(AiTtsDO aiTtsDO, StringBuffer buffer) {
         if (buffer == null || buffer.isEmpty()) {
             return;
         }
 
-        log.info("TTS合成剩余文本: {}", buffer);
+        log.info("豆包TTS合成剩余文本: {}", buffer);
         processDouBaoTts(aiTtsDO, buffer.toString());
         buffer.setLength(0);
     }

+ 19 - 4
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/tts/DouBaoTtsService.java

@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 @Service
@@ -34,8 +35,15 @@ public class DouBaoTtsService {
         // 构建请求
         Request request = buildTtsRequest(aiTtsDO, content, command, OutputFormatEnum.MP3.getName());
         
+        // 创建带有超时设置的OkHttpClient
+        OkHttpClient client = new OkHttpClient.Builder()
+                .connectTimeout(30, TimeUnit.SECONDS)  // 连接超时
+                .readTimeout(60, TimeUnit.SECONDS)     // 读取超时
+                .writeTimeout(30, TimeUnit.SECONDS)    // 写入超时
+                .build();
+        
         // 发送请求并处理响应
-        try (Response response = new OkHttpClient().newCall(request).execute()) {
+        try (Response response = client.newCall(request).execute()) {
             if (!response.isSuccessful()) {
                 handleErrorResponse(response);
             }
@@ -95,10 +103,17 @@ public class DouBaoTtsService {
      */
     public void streamTextToSpeech(AiTtsDO aiTtsDO, String content, String command, Consumer<byte[]> audioDataCallback) throws IOException {
         // 构建请求
-        Request request = buildTtsRequest(aiTtsDO, content, command, OutputFormatEnum.PCM.getName());
+        Request request = buildTtsRequest(aiTtsDO, content, command, "pcm");
+        
+        // 创建带有超时设置的OkHttpClient
+        OkHttpClient client = new OkHttpClient.Builder()
+                .connectTimeout(30, TimeUnit.SECONDS)  // 连接超时
+                .readTimeout(60, TimeUnit.SECONDS)     // 读取超时
+                .writeTimeout(30, TimeUnit.SECONDS)    // 写入超时
+                .build();
         
         // 发送请求并处理响应
-        try (Response response = new OkHttpClient().newCall(request).execute()) {
+        try (Response response = client.newCall(request).execute()) {
             if (!response.isSuccessful()) {
                 handleErrorResponse(response);
             }
@@ -129,7 +144,7 @@ public class DouBaoTtsService {
                                 if (!chunk.isEmpty() && audioDataCallback != null) {
                                     // 实时解码并返回音频数据
                                     byte[] audioData = java.util.Base64.getDecoder().decode(chunk);
-                                    logger.debug("豆包TTS流式返回音频数据,长度: {} bytes", audioData.length);
+                                    logger.info("豆包TTS流式返回音频数据,长度: {} bytes", audioData.length);
                                     audioDataCallback.accept(audioData);
                                 }
                             }

+ 35 - 16
byzs-web/src/main/java/cn/iocoder/byzs/module/web/service/ai/WebAiServiceImpl.java

@@ -618,12 +618,36 @@ public class WebAiServiceImpl {
     /**
      * 处理豆包TTS文本完成后的逻辑
      */
-    private void handleDouBaoTextComplete(AiTtsDO aiTtsDO, ScheduledExecutorService scheduler,
-                                         AtomicBoolean ttsStopped, AtomicBoolean allTextProcessed,
+    private void handleDouBaoTextComplete(AiTtsDO aiTtsDO, ScheduledExecutorService scheduler, 
+                                         AtomicBoolean ttsStopped, AtomicBoolean allTextProcessed, 
                                          FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
         allTextProcessed.set(true);
         log.info("所有文本处理完毕,准备通知TTS服务文本已发送完毕");
 
+        // 等待所有TTS任务完成,然后关闭音频流
+        new Thread(() -> {
+            try {
+                // 等待所有TTS任务完成
+                while (douBaoTtsTaskCount.get() > 0) {
+                    Thread.sleep(100);
+                }
+                // 再等待一段时间,确保所有音频数据都已发送
+                Thread.sleep(500);
+                // 关闭豆包TTS音频流
+                if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
+                    this.douBaoSinkRef.get().complete();
+                    log.info("豆包TTS音频流已关闭");
+                    // 停止豆包TTS服务
+                    streamingDouBaoTtsService.stopTts();
+                    log.info("豆包TTS服务已停止");
+                    // 设置标志位
+                    ttsStopped.set(true);
+                }
+            } catch (Exception e) {
+                log.error("关闭豆包TTS音频流失败", e);
+            }
+        }).start();
+
         // 添加额外的超时检测,作为最后的保障
         scheduler.schedule(() -> {
             if (allTextProcessed.get() && !ttsStopped.get()) {
@@ -683,6 +707,7 @@ public class WebAiServiceImpl {
 
         // 增加任务计数
         douBaoTtsTaskCount.incrementAndGet();
+        log.info("豆包TTS任务计数增加,当前: {}", douBaoTtsTaskCount.get());
 
         // 在单独的线程中处理豆包TTS,避免阻塞主线程
         new Thread(() -> {
@@ -711,34 +736,28 @@ public class WebAiServiceImpl {
                         audioResp.setEventType("AUDIO");
                         audioResp.setAudioData(base64Audio);
 
-                        log.info("豆包TTS合成成功");
+                        log.info("豆包TTS合成成功,准备发送音频数据");
 
                         // 将音频数据发送到前端
                         if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
                             try {
                                 this.douBaoSinkRef.get().next(success(audioResp));
+                                log.info("豆包TTS音频数据发送成功");
                             } catch (Exception e) {
                                 log.error("发送豆包TTS音频数据失败", e);
                             }
+                        } else {
+                            log.warn("豆包TTS sink引用为空,无法发送音频数据");
                         }
                     }
                 });
+                log.info("豆包TTS文本处理完成: {}", text);
             } catch (Exception e) {
                 log.error("豆包TTS合成失败", e);
             } finally {
-                // 减少任务计数,当所有任务完成时关闭音频流
-                if (douBaoTtsTaskCount.decrementAndGet() == 0) {
-                    log.info("所有豆包TTS任务已完成,关闭音频流");
-                    if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
-                        try {
-                            this.douBaoSinkRef.get().complete();
-                            // 停止豆包TTS服务
-                            streamingDouBaoTtsService.stopTts();
-                        } catch (Exception e) {
-                            log.error("关闭豆包TTS音频流失败", e);
-                        }
-                    }
-                }
+                // 减少任务计数,但不关闭音频流
+                int remainingTasks = douBaoTtsTaskCount.decrementAndGet();
+                log.info("豆包TTS任务计数减少,剩余: {}", remainingTasks);
             }
         }).start();
     }