Selaa lähdekoodia

1、TTS服务使用对话流传输合并语音返回
2、处理结束流
3、分离入库信息和返回内容

liyanbo 8 kuukautta sitten
vanhempi
sitoutus
a68d4fd2e4

+ 10 - 0
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/controller/admin/tts/StreamTtsService.java

@@ -45,6 +45,8 @@ public class StreamTtsService {
     private StreamInputTts synthesizer;
     // ==== 添加音频数据回调 ====
     private Consumer<byte[]> audioDataCallback;
+    // 添加完成回调接口
+    private Runnable onCompleteCallback;
 
     @PostConstruct
     public void init() {
@@ -168,6 +170,10 @@ public class StreamTtsService {
             public void onSynthesisComplete(StreamInputTtsResponse response) {
                 log.info("TTS合成完成: name={}, status={}", response.getName(), response.getStatus());
                 audioPlayer.stop();
+                // 调用完成回调
+                if (onCompleteCallback != null) {
+                    onCompleteCallback.run();
+                }
             }
 
             @Override
@@ -286,4 +292,8 @@ public class StreamTtsService {
         }
         stopTts();
     }
+
+    public void setOnCompleteCallback(Runnable callback) {
+        this.onCompleteCallback = callback;
+    }
 }

+ 41 - 22
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/chat/AiChatMessageServiceImpl.java

@@ -194,8 +194,9 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 
         // 4.4 流式返回并处理TTS
         StringBuffer contentBuffer = new StringBuffer();
+        StringBuffer contentTTSBuffer = new StringBuffer();
         // 添加句子结束符正则表达式
-        Pattern sentencePattern = Pattern.compile("[。!?;.\n\r]"); // 增加换行符支持
+        Pattern sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
 
         ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
         AtomicReference<ScheduledFuture<?>> ttsTask = new AtomicReference<>();
@@ -212,38 +213,40 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                     segment.setDocumentName(document != null ? document.getName() : null);
                 });
             }
+
             // 响应结果
             String newContent = chunk.getResult() != null ? chunk.getResult().getOutput().getText() : null;
             newContent = StrUtil.nullToDefault(newContent, ""); // 避免 null 的情况
 
             contentBuffer.append(newContent);
+            contentTTSBuffer.append(newContent);
+            System.out.println("==============newContent: " + newContent);
 
             // 发送新内容到TTS服务进行语音合成
             if (ttsTask.get() != null) {
                 ttsTask.get().cancel(false); // 取消之前的延迟任务
             }
             // 延迟500ms执行,合并短时间内到达的文本片段
-            ttsTask.set(scheduler.schedule(() -> {
-                Matcher matcher = sentencePattern.matcher(contentBuffer);
+//            ttsTask.set(scheduler.schedule(() -> {
+                Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
                 if (matcher.find()) {
-                    processCompleteSentence(contentBuffer, matcher);
-                } else if (contentBuffer.length() > 20) { // 最长20字未结束也处理
-                    processCompleteSentence(contentBuffer, contentBuffer.length());
+                    processCompleteSentence(contentTTSBuffer, matcher);
+                } else if (contentTTSBuffer.length() > 20) { // 最长20字未结束也处理
+                    processCompleteSentence(contentTTSBuffer, contentTTSBuffer.length());
                 }
-            }, 500, TimeUnit.MILLISECONDS));
+//            }, 500, TimeUnit.MILLISECONDS));
 
             CommonResult<AiChatMessageSendRespVO> result = success(new AiChatMessageSendRespVO()
                     .setEventType("TEXT")
                     .setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class))
                     .setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class)
-                            .setContent(newContent).setSegments(segments)));
+                    .setContent(newContent).setSegments(segments)));
             sharedResponse.set(result);
             return result;
         }).doOnComplete(() -> {
-            if (contentBuffer.length() > 0) {
-                streamTtsService.sendText(contentBuffer.toString());
-                contentBuffer.setLength(0);
-            }
+
+            processRemainingText(contentTTSBuffer); // 处理剩余文本
+
             // 忽略租户,因为 Flux 异步无法透传租户
             TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
                     new AiChatMessageDO().setId(assistantMessage.getId()).setContent(contentBuffer.toString())));
@@ -252,11 +255,12 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
             if (ttsTask.get() != null) {
                 ttsTask.get().cancel(false);
             }
-            processRemainingText(contentBuffer); // 处理剩余文本
             scheduler.shutdown(); // 关闭调度器
 
             // 通知TTS服务文本发送完成
             streamTtsService.stopTts();
+            // ==== 添加回调清理 ====
+            streamTtsService.setAudioDataCallback(null);
             sink.complete(); // 完成流
         }).doOnError(throwable -> {
             log.error("[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]", userId, sendReqVO, throwable);
@@ -280,45 +284,60 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
             streamTtsService.setAudioDataCallback(audioBytes -> {
                 try {
                     // 确保TTS输出WAV格式(带文件头)
-                    byte[] wavAudioWithHeader = addWavHeader(audioBytes, 24000, 16, 1); // 修改为24kHz(匹配StreamTtsService设置)
+                    byte[] wavAudioWithHeader = addWavHeader(audioBytes, 24000, 16, 1);
                     String base64Audio = Base64.getEncoder().encodeToString(wavAudioWithHeader);
                     AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
                     audioResp.setEventType("AUDIO");
                     audioResp.setAudioData(base64Audio);
                     sink2.next(success(audioResp));
                 } catch (Exception e) {
-                    log.error("[------------][userId({}) sendReqVO({}) <UNK> 发生异常]", userId, sendReqVO, e);
-                    sink2.error(e);
+                    log.error("[TTS处理异常][userId({}) sendReqVO({})] 音频编码失败", userId, sendReqVO, e); // 更详细的日志
+                    sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
                 }
             });
+            // 设置TTS完成回调,终止音频流
+            streamTtsService.setOnCompleteCallback(() -> {
+                sink2.complete();
+                log.info("音频流已完成");
+            });
         });
 
-        // 合并文本流和音频流,使用mergeWith而非mergeSequential
-        return textStream.mergeWith(audioStream);
+        // 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
+        return Flux.merge(textStream, audioStream)
+                .doFinally(signalType -> {
+                    // 双重保险:无论哪个流先完成,最终都清理资源
+                    streamTtsService.setAudioDataCallback(null);
+                    streamTtsService.setOnCompleteCallback(null);
+                });
     }
 
     // 处理完整句子
     private void processCompleteSentence(StringBuffer buffer, Matcher matcher) {
+        System.out.println("==============[处理完整句子[[buffer: " + buffer.toString());
         String sentence = buffer.substring(0, matcher.end());
         streamTtsService.sendText(sentence);
         buffer.delete(0, matcher.end());
-        System.out.println("TTS合成完整句: " + sentence);
+        log.info("TTS合成完整句: {}", sentence); // 替换System.out为日志
+        System.out.println("==============[处理完整句子[[buffer: " + buffer);
     }
 
     // 处理指定长度文本
     private void processCompleteSentence(StringBuffer buffer, int length) {
+        System.out.println("==============[处理指定长度文本[[buffer: " + buffer.toString());
         String sentence = buffer.substring(0, length);
         streamTtsService.sendText(sentence);
         buffer.delete(0, length);
-        System.out.println("TTS合成长文本: " + sentence);
+        log.info("TTS合成长文本: {}", sentence); // 替换System.out为日志
+        System.out.println("==============[处理指定长度文本[[buffer: " + buffer.toString());
     }
 
     // 处理剩余文本
     private void processRemainingText(StringBuffer buffer) {
-        if (buffer.length() > 0) {
+        if (!buffer.isEmpty()) {
+            System.out.println("TTS合成剩余文本: " + buffer);
             streamTtsService.sendText(buffer.toString());
             buffer.setLength(0);
-            System.out.println("TTS合成剩余文本: " + buffer.toString());
+            System.out.println("TTS合成剩余文本: " + buffer);
         }
     }