Sfoglia il codice sorgente

1、智能问答添加豆包tts(流请求,跟任务执行状态、单独线程,加入音频WAV头)
2、加入豆包tts流式处理逻辑
3、增加单独线程处理豆包tts
4、优化豆包tts类逻辑

liyanbo 2 settimane fa
parent
commit
9666c46ae7

+ 242 - 49
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/chat/AiChatMessageServiceImpl.java

@@ -29,8 +29,10 @@ import cn.iocoder.byzs.module.ai.service.knowledge.bo.AiKnowledgeSegmentSearchRe
 import cn.iocoder.byzs.module.ai.service.model.AiChatRoleService;
 import cn.iocoder.byzs.module.ai.service.model.AiModelService;
 import cn.iocoder.byzs.module.ai.service.model.AiToolService;
+import cn.iocoder.byzs.module.ai.service.tts.DouBaoTtsService;
 import cn.iocoder.byzs.module.ai.util.AiUtils;
 import cn.iocoder.byzs.module.ai.util.tts.StreamTtsService;
+import cn.iocoder.byzs.module.ai.util.tts.StreamingDouBaoTtsService;
 import cn.iocoder.byzs.module.ai.util.tts.WavHeader;
 import com.alibaba.nls.client.protocol.SampleRateEnum;
 import jakarta.annotation.Resource;
@@ -49,6 +51,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.reactive.function.client.WebClientRequestException;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.util.retry.Retry;
 
 import java.time.Duration;
@@ -59,6 +62,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -109,9 +113,20 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
     @Resource
     private ObjectProvider<StreamTtsService> streamTtsServiceProvider;  // 使用ObjectProvider获取原型bean
 
+    @Resource
+    private DouBaoTtsService douBaoTtsService;
+
+    @Resource
+    private StreamingDouBaoTtsService streamingDouBaoTtsService;
+
     @Resource
     private AiTtsMapper ttsMapper;
 
+    /*
+     * Holder for the FluxSink into which DouBao TTS audio events are pushed.
+     * NOTE(review): this is an instance field that is re-assigned each time a DouBao TTS stream is
+     * set up; if this service instance is shared by concurrent requests (presumably a singleton
+     * Spring bean - confirm), one request can replace the sink another request is still using.
+     */
+    private AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef;
+    
+    /*
+     * True until the first DouBao TTS audio chunk has been sent; the first chunk gets a WAV header,
+     * later chunks are sent as-is. Reset to true in the stream's doFinally handlers.
+     * NOTE(review): also per-instance state, so the same sharing caveat applies.
+     */
+    private final AtomicBoolean isFirstDouBaoAudio = new AtomicBoolean(true);
 
     @Transactional(rollbackFor = Exception.class)
     public AiChatMessageSendRespVO sendMessage(AiChatMessageSendReqVO sendReqVO, Long userId) {
@@ -186,6 +201,9 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 
         // 添加useTts标志,判断是否使用TTS服务
         boolean useTts = aiTtsDO != null;
+        // 创建final副本供lambda表达式使用
+        final boolean finalUseTts = useTts;
+        final AiTtsDO finalAiTtsDO = aiTtsDO;
 
         // 2. 知识库找回
         List<AiKnowledgeSegmentSearchRespBO> knowledgeSegments = recallKnowledgeSegment(sendReqVO.getContent(),
@@ -212,6 +230,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 //        Flux<ChatResponse> streamResponse = chatModel.stream(prompt);
 
         // 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
+        // 4.3 初始化TTS服务 - 统一处理阿里云和豆包TTS
         StreamTtsService streamTtsService;
         ScheduledExecutorService scheduler;
         AtomicReference<ScheduledFuture<?>> ttsTask;
@@ -219,49 +238,70 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         Pattern sentencePattern;
         Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.empty();
 
-        if (useTts) {
-            // 只有当需要使用TTS服务时才创建实例
-            streamTtsService = streamTtsServiceProvider.getObject();
+        if (finalUseTts) {
+            // 判断是否为豆包TTS
+            boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+            
             contentTTSBuffer = new StringBuffer();
             sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
             scheduler = Executors.newSingleThreadScheduledExecutor();
             ttsTask = new AtomicReference<>();
-
-            // 先创建音频流并设置回调,再启动TTS服务
-            audioStream = Flux.create(sink2 -> {
-                AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
-                streamTtsService.setAudioDataCallback(audioBytes -> {
-                    try {
-                        byte[] processedAudio;
-                        if (isFirstChunk.getAndSet(false)) {
-                            // 仅首包添加WAV头
-                            processedAudio = WavHeader.addWavHeader(audioBytes, SampleRateEnum.SAMPLE_RATE_16K.value, 16, 1);
-                            log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
-                        } else {
-                            // 后续包直接使用原始PCM数据
-                            processedAudio = audioBytes;
+            
+            if (isDouBaoTts) {
+                streamTtsService = null;
+                // 豆包TTS服务 - 同步合成,模拟流式处理
+                log.info("使用豆包TTS服务");
+                // 为豆包TTS创建音频流,用于发送同步合成的音频数据
+                // 重置豆包TTS的sink引用
+                this.douBaoSinkRef = new AtomicReference<>();
+                
+                audioStream = Flux.create(sink2 -> {
+                    this.douBaoSinkRef.set(sink2);
+                });
+                
+                // 启动豆包TTS服务
+                streamingDouBaoTtsService.startTts(finalAiTtsDO);
+            } else {
+                // 阿里云TTS服务 - 流式合成
+                log.info("使用阿里云TTS服务");
+                streamTtsService = streamTtsServiceProvider.getObject();
+
+                // 先创建音频流并设置回调,再启动TTS服务
+                audioStream = Flux.create(sink2 -> {
+                    AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
+                    streamTtsService.setAudioDataCallback(audioBytes -> {
+                        try {
+                            byte[] processedAudio;
+                            if (isFirstChunk.getAndSet(false)) {
+                                // 仅首包添加WAV头
+                                processedAudio = WavHeader.addWavHeader(audioBytes, SampleRateEnum.SAMPLE_RATE_16K.value, 16, 1);
+                                log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
+                            } else {
+                                // 后续包直接使用原始PCM数据
+                                processedAudio = audioBytes;
+                            }
+                            String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
+                            AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
+                            audioResp.setEventType("AUDIO");
+                            audioResp.setAudioData(base64Audio);
+                            sink2.next(success(audioResp));
+                        } catch (Exception e) {
+                            log.error("[TTS处理异常] 音频编码失败", e);
+                            sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
                         }
-                        String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
-                        AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
-                        audioResp.setEventType("AUDIO");
-                        audioResp.setAudioData(base64Audio);
-                        sink2.next(success(audioResp));
-                    } catch (Exception e) {
-                        log.error("[TTS处理异常] 音频编码失败", e);
-                        sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
-                    }
+                    });
+                    streamTtsService.setOnCompleteCallback(sink2::complete);
                 });
-                streamTtsService.setOnCompleteCallback(sink2::complete);
-            });
 
-            // 启动TTS服务
-            streamTtsService.startTts(aiTtsDO);
+                // 启动阿里云TTS服务
+                streamTtsService.startTts(finalAiTtsDO);
+            }
         } else {
             streamTtsService = null;
             sentencePattern = null;
             scheduler = null;
-            ttsTask = null;
             contentTTSBuffer = null;
+            ttsTask = null;
         }
 
         // 4.4 流式返回并处理TTS
@@ -287,7 +327,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
             contentBuffer.append(newContent);
 
             // 只有当需要使用TTS服务时才处理TTS相关逻辑
-            if (useTts) {
+            if (finalUseTts) {
                 contentTTSBuffer.append(newContent);
                 log.debug("TTS新内容: {}", newContent);
 
@@ -295,13 +335,23 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                 if (ttsTask.get() != null) {
                     ttsTask.get().cancel(false); // 取消之前的延迟任务
                 }
-                // 延迟500ms执行,合并短时间内到达的文本片段
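+                // Delayed execution, to merge text fragments that arrive in quick succession. NOTE(review): the delay actually passed to schedule(...) below is 500 ms, not 200 ms.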
                 ttsTask.set(scheduler.schedule(() -> {
                     Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
+                    boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+                    
                     if (matcher.find()) {
-                        processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
+                        if (isDouBaoTts) {
+                            processCompleteSentence(finalAiTtsDO, contentTTSBuffer, matcher);
+                        } else {
+                            processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
+                        }
                     } else if (contentTTSBuffer.length() > 50) { // 最长50字未结束也处理
-                        processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
+                        if (isDouBaoTts) {
+                            processCompleteSentence(finalAiTtsDO, contentTTSBuffer, contentTTSBuffer.length());
+                        } else {
+                            processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
+                        }
                     }
                 }, 500, TimeUnit.MILLISECONDS));
             }
@@ -315,8 +365,14 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         }).doOnComplete(() -> {
 
             // 只有当需要使用TTS服务时才处理TTS相关逻辑
-            if (useTts) {
-                processRemainingText(streamTtsService, contentTTSBuffer); // 处理剩余文本
+            if (finalUseTts) {
+                boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+                
+                if (isDouBaoTts) {
+                    processRemainingText(finalAiTtsDO, contentTTSBuffer); // 处理豆包TTS剩余文本
+                } else {
+                    processRemainingText(streamTtsService, contentTTSBuffer); // 处理阿里云TTS剩余文本
+                }
 
                 if (ttsTask.get() != null) {
                     ttsTask.get().cancel(false);
@@ -325,7 +381,10 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
 
                 // 通知TTS服务文本发送完成
                 try {
-                    streamTtsService.stopTts();
+                    if (!isDouBaoTts) {
+                        streamTtsService.stopTts();
+                    }
+                    // 豆包TTS服务在任务完成时统一停止
                 } catch (Exception e) {
                     log.error("停止TTS服务失败", e);
                 }
@@ -340,9 +399,17 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
             TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
                     new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
             // 发生错误时停止TTS服务
-            if (useTts && streamTtsService != null) {
+            if (finalUseTts) {
+                boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+                
                 try {
-                    streamTtsService.stopTts();
+                    if (isDouBaoTts) {
+                        // 豆包TTS服务在任务完成时统一停止
+                    } else {
+                        if (streamTtsService != null) {
+                            streamTtsService.stopTts();
+                        }
+                    }
                 } catch (Exception e) {
                     log.error("停止TTS服务失败", e);
                 }
@@ -351,15 +418,25 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         // ==== 添加finally块清理 ====
         .doFinally(signalType -> {
             // 通知TTS服务文本发送完成
-            if (useTts && streamTtsService != null) {
+            if (finalUseTts) {
+                boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+                
                 try {
-                    streamTtsService.stopTts();
+                    if (isDouBaoTts) {
+                        // 豆包TTS服务在任务完成时统一停止
+                        // 重置豆包TTS的首次音频标记
+                        isFirstDouBaoAudio.set(true);
+                    } else {
+                        if (streamTtsService != null) {
+                            streamTtsService.stopTts();
+                        }
+                    }
                 } catch (Exception e) {
                     log.error("停止TTS服务失败", e);
                 }
             }
             // 确保调度器被关闭
-            if (useTts && scheduler != null && !scheduler.isShutdown()) {
+            if (finalUseTts && scheduler != null && !scheduler.isShutdown()) {
                 scheduler.shutdownNow();
             }
         }).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
@@ -368,18 +445,26 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         return Flux.merge(textStream, audioStream)
                 .doFinally(signalType -> {
                     // 双重保险:无论哪个流先完成,最终都清理资源
-                    if (useTts && streamTtsService != null) {
-                        streamTtsService.setAudioDataCallback(null);
-                        streamTtsService.setOnCompleteCallback(null);
+                    if (finalUseTts) {
+                        boolean isDouBaoTts = finalAiTtsDO != null && "DouBao".equals(finalAiTtsDO.getPlatform());
+                        
+                        if (isDouBaoTts) {
+                            // 豆包TTS音频流由任务计数器管理,在所有任务完成后自动关闭
+                            // 重置豆包TTS的首次音频标记
+                            isFirstDouBaoAudio.set(true);
+                        } else if (streamTtsService != null) {
+                            streamTtsService.setAudioDataCallback(null);
+                            streamTtsService.setOnCompleteCallback(null);
+                        }
                     }
                     // 确保调度器被关闭
-                    if (useTts && scheduler != null && !scheduler.isShutdown()) {
+                    if (finalUseTts && scheduler != null && !scheduler.isShutdown()) {
                         scheduler.shutdownNow();
                     }
                 });
     }
 
-    // 处理完整句子
+    // 处理完整句子 - 阿里云TTS
     private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, Matcher matcher) {
         if (streamTtsService == null || buffer == null || matcher == null) {
             return;
@@ -392,7 +477,20 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         log.info("TTS合成完整句: {}", sentence);
     }
 
-    // 处理指定长度文本
+    /**
+     * Handles a complete sentence for DouBao TTS: takes the buffer content from index 0 up to
+     * {@code matcher.end()}, passes it to {@code processDouBaoTts}, then deletes that prefix from
+     * the buffer. Does nothing when {@code buffer} or {@code matcher} is null.
+     *
+     * @param aiTtsDO TTS configuration, handed through unchanged to {@code processDouBaoTts}
+     * @param buffer  pending text; the consumed prefix is removed from it
+     * @param matcher matcher whose current match end marks the end of the sentence; it must already
+     *                hold a successful match, because {@code Matcher.end()} throws
+     *                IllegalStateException otherwise
+     */
+    private void processCompleteSentence(AiTtsDO aiTtsDO, StringBuffer buffer, Matcher matcher) {
+        if (buffer == null || matcher == null) {
+            return;
+        }
+
+        String sentence = buffer.substring(0, matcher.end());
+        log.debug("[处理完整句子][buffer: {}", sentence);
+        processDouBaoTts(aiTtsDO, sentence);
+        buffer.delete(0, matcher.end());
+        log.info("豆包TTS合成完整句: {}", sentence);
+    }
+
+    // 处理指定长度文本 - 阿里云TTS
     private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, int length) {
         if (streamTtsService == null || buffer == null || length <= 0) {
             return;
@@ -405,6 +503,19 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         log.info("TTS合成长文本: {}", sentence);
     }
 
+    /**
+     * Handles a fixed-length piece of text for DouBao TTS: takes the first {@code length}
+     * characters of the buffer, passes them to {@code processDouBaoTts}, then deletes them from the
+     * buffer. Does nothing when {@code buffer} is null or {@code length <= 0}.
+     *
+     * @param aiTtsDO TTS configuration, handed through unchanged to {@code processDouBaoTts}
+     * @param buffer  pending text; the consumed prefix is removed from it
+     * @param length  number of leading characters to synthesize; not checked against the buffer
+     *                length here, so a value larger than the buffer makes {@code substring} throw
+     */
+    private void processCompleteSentence(AiTtsDO aiTtsDO, StringBuffer buffer, int length) {
+        if (buffer == null || length <= 0) {
+            return;
+        }
+
+        String sentence = buffer.substring(0, length);
+        log.debug("[处理指定长度文本][buffer: {}", sentence);
+        processDouBaoTts(aiTtsDO, sentence);
+        buffer.delete(0, length);
+        log.info("豆包TTS合成长文本: {}", sentence);
+    }
+
     // 处理剩余文本
     private void processRemainingText(StreamTtsService streamTtsService, StringBuffer buffer) {
         if (streamTtsService == null || buffer == null || buffer.isEmpty()) {
@@ -416,6 +527,88 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
         buffer.setLength(0);
     }
 
+    /*
+     * Number of DouBao TTS synthesis tasks currently in flight: incremented in processDouBaoTts
+     * before its worker thread is started and decremented in that thread's finally block.
+     * NOTE(review): this is per-instance state, so every call going through this service instance
+     * shares the same counter - confirm that is intended.
+     */
+    private final AtomicInteger douBaoTtsTaskCount = new AtomicInteger(0);
+
+    /**
+     * Synthesizes one piece of text with DouBao TTS on a newly started thread and forwards the
+     * resulting audio chunks to the sink held in {@code douBaoSinkRef}.
+     *
+     * <p>Null or blank text is ignored. Otherwise the in-flight task counter is incremented and a
+     * thread is started that calls {@code streamingDouBaoTtsService.sendText}. For every non-empty
+     * audio chunk the callback prepends a WAV header to the first chunk only (tracked by
+     * {@code isFirstDouBaoAudio}), Base64-encodes the bytes and emits them as an "AUDIO" event.
+     * When the thread finishes it decrements the counter; the thread that brings it to zero
+     * completes the sink and calls {@code streamingDouBaoTtsService.stopTts()}.
+     *
+     * <p>NOTE(review): the counter can reach zero as soon as one sentence finishes before the next
+     * one has been submitted, in which case the sink is completed while more text may still be
+     * coming - confirm whether later audio is then dropped.
+     * NOTE(review): each call uses its own thread, so nothing here orders the audio of different
+     * sentences relative to each other - confirm the downstream service serializes them.
+     *
+     * @param aiTtsDO TTS configuration, handed through to {@code sendText}
+     * @param text    text to synthesize; null or blank is a no-op
+     */
+    private void processDouBaoTts(AiTtsDO aiTtsDO, String text) {
+        if (text == null || text.trim().isEmpty()) {
+            return;
+        }
+
+        // Count this task as in flight
+        douBaoTtsTaskCount.incrementAndGet();
+
+        // Run the DouBao TTS call on a separate thread so the calling thread is not blocked
+        new Thread(() -> {
+            try {
+                // Synthesize through the streaming DouBao TTS service
+                streamingDouBaoTtsService.sendText(aiTtsDO, text, audioBytes -> {
+                    // Handle one audio chunk
+                    if (audioBytes != null && audioBytes.length > 0) {
+                        // DouBao TTS returns PCM here, so a WAV header has to be added
+                        byte[] processedAudio;
+                        if (isFirstDouBaoAudio.compareAndSet(true, false)) {
+                            // First chunk: prepend the WAV header the front end expects (presumably 16 kHz, 16-bit, 1 channel - confirm argument order)
+                            processedAudio = WavHeader.addWavHeader(audioBytes, SampleRateEnum.SAMPLE_RATE_16K.value, 16, 1);
+                            log.info("豆包TTS首次音频合成成功,添加WAV头,原始长度: {} bytes,处理后长度: {} bytes", 
+                                    audioBytes.length, processedAudio.length);
+                        } else {
+                            // Later chunks: pass the raw PCM data through unchanged
+                            processedAudio = audioBytes;
+                            log.info("豆包TTS后续音频合成成功,长度: {} bytes", processedAudio.length);
+                        }
+
+                        String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
+
+                        // Build the audio response object
+                        AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
+                        audioResp.setEventType("AUDIO");
+                        audioResp.setAudioData(base64Audio);
+
+                        log.info("豆包TTS合成成功");
+
+                        // Push the audio to the front end; silently skipped when no sink has been registered yet
+                        if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
+                            try {
+                                this.douBaoSinkRef.get().next(success(audioResp));
+                            } catch (Exception e) {
+                                log.error("发送豆包TTS音频数据失败", e);
+                            }
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                log.error("豆包TTS合成失败", e);
+            } finally {
+                // Decrement the in-flight count; when it reaches zero, close the audio stream
+                if (douBaoTtsTaskCount.decrementAndGet() == 0) {
+                    log.info("所有豆包TTS任务已完成,关闭音频流");
+                    if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
+                        try {
+                            this.douBaoSinkRef.get().complete();
+                            // Stop the DouBao TTS service
+                            streamingDouBaoTtsService.stopTts();
+                        } catch (Exception e) {
+                            log.error("关闭豆包TTS音频流失败", e);
+                        }
+                    }
+                }
+            }
+        }).start();
+    }
+
+    /**
+     * Flushes whatever text is left in the buffer to DouBao TTS: passes the whole buffer content to
+     * {@code processDouBaoTts} and then empties the buffer. Does nothing when the buffer is null or
+     * empty.
+     *
+     * @param aiTtsDO TTS configuration, handed through unchanged to {@code processDouBaoTts}
+     * @param buffer  pending text; cleared after its content has been submitted
+     */
+    private void processRemainingText(AiTtsDO aiTtsDO, StringBuffer buffer) {
+        if (buffer == null || buffer.isEmpty()) {
+            return;
+        }
+
+        log.info("TTS合成剩余文本: {}", buffer);
+        processDouBaoTts(aiTtsDO, buffer.toString());
+        buffer.setLength(0);
+    }
+
     private List<AiKnowledgeSegmentSearchRespBO> recallKnowledgeSegment(String content,
                                                                         AiChatConversationDO conversation) {
         // 1. 查询聊天角色

+ 147 - 71
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/tts/DouBaoTtsService.java

@@ -2,6 +2,8 @@ package cn.iocoder.byzs.module.ai.service.tts;
 
 import cn.iocoder.byzs.module.ai.dal.dataobject.tts.AiTtsDO;
 import cn.iocoder.byzs.module.ai.framework.ai.config.YudaoAiProperties;
+import com.alibaba.nls.client.protocol.OutputFormatEnum;
+import com.alibaba.nls.client.protocol.SampleRateEnum;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import okhttp3.*;
 import org.slf4j.Logger;
@@ -9,13 +11,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Consumer;
 
 @Service
 public class DouBaoTtsService {
@@ -25,7 +27,126 @@ public class DouBaoTtsService {
     @Resource
     private YudaoAiProperties yudaoAiProperties;
 
+    /**
+     * Synthesizes speech for the given text and returns the complete audio as one byte array.
+     *
+     * <p>Builds the request via {@code buildTtsRequest} asking for MP3 output, executes it
+     * synchronously on a new OkHttpClient, reads the response body line by line as JSON, appends
+     * every non-empty string found under the {@code "data"} key, and Base64-decodes the
+     * concatenation once the body has been read.
+     *
+     * <p>NOTE(review): the catch inside the read loop catches every Exception, so anything thrown
+     * by {@code checkResponseStatus} for a line is only logged as a warning and reading continues -
+     * confirm that is intended.
+     * NOTE(review): the Base64 chunks are concatenated before decoding; this presumably relies on
+     * no chunk carrying padding in the middle of the combined string - confirm against the API.
+     * NOTE(review): the InputStreamReader is created without a charset, so the platform default
+     * charset is used.
+     *
+     * @param aiTtsDO TTS configuration passed to {@code buildTtsRequest}
+     * @param content text to synthesize
+     * @param command voice instruction passed to {@code buildTtsRequest}
+     * @return the decoded audio bytes; an empty array when the response has no body
+     * @throws IOException when the body was read but contained no audio data, or when the request
+     *                     itself fails
+     */
     public byte[] convertTextToSpeech(AiTtsDO aiTtsDO, String content, String command) throws IOException {
+        // Build the request
+        Request request = buildTtsRequest(aiTtsDO, content, command, OutputFormatEnum.MP3);
+        
+        // Send the request and process the response
+        try (Response response = new OkHttpClient().newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                handleErrorResponse(response);
+            }
+
+            // Read the response body and parse it as JSON line by line (SSE-style streamed response)
+            if (response.body() != null) {
+                try (InputStream inputStream = response.body().byteStream();
+                     java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(inputStream))) {
+                    String line;
+                    StringBuilder base64AudioBuilder = new StringBuilder();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    boolean hasAudioData = false;
+
+                    while ((line = reader.readLine()) != null) {
+                        if (line.trim().isEmpty()) {
+                            continue;
+                        }
+
+                        try {
+                            // Parse a single line of JSON
+                            Map<String, Object> responseMap = objectMapper.readValue(line, Map.class);
+
+                            // Check the response status
+                            checkResponseStatus(responseMap);
+
+                            // Extract the audio data
+                            Object data = responseMap.get("data");
+                            if (data instanceof String) {
+                                String chunk = data.toString();
+                                if (!chunk.isEmpty()) {
+                                    base64AudioBuilder.append(chunk);
+                                    hasAudioData = true;
+                                }
+                            }
+                        } catch (Exception e) {
+                            logger.warn("解析响应行失败: {}", e.getMessage());
+                        }
+                    }
+
+                    if (hasAudioData && !base64AudioBuilder.isEmpty()) {
+                        String base64Audio = base64AudioBuilder.toString();
+                        // Decode the Base64 audio data
+                        return java.util.Base64.getDecoder().decode(base64Audio);
+                    } else {
+                        // No audio data was received
+                        logger.warn("豆包TTS响应没有音频数据");
+                        throw new IOException("豆包TTS响应没有音频数据");
+                    }
+                }
+            }
+        }
+        return new byte[0];
+    }
+
+    /**
+     * Streaming speech synthesis: delivers audio to the callback chunk by chunk as the response is
+     * read, instead of returning one complete array.
+     *
+     * <p>Builds the request via {@code buildTtsRequest} asking for PCM output, executes it
+     * synchronously on a new OkHttpClient, and reads the response body line by line as JSON. Each
+     * non-empty string under the {@code "data"} key is Base64-decoded on its own and passed to
+     * {@code audioDataCallback} on the calling thread. The method returns once the body has been
+     * fully read.
+     *
+     * <p>NOTE(review): the per-line catch covers every Exception, so failures from
+     * {@code checkResponseStatus}, from Base64 decoding and from the callback itself are only
+     * logged as warnings and reading continues - confirm that is intended.
+     * NOTE(review): the InputStreamReader is created without a charset, so the platform default
+     * charset is used.
+     *
+     * @param aiTtsDO           TTS configuration passed to {@code buildTtsRequest}
+     * @param content           text to synthesize
+     * @param command           voice instruction passed to {@code buildTtsRequest}
+     * @param audioDataCallback receives each decoded audio chunk; when null, chunks are skipped
+     * @throws IOException when building or executing the request fails
+     */
+    public void streamTextToSpeech(AiTtsDO aiTtsDO, String content, String command, Consumer<byte[]> audioDataCallback) throws IOException {
+        // Build the request
+        Request request = buildTtsRequest(aiTtsDO, content, command, OutputFormatEnum.PCM);
+        
+        // Send the request and process the response
+        try (Response response = new OkHttpClient().newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                handleErrorResponse(response);
+            }
+
+            // Read the response body and parse it as JSON line by line (SSE-style streamed response)
+            if (response.body() != null) {
+                try (InputStream inputStream = response.body().byteStream();
+                     java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(inputStream))) {
+                    String line;
+                    ObjectMapper objectMapper = new ObjectMapper();
+
+                    while ((line = reader.readLine()) != null) {
+                        if (line.trim().isEmpty()) {
+                            continue;
+                        }
+
+                        try {
+                            // Parse a single line of JSON
+                            Map<String, Object> responseMap = objectMapper.readValue(line, Map.class);
+
+                            // Check the response status
+                            checkResponseStatus(responseMap);
+
+                            // Extract the audio data
+                            Object data = responseMap.get("data");
+                            if (data instanceof String) {
+                                String chunk = data.toString();
+                                if (!chunk.isEmpty() && audioDataCallback != null) {
+                                    // Decode this chunk immediately and hand it to the callback
+                                    byte[] audioData = java.util.Base64.getDecoder().decode(chunk);
+                                    logger.debug("豆包TTS流式返回音频数据,长度: {} bytes", audioData.length);
+                                    audioDataCallback.accept(audioData);
+                                }
+                            }
+                        } catch (Exception e) {
+                            logger.warn("解析响应行失败: {}", e.getMessage());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * 构建TTS请求
+     */
+    private Request buildTtsRequest(AiTtsDO aiTtsDO, String content, String command, OutputFormatEnum format) throws IOException {
+        // 获取配置
         YudaoAiProperties.DouBaoProperties doubaoProperties = yudaoAiProperties.getDoubao();
         if (doubaoProperties == null) {
             throw new IllegalArgumentException("豆包配置未设置");
@@ -45,16 +166,15 @@ public class DouBaoTtsService {
             throw new IllegalArgumentException("豆包TTS配置不完整,缺少appId或accessKey");
         }
 
-        OkHttpClient client = new OkHttpClient();
-        // 使用Map构建请求体,按照文档要求组装参数
+        // 构建请求体
         Map<String, Object> requestMap = new HashMap<>();
         Map<String, Object> reqParams = new HashMap<>();
         reqParams.put("speaker", aiTtsDO.getModel()); // 使用配置的音色
         reqParams.put("text", content); // 待合成文本
         
         Map<String, Object> audioParams = new HashMap<>();
-        audioParams.put("format", "mp3"); // 输出音频格式
-        audioParams.put("sample_rate", 16000); // 推荐采样率
+        audioParams.put("format", format); // 输出音频格式
+        audioParams.put("sample_rate", SampleRateEnum.SAMPLE_RATE_16K); // 推荐采样率
         if (aiTtsDO.getEmotion() != null && !aiTtsDO.getEmotion().isEmpty()) {
             audioParams.put("emotion", aiTtsDO.getEmotion());
         }
@@ -79,19 +199,17 @@ public class DouBaoTtsService {
         //语音指令
         if (command != null && !command.isEmpty()) {
             additions.put("context_texts", List.of(command));
-        }else if (aiTtsDO.getCommand() != null && !aiTtsDO.getCommand().isEmpty()){
+        } else if (aiTtsDO.getCommand() != null && !aiTtsDO.getCommand().isEmpty()) {
             additions.put("context_texts", List.of(aiTtsDO.getCommand()));
         }
 
         // 将 additions 映射序列化为 JSON 字符串
-        ObjectMapper additionsMapper = new ObjectMapper();
-        String additionsJson = additionsMapper.writeValueAsString(additions);
+        ObjectMapper objectMapper = new ObjectMapper();
+        String additionsJson = objectMapper.writeValueAsString(additions);
         reqParams.put("additions", additionsJson);
         requestMap.put("req_params", reqParams);
 
-
         // 转换为JSON字符串
-        ObjectMapper objectMapper = new ObjectMapper();
         String requestBody = objectMapper.writeValueAsString(requestMap);
 
         MediaType mediaType = MediaType.parse("application/json");
@@ -99,7 +217,7 @@ public class DouBaoTtsService {
         String requestId = UUID.randomUUID().toString();
 
         // 构建请求
-        Request request = new Request.Builder()
+        return new Request.Builder()
                 .url(ttsUrl)
                 .addHeader("X-Api-App-Id", appId)
                 .addHeader("X-Api-Access-Key", accessKey)
@@ -108,68 +226,26 @@ public class DouBaoTtsService {
                 .addHeader("Content-Type", "application/json")
                 .post(body)
                 .build();
-        
-        // 发送请求并流式接收响应
-        try (Response response = client.newCall(request).execute()) {
-            if (!response.isSuccessful()) {
-                String errorBody = response.body() != null ? response.body().string() : "无响应体";
-                logger.error("豆包TTS请求失败,状态码: {}, 响应: {}, 错误体: {}", 
-                        response.code(), response.message(), errorBody);
-                throw new IOException("请求失败: " + response + ",错误体: " + errorBody);
-            }
-
-            // 读取响应体并逐行解析JSON,处理SSE流式响应
-            if (response.body() != null) {
-                try (InputStream inputStream = response.body().byteStream();
-                     java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(inputStream))) {
-                    String line;
-                    StringBuilder base64AudioBuilder = new StringBuilder();
-                    ObjectMapper objectMapper2 = new ObjectMapper();
-                    boolean hasAudioData = false;
-
-                    while ((line = reader.readLine()) != null) {
-                        if (line.trim().isEmpty()) {
-                            continue;
-                        }
-
-                        try {
-                            // 解析单行JSON
-                            Map<String, Object> responseMap = objectMapper2.readValue(line, Map.class);
-
-                            // 检查响应状态
-                            int code = (int) responseMap.get("code");
-                            if (code != 0 && code != 20000000) {
-                                String message = (String) responseMap.get("message");
-                                throw new IOException("豆包TTS服务返回错误: code=" + code + ", message=" + message);
-                            }
-
-                            // 提取音频数据
-                            Object data = responseMap.get("data");
-                            if (data instanceof String) {
-                                String chunk = data.toString();
-                                if (!chunk.isEmpty()) {
-                                    base64AudioBuilder.append(chunk);
-                                    hasAudioData = true;
-                                }
-                            }
-                        } catch (Exception e) {
-                            logger.warn("解析响应行失败: {}", e.getMessage());
-                        }
-                    }
+    }
 
-                    if (hasAudioData && !base64AudioBuilder.isEmpty()) {
-                        String base64Audio = base64AudioBuilder.toString();
+    /**
+     * Reports an unsuccessful Doubao TTS HTTP response and aborts the call.
+     *
+     * <p>The response body is read as a string when one is present; otherwise a
+     * fixed placeholder text is used. The status code, status message and that
+     * body text are logged at error level, and an {@link IOException} built from
+     * the response's string form plus the body text is then thrown.
+     *
+     * <p>NOTE(review): the body is consumed here via {@code string()}, so the
+     * caller presumably cannot read it again afterwards - confirm against the
+     * HTTP client in use.
+     *
+     * @param response the response to report; its body may be {@code null}
+     * @throws IOException thrown unconditionally after logging; this method
+     *                     never returns normally
+     */
+    private void handleErrorResponse(Response response) throws IOException {
+        String errorBody = response.body() != null ? response.body().string() : "无响应体";
+        logger.error("豆包TTS请求失败,状态码: {}, 响应: {}, 错误体: {}", 
+                response.code(), response.message(), errorBody);
+        throw new IOException("请求失败: " + response + ",错误体: " + errorBody);
+    }
 
-                        // 解码base64音频数据
-                        return java.util.Base64.getDecoder().decode(base64Audio);
-                    } else {
-                        // 没有音频数据
-                        logger.warn("豆包TTS响应没有音频数据");
-                        throw new IOException("豆包TTS响应没有音频数据");
-                    }
-                }
-            }
+    /**
+     * Checks the status code carried by one parsed Doubao TTS response object.
+     *
+     * <p>The {@code "code"} entry is read from the map; the values {@code 0} and
+     * {@code 20000000} are accepted and the method returns without doing anything
+     * else. Any other value raises an {@link IOException} whose text contains the
+     * code and the map's {@code "message"} entry. What the two accepted codes
+     * mean is defined by the remote service and is not visible here.
+     *
+     * <p>NOTE(review): {@code "code"} is cast straight to {@code int}. A missing
+     * entry therefore fails with a {@code NullPointerException} on unboxing, and
+     * a value that is not an {@code Integer} fails with a
+     * {@code ClassCastException} - confirm the parser always yields an Integer.
+     *
+     * @param responseMap parsed response object; read for {@code "code"} and,
+     *                    on failure, {@code "message"}
+     * @throws IOException if the code is neither {@code 0} nor {@code 20000000}
+     */
+    private void checkResponseStatus(Map<String, Object> responseMap) throws IOException {
+        int code = (int) responseMap.get("code");
+        if (code != 0 && code != 20000000) {
+            String message = (String) responseMap.get("message");
+            throw new IOException("豆包TTS服务返回错误: code=" + code + ", message=" + message);
         }
-        return new byte[0];
     }
 }

+ 4 - 3
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/tts/AiTtsServiceImpl.java → byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/tts/impl/AiTtsServiceImpl.java

@@ -1,4 +1,4 @@
-package cn.iocoder.byzs.module.ai.service.tts;
+package cn.iocoder.byzs.module.ai.service.tts.impl;
 
 import cn.hutool.core.collection.CollUtil;
 import cn.iocoder.byzs.framework.common.pojo.PageResult;
@@ -9,9 +9,10 @@ import cn.iocoder.byzs.module.ai.dal.dataobject.model.AiChatRoleDO;
 import cn.iocoder.byzs.module.ai.dal.dataobject.tts.AiTtsDO;
 import cn.iocoder.byzs.module.ai.dal.mysql.model.AiChatRoleMapper;
 import cn.iocoder.byzs.module.ai.dal.mysql.tts.AiTtsMapper;
-import cn.iocoder.byzs.module.ai.util.tts.StreamTtsService;
+import cn.iocoder.byzs.module.ai.service.tts.AiTtsService;
+import cn.iocoder.byzs.module.ai.service.tts.AliyunTtsService;
+import cn.iocoder.byzs.module.ai.service.tts.DouBaoTtsService;
 import cn.iocoder.byzs.module.infra.api.file.FileApi;
-import com.alibaba.nls.client.protocol.OutputFormatEnum;
 import jakarta.annotation.Resource;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;

+ 52 - 0
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/util/tts/StreamingDouBaoTtsService.java

@@ -0,0 +1,52 @@
+package cn.iocoder.byzs.module.ai.util.tts;
+
+import cn.iocoder.byzs.module.ai.dal.dataobject.tts.AiTtsDO;
+import cn.iocoder.byzs.module.ai.service.tts.DouBaoTtsService;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Consumer;
+
+/**
+ * Doubao TTS streaming service.
+ * Adapts the synchronous Doubao TTS into simulated streaming processing.
+ *
+ * <p>The class exposes a start / send / stop shape. In the code shown here,
+ * {@link #startTts} and {@link #stopTts} only write a log line, and
+ * {@link #sendText} forwards the work to the injected {@code DouBaoTtsService}.
+ */
+@Service
+@Slf4j
+public class StreamingDouBaoTtsService {
+
+    @Resource
+    private DouBaoTtsService douBaoTtsService;
+
+    /**
+     * Marks the start of a TTS synthesis session.
+     *
+     * <p>Currently this only logs that the service is ready; no connection or
+     * state is set up and {@code aiTtsDO} is not read.
+     *
+     * @param aiTtsDO TTS configuration; unused by the current implementation
+     */
+    public void startTts(AiTtsDO aiTtsDO) {
+        log.info("豆包TTS服务已准备就绪");
+    }
+
+    /**
+     * Sends text to the TTS service for speech synthesis (streaming style).
+     *
+     * <p>Returns immediately without calling the service when {@code text} is
+     * {@code null} or contains only whitespace. Otherwise the call is delegated
+     * to {@code DouBaoTtsService.streamTextToSpeech}, passing {@code null} as the
+     * third argument (presumably the optional voice command - confirm against
+     * that method's signature). How and on which thread the callback is invoked
+     * is decided by that service and is not visible here.
+     *
+     * @param aiTtsDO           TTS configuration forwarded to the Doubao service
+     * @param text              text to synthesize; {@code null} or blank is ignored
+     * @param audioDataCallback consumer forwarded to the Doubao service to
+     *                          receive audio bytes
+     * @throws RuntimeException wrapping any exception raised by the delegated
+     *                          call, after it has been logged at error level
+     */
+    public void sendText(AiTtsDO aiTtsDO, String text, Consumer<byte[]> audioDataCallback) {
+        if (text == null || text.trim().isEmpty()) {
+            return;
+        }
+
+        try {
+            // Delegate to the Doubao TTS service for streaming synthesis
+            douBaoTtsService.streamTextToSpeech(aiTtsDO, text, null, audioDataCallback);
+        } catch (Exception e) {
+            log.error("豆包TTS合成失败", e);
+            throw new RuntimeException("豆包TTS合成失败", e);
+        }
+    }
+
+    /**
+     * Marks the end of a TTS synthesis session.
+     *
+     * <p>Currently this only logs that the service has stopped; nothing is
+     * released or cancelled here.
+     */
+    public void stopTts() {
+        log.info("豆包TTS服务已停止");
+    }
+}