Ver código fonte

1、试题选项按照ABCD排序
2、课程小智对话第一个初始答案单独走tts服务方法流传回

liyanbo 8 meses atrás
pai
commit
7a08816815

+ 1 - 1
byzs-course/src/main/java/cn/iocoder/byzs/module/bjdx/dal/mysql/coursequestoption/CourseQuestOptionMapper.java

@@ -26,7 +26,7 @@ public interface CourseQuestOptionMapper extends BaseMapperX<CourseQuestOptionDO
                 .likeIfPresent(CourseQuestOptionDO::getCqoOption, reqVO.getCqoOption())
                 .eqIfPresent(CourseQuestOptionDO::getCqoSocre, reqVO.getCqoSocre())
                 .eqIfPresent(CourseQuestOptionDO::getCqoIsAnswer, reqVO.getCqoIsAnswer())
-                .orderByDesc(CourseQuestOptionDO::getId));
+                .orderByAsc(CourseQuestOptionDO::getCqoValue));
     }
 
     List<CourseQuestOptionDO> selectListByQuestId(Set<Long> questIdSet);

+ 1 - 1
byzs-module-ai/src/main/java/cn/iocoder/byzs/module/ai/service/chat/AiChatMessageServiceImpl.java

@@ -305,7 +305,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
                     sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
                 }
             });
-            streamTtsService.setOnCompleteCallback(() -> {sink2.complete();});
+            streamTtsService.setOnCompleteCallback(sink2::complete);
         });
 
         // 使用merge而非mergeSequential,确保任一流完成不阻塞其他流

+ 414 - 72
byzs-web/src/main/java/cn/iocoder/byzs/module/web/service/ai/WebAiServiceImpl.java

@@ -2,9 +2,11 @@ package cn.iocoder.byzs.module.web.service.ai;
 
 import cn.hutool.core.util.ObjUtil;
 import cn.iocoder.byzs.framework.common.pojo.CommonResult;
+import cn.iocoder.byzs.framework.common.util.object.BeanUtils;
 import cn.iocoder.byzs.module.ai.controller.admin.chat.vo.message.AiChatMessageSendReqVO;
 import cn.iocoder.byzs.module.ai.controller.admin.chat.vo.message.AiChatMessageSendRespVO;
 import cn.iocoder.byzs.module.ai.dal.dataobject.chat.AiChatConversationDO;
+import cn.iocoder.byzs.module.ai.dal.dataobject.chat.AiChatMessageDO;
 import cn.iocoder.byzs.module.ai.dal.dataobject.model.AiChatRoleDO;
 import cn.iocoder.byzs.module.ai.dal.dataobject.tts.AiTtsDO;
 import cn.iocoder.byzs.module.ai.dal.mysql.tts.AiTtsMapper;
@@ -15,14 +17,15 @@ import cn.iocoder.byzs.module.ai.util.tts.StreamTtsService;
 import cn.iocoder.byzs.module.ai.util.tts.WavHeader;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.ai.chat.messages.MessageType;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.scheduler.Schedulers;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Base64;
+import java.time.LocalDateTime;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -33,9 +36,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static cn.iocoder.byzs.framework.common.exception.util.ServiceExceptionUtil.exception;
-import static cn.iocoder.byzs.module.ai.enums.ErrorCodeConstants.CHAT_CONVERSATION_NOT_EXISTS;
-import static cn.iocoder.byzs.framework.common.pojo.CommonResult.error;
 import static cn.iocoder.byzs.framework.common.pojo.CommonResult.success;
+import static cn.iocoder.byzs.module.ai.enums.ErrorCodeConstants.CHAT_CONVERSATION_NOT_EXISTS;
 
 /**
  * webAi Service 实现类
@@ -59,6 +61,10 @@ public class WebAiServiceImpl {
     @Resource
     private ObjectProvider<StreamTtsService> streamTtsServiceProvider;
 
+    /**
+     * 发送指定回答的SSE流式响应
+     * 确保TEXT类型文本数据优先且可靠地发送到前端,同时提供AUDIO音频流
+     */
     public Flux<CommonResult<AiChatMessageSendRespVO>> sendSpecifiedAnswerStream(AiChatMessageSendReqVO sendReqVO, Long userId) {
         // 1. 校验对话存在
         AiChatConversationDO conversation = chatConversationService
@@ -68,7 +74,33 @@ public class WebAiServiceImpl {
         }
 
         // 2. 获取TTS配置
-        AiChatRoleDO chatRole = chatRoleService.getChatRole(conversation.getRoleId());
+        AiTtsDO aiTtsDO = getTtsConfig(conversation.getRoleId());
+
+        // 3. 获取回答内容
+        String contentAnswer = sendReqVO.getContentAnswer();
+        log.info("开始处理文本内容: {}", contentAnswer);
+
+        // 4. 创建TTS服务实例
+        StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
+
+        try {
+            // 5. 初始化TTS服务
+            streamTtsService.startTts(aiTtsDO);
+            return createSseFlux(sendReqVO, userId, conversation, contentAnswer, streamTtsService);
+        } catch (Exception e) {
+            log.error("发送指定回答失败", e);
+            AtomicBoolean tempTtsStopped = new AtomicBoolean(false);
+            cleanupTtsResources(streamTtsService, tempTtsStopped);
+            // 即使发生异常,也要返回文本数据,确保前端至少能收到文本
+            return Flux.just(createFallbackTextResponse(sendReqVO, userId, conversation, contentAnswer));
+        }
+    }
+
+    /**
+     * 获取TTS配置信息
+     */
+    private AiTtsDO getTtsConfig(Long roleId) {
+        AiChatRoleDO chatRole = chatRoleService.getChatRole(roleId);
         if (chatRole == null || chatRole.getTtsId() == null) {
             throw exception(ErrorCodeConstants.TTS_NOT_EXISTS);
         }
@@ -76,102 +108,412 @@ public class WebAiServiceImpl {
         if (aiTtsDO == null) {
             throw exception(ErrorCodeConstants.TTS_NOT_EXISTS);
         }
+        return aiTtsDO;
+    }
 
-        // 3. 初始化TTS服务
-        StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
-        streamTtsService.startTts(aiTtsDO);
+    /**
+     * 创建SSE流
+     */
+    private Flux<CommonResult<AiChatMessageSendRespVO>> createSseFlux(
+            AiChatMessageSendReqVO sendReqVO, Long userId, AiChatConversationDO conversation,
+            String contentAnswer, StreamTtsService streamTtsService) {
+        return Flux.<CommonResult<AiChatMessageSendRespVO>>create(sink -> {
+                    // 初始化句子处理相关组件
+                    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+                        Thread thread = new Thread(r, "text-processor");
+                        thread.setDaemon(true);
+                        return thread;
+                    });
+                    AtomicReference<ScheduledFuture<?>> ttsTask = new AtomicReference<>();
+                    StringBuilder contentTTSBuffer = new StringBuilder(contentAnswer);
+                    AtomicBoolean ttsStopped = new AtomicBoolean(false);
+                    AtomicBoolean allTextProcessed = new AtomicBoolean(false);
 
-        // 4. 处理指定回答内容
-        String contentAnswer = sendReqVO.getContentAnswer();
-        StringBuffer contentTTSBuffer = new StringBuffer(contentAnswer);
-        Pattern sentencePattern = Pattern.compile("[。!?;\n\r]");
-
-        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-        AtomicReference<ScheduledFuture<?>> ttsTask = new AtomicReference<>();
-
-        // 5. 创建文本流
-        Flux<CommonResult<AiChatMessageSendRespVO>> textStream = Flux.just(success(
-                new AiChatMessageSendRespVO()
-                        .setEventType("TEXT")
-                        .setReceive(new AiChatMessageSendRespVO.Message().setContent(contentAnswer))
-        )).doOnComplete(() -> {
-            processRemainingText(streamTtsService, contentTTSBuffer);
-            if (ttsTask.get() != null) {
-                ttsTask.get().cancel(false);
-            }
-            scheduler.shutdown();
-        }).doOnError(throwable -> {
-            streamTtsService.stopTts();
-        }).doFinally(signalType -> {
-            streamTtsService.stopTts();
-        });
+                    try {
+                        // 发送文本数据(带type)
+                        sendTextData(sink, sendReqVO, userId, conversation, contentAnswer);
+
+                        // 创建音频流并订阅
+                        createAndSubscribeToAudioStream(sink, streamTtsService, scheduler, ttsTask, ttsStopped);
+
+                        // 开始处理文本分段并发送到TTS
+                        Pattern sentencePattern = Pattern.compile("[。!?;\n\r]");
+                        processTextSegments(streamTtsService, contentTTSBuffer, sentencePattern,
+                                scheduler, ttsTask, ttsStopped, allTextProcessed, sink);
+
+                        // 添加超时检测(60秒)
+                        ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
+                            if (!ttsStopped.get()) {
+                                log.warn("TTS处理超时,强制终止SSE流");
+                                cleanupResources(streamTtsService, scheduler, ttsTask, ttsStopped, sink);
+                            }
+                        }, 60, TimeUnit.SECONDS);
+
+                        // 当SSE流取消时的处理
+                        sink.onCancel(() -> {
+                            log.info("SSE流被取消");
+                            if (timeoutTask != null) {
+                                timeoutTask.cancel(false);
+                            }
+                            cleanupResources(streamTtsService, scheduler, ttsTask, ttsStopped, sink);
+                        });
+
+                    } catch (Exception e) {
+                        log.error("创建SSE流异常", e);
+                        cleanupResources(streamTtsService, scheduler, ttsTask, ttsStopped, sink);
+                        sink.error(e);
+                    }
+                }).subscribeOn(Schedulers.boundedElastic())
+                .doFinally(signalType -> {
+                    log.info("SSE流已终止: {}", signalType);
+                });
+    }
 
-        // 6. 创建音频流
-        Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.create(sink -> {
-            AtomicBoolean isFirstChunk = new AtomicBoolean(true);
+    /**
+     * 发送文本数据到SSE流
+     */
+    private void sendTextData(FluxSink<CommonResult<AiChatMessageSendRespVO>> sink,
+                              AiChatMessageSendReqVO sendReqVO, Long userId,
+                              AiChatConversationDO conversation, String contentAnswer) {
+        AiChatMessageSendRespVO textResp = new AiChatMessageSendRespVO();
+        textResp.setEventType("TEXT");
+
+        AiChatMessageDO userMessage = new AiChatMessageDO().setConversationId(conversation.getId()).setReplyId(null)
+                .setModel(conversation.getModel()).setModelId(conversation.getModelId()).setUserId(userId).setRoleId(conversation.getRoleId())
+                .setType(MessageType.USER.getValue()).setContent(sendReqVO.getContent()).setUseContext(sendReqVO.getUseContext());
+        userMessage.setCreateTime(LocalDateTime.now());
+        textResp.setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class));
+
+        AiChatMessageDO assistantMessage = new AiChatMessageDO().setConversationId(conversation.getId()).setReplyId(null)
+                .setModel(conversation.getModel()).setModelId(conversation.getModelId()).setUserId(userId).setRoleId(conversation.getRoleId())
+                .setType(MessageType.ASSISTANT.getValue()).setContent(contentAnswer).setUseContext(sendReqVO.getUseContext());
+        assistantMessage.setCreateTime(LocalDateTime.now());
+        textResp.setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class));
+
+        CommonResult<AiChatMessageSendRespVO> textResult = success(textResp);
+        log.info("准备发送文本数据到前端");
+        sink.next(textResult);
+        log.info("文本数据已发送到前端");
+    }
+
+    /**
+     * 创建并订阅音频流
+     */
+    private void createAndSubscribeToAudioStream(FluxSink<CommonResult<AiChatMessageSendRespVO>> mainSink,
+                                                 StreamTtsService streamTtsService,
+                                                 ScheduledExecutorService scheduler,
+                                                 AtomicReference<ScheduledFuture<?>> ttsTask,
+                                                 AtomicBoolean ttsStopped) {
+        Flux.<CommonResult<AiChatMessageSendRespVO>>create(audioSink -> {
+            AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
+
+            // 设置音频数据回调
             streamTtsService.setAudioDataCallback(audioBytes -> {
                 try {
-                    byte[] processedAudio;
-                    if (isFirstChunk.getAndSet(false)) {
-                        processedAudio = WavHeader.addWavHeader(audioBytes, 16000, 16, 1);
-                        log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
-                    } else {
-                        processedAudio = audioBytes;
-                    }
-                    String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
+                    byte[] processedAudio = processAudioData(audioBytes, isFirstChunk);
+                    String base64Audio = java.util.Base64.getEncoder().encodeToString(processedAudio);
+
                     AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
                     audioResp.setEventType("AUDIO");
                     audioResp.setAudioData(base64Audio);
-                    sink.next(success(audioResp));
+                    audioSink.next(success(audioResp));
                 } catch (Exception e) {
                     log.error("[TTS处理异常] 音频编码失败", e);
-                    sink.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
+                    audioSink.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
                 }
             });
-            streamTtsService.setOnCompleteCallback(sink::complete);
 
-            // 立即处理文本
-            ttsTask.set(scheduler.schedule(() -> {
-                Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
-                if (matcher.find()) {
-                    processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
-                } else if (!contentTTSBuffer.isEmpty()) {
-                    processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
+            // 设置完成回调
+            streamTtsService.setOnCompleteCallback(() -> {
+                log.info("TTS转换完成,准备终止SSE流");
+
+                // 确保只停止一次TTS服务
+                if (ttsStopped.compareAndSet(false, true)) {
+                    try {
+                        log.info("TTS完成回调中调用stopTts()");
+                        streamTtsService.stopTts();
+                        log.info("TTS完成回调中stopTts()调用完成");
+                    } catch (Exception e) {
+                        log.error("停止TTS服务异常", e);
+                    }
                 }
-            }, 100, TimeUnit.MILLISECONDS));
-        });
 
-        // 7. 合并流并返回
-        return Flux.merge(textStream, audioStream)
-                .doFinally(signalType -> {
-                    streamTtsService.setAudioDataCallback(null);
-                    streamTtsService.setOnCompleteCallback(null);
-                    scheduler.shutdownNow();
-                });
+                // 先完成音频流
+                try {
+                    if (!audioSink.isCancelled()) {
+                        audioSink.complete();
+                        log.info("音频流已成功完成");
+                    }
+                } catch (Exception e) {
+                    log.error("完成音频流异常", e);
+                }
+
+                // 确保主SSE流终止
+                try {
+                    if (!mainSink.isCancelled()) {
+                        log.info("TTS完成回调中终止主SSE流");
+                        // 添加一个小延迟确保所有数据都已发送
+                        scheduler.schedule(() -> {
+                            if (!mainSink.isCancelled()) {
+                                mainSink.complete();
+                                log.info("主SSE流已成功终止");
+                            }
+                        }, 100, TimeUnit.MILLISECONDS);
+                    }
+                } catch (Exception e) {
+                    log.error("终止主SSE流异常", e);
+                }
+            });
+        }).subscribe(
+                chunk -> {
+                    if (!mainSink.isCancelled()) {
+                        mainSink.next(chunk);
+                    }
+                },
+                error -> {
+                    log.error("音频流处理异常", error);
+                    try {
+                        if (!mainSink.isCancelled()) {
+                            mainSink.error(error);
+                        }
+                    } catch (Exception e) {
+                        log.error("主SSE流设置错误异常", e);
+                    }
+                    cleanupResources(streamTtsService, scheduler, ttsTask, ttsStopped, mainSink);
+                },
+                () -> {
+                    log.info("音频流处理完成,准备终止主SSE流");
+                    // 音频流完成时,如果主SSE流还未终止,主动终止
+                    try {
+                        if (!mainSink.isCancelled()) {
+                            log.info("音频流完成后主动终止主SSE流");
+                            mainSink.complete();
+                            log.info("主SSE流已成功终止");
+                        }
+                    } catch (Exception e) {
+                        log.error("终止主SSE流异常", e);
+                    }
+                }
+        );
+    }
+
+    /**
+     * 处理音频数据(添加WAV头)
+     */
+    private byte[] processAudioData(byte[] audioBytes, AtomicBoolean isFirstChunk) {
+        if (isFirstChunk.getAndSet(false)) {
+            // 仅首包添加WAV头
+            byte[] processedAudio = WavHeader.addWavHeader(audioBytes, 16000, 16, 1);
+            log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
+            return processedAudio;
+        } else {
+            // 后续包直接使用原始PCM数据
+            return audioBytes;
+        }
+    }
+
+    /**
+     * 处理文本分段
+     */
+    private void processTextSegments(StreamTtsService streamTtsService, StringBuilder buffer,
+                                     Pattern sentencePattern, ScheduledExecutorService scheduler,
+                                     AtomicReference<ScheduledFuture<?>> ttsTask, AtomicBoolean ttsStopped,
+                                     AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
+        if (buffer.isEmpty()) {
+            log.info("文本为空,无需处理");
+            handleTextComplete(streamTtsService, scheduler, ttsStopped, allTextProcessed, sink);
+            return;
+        }
+
+        // 立即处理文本
+        Matcher matcher = sentencePattern.matcher(buffer);
+        if (matcher.find()) {
+            processCompleteSentence(streamTtsService, buffer, matcher);
+            // 继续调度处理剩余文本
+            scheduleNextProcessing(streamTtsService, buffer, sentencePattern, scheduler,
+                    ttsTask, ttsStopped, allTextProcessed, sink);
+        } else if (buffer.length() > 50) { // 最长50字未结束也处理
+            processCompleteSentence(streamTtsService, buffer, buffer.length());
+            // 继续调度处理剩余文本
+            scheduleNextProcessing(streamTtsService, buffer, sentencePattern, scheduler,
+                    ttsTask, ttsStopped, allTextProcessed, sink);
+        } else {
+            // 文本较短且未结束,直接处理全部
+            log.info("TTS合成短文本: {}", buffer.toString());
+            streamTtsService.sendText(buffer.toString());
+            buffer.setLength(0);
+            handleTextComplete(streamTtsService, scheduler, ttsStopped, allTextProcessed, sink);
+        }
+    }
+
+    /**
+     * 调度下一次文本处理
+     */
+    private void scheduleNextProcessing(StreamTtsService streamTtsService, StringBuilder buffer,
+                                        Pattern sentencePattern, ScheduledExecutorService scheduler,
+                                        AtomicReference<ScheduledFuture<?>> ttsTask, AtomicBoolean ttsStopped,
+                                        AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
+        if (!buffer.isEmpty()) {
+            // 延迟200ms执行,合并短时间内处理的文本片段
+            if (ttsTask.get() != null) {
+                ttsTask.get().cancel(false); // 取消之前的延迟任务
+            }
+            ttsTask.set(scheduler.schedule(() -> {
+                processTextSegments(streamTtsService, buffer, sentencePattern, scheduler,
+                        ttsTask, ttsStopped, allTextProcessed, sink);
+            }, 200, TimeUnit.MILLISECONDS));
+        } else {
+            // 所有文本处理完毕
+            handleTextComplete(streamTtsService, scheduler, ttsStopped, allTextProcessed, sink);
+        }
     }
 
-    // 处理完整句子
-    private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, Matcher matcher) {
+    /**
+     * 处理文本完成后的逻辑
+     */
+    private void handleTextComplete(StreamTtsService streamTtsService, ScheduledExecutorService scheduler,
+                                    AtomicBoolean ttsStopped, AtomicBoolean allTextProcessed,
+                                    FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
+        allTextProcessed.set(true);
+        log.info("所有文本处理完毕,准备通知TTS服务文本已发送完毕");
+
+        // 关键修改:立即通知TTS服务文本已全部发送完毕
+        try {
+            // 设置标志位
+            ttsStopped.set(true);
+            // 调用stopTts()明确通知TTS服务文本已全部发送
+            streamTtsService.stopTts();
+            log.info("已通知TTS服务文本发送完毕,等待TTS合成完成回调");
+        } catch (Exception e) {
+            log.error("通知TTS服务文本发送完毕异常: {}", e.getMessage());
+        }
+
+        // 添加额外的超时检测,作为最后的保障
+        scheduler.schedule(() -> {
+            if (allTextProcessed.get() && !ttsStopped.get()) {
+                log.info("所有文本已发送到TTS服务,但TTS未完成回调,主动终止TTS服务和SSE流");
+                try {
+                    // 设置标志位
+                    ttsStopped.set(true);
+                    // 尝试停止TTS服务
+                    try {
+                        streamTtsService.stopTts();
+                        log.info("超时后主动停止TTS服务完成");
+                    } catch (Exception e) {
+                        log.error("停止TTS服务异常: {}", e.getMessage());
+                        // 即使停止失败,也要确保SSE流终止
+                    }
+
+                    // 确保SSE流终止,无论TTS服务是否成功停止
+                    if (sink != null && !sink.isCancelled()) {
+                        log.info("在handleTextComplete超时检测中主动终止主SSE流");
+                        sink.complete();
+                        log.info("主SSE流已成功终止");
+                    }
+                } catch (Exception e) {
+                    log.error("超时检测处理异常", e);
+                }
+            }
+        }, 60, TimeUnit.SECONDS); // 等待60秒后检查
+    }
+
+    /**
+     * 处理完整句子
+     */
+    private void processCompleteSentence(StreamTtsService streamTtsService, StringBuilder buffer, Matcher matcher) {
         String sentence = buffer.substring(0, matcher.end());
         streamTtsService.sendText(sentence);
         buffer.delete(0, matcher.end());
         log.info("TTS合成完整句: {}", sentence);
     }
 
-    // 处理指定长度文本
-    private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, int length) {
+    /**
+     * 处理指定长度文本
+     */
+    private void processCompleteSentence(StreamTtsService streamTtsService, StringBuilder buffer, int length) {
         String sentence = buffer.substring(0, length);
         streamTtsService.sendText(sentence);
         buffer.delete(0, length);
         log.info("TTS合成长文本: {}", sentence);
     }
 
-    // 处理剩余文本
-    private void processRemainingText(StreamTtsService streamTtsService, StringBuffer buffer) {
-        if (!buffer.isEmpty()) {
-            streamTtsService.sendText(buffer.toString());
-            buffer.setLength(0);
+    /**
+     * 清理所有资源
+     */
+    private void cleanupResources(StreamTtsService streamTtsService,
+                                  ScheduledExecutorService scheduler,
+                                  AtomicReference<ScheduledFuture<?>> ttsTask,
+                                  AtomicBoolean ttsStopped,
+                                  FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
+        if (ttsTask != null && ttsTask.get() != null) {
+            ttsTask.get().cancel(false);
+        }
+        if (scheduler != null && !scheduler.isShutdown()) {
+            scheduler.shutdownNow();
+        }
+        cleanupTtsResources(streamTtsService, ttsStopped);
+        ttsStopped.set(true);
+
+        // 确保SSE流终止
+        try {
+            if (sink != null && !sink.isCancelled()) {
+                log.info("在cleanupResources中终止主SSE流");
+                sink.complete();
+                log.info("主SSE流已成功终止");
+            }
+        } catch (Exception e) {
+            log.error("在cleanupResources中终止主SSE流异常", e);
         }
     }
 
+    /**
+     * 清理TTS服务资源
+     */
+    private void cleanupTtsResources(StreamTtsService streamTtsService, AtomicBoolean ttsStopped) {
+        try {
+            if (streamTtsService != null) {
+                log.info("开始清理TTS服务资源");
+
+                // 检查ttsStopped标志位,避免重复停止TTS服务
+                if (!ttsStopped.get()) {
+                    try {
+                        log.info("调用streamTtsService.stopTts()");
+                        streamTtsService.stopTts();
+                        log.info("streamTtsService.stopTts()调用完成");
+                    } catch (Exception e) {
+                        log.error("停止TTS服务异常", e);
+                    }
+                } else {
+                    log.info("TTS服务已停止,跳过stopTts()  调用");
+                }
+            }
+        } catch (Exception e) {
+            log.error("清理TTS资源异常", e);
+        }
+    }
+
+
+    /**
+     * 创建降级文本响应
+     */
+    private CommonResult<AiChatMessageSendRespVO> createFallbackTextResponse(
+            AiChatMessageSendReqVO sendReqVO, Long userId,
+            AiChatConversationDO conversation, String contentAnswer) {
+        AiChatMessageSendRespVO fallbackTextResp = new AiChatMessageSendRespVO();
+        fallbackTextResp.setEventType("TEXT");
+
+        AiChatMessageDO userMessage = new AiChatMessageDO().setConversationId(conversation.getId()).setReplyId(null)
+                .setModel(conversation.getModel()).setModelId(conversation.getModelId()).setUserId(userId).setRoleId(conversation.getRoleId())
+                .setType(MessageType.USER.getValue()).setContent(sendReqVO.getContent()).setUseContext(sendReqVO.getUseContext());
+        userMessage.setCreateTime(LocalDateTime.now());
+        fallbackTextResp.setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class));
+
+        AiChatMessageDO message = new AiChatMessageDO().setConversationId(conversation.getId()).setReplyId(null)
+                .setModel(conversation.getModel()).setModelId(conversation.getModelId()).setUserId(userId).setRoleId(conversation.getRoleId())
+                .setType(String.valueOf(MessageType.ASSISTANT)).setContent(contentAnswer).setUseContext(sendReqVO.getUseContext());
+        message.setCreateTime(LocalDateTime.now());
+        fallbackTextResp.setReceive(BeanUtils.toBean(message, AiChatMessageSendRespVO.Message.class));
+
+        return success(fallbackTextResp);
+    }
 }