|
|
@@ -42,6 +42,7 @@ import org.springframework.ai.chat.model.ChatResponse;
|
|
|
import org.springframework.ai.chat.model.StreamingChatModel;
|
|
|
import org.springframework.ai.chat.prompt.ChatOptions;
|
|
|
import org.springframework.ai.chat.prompt.Prompt;
|
|
|
+import org.springframework.beans.factory.ObjectProvider;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import reactor.core.publisher.DirectProcessor;
|
|
|
@@ -55,6 +56,7 @@ import java.util.*;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
@@ -99,8 +101,12 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
private AiKnowledgeDocumentService knowledgeDocumentService;
|
|
|
@Resource
|
|
|
private AiToolService toolService;
|
|
|
+// @Resource
|
|
|
+// private StreamTtsService streamTtsService; // 注入TTS服务
|
|
|
+ // 在AiChatMessageServiceImpl中注入
|
|
|
@Resource
|
|
|
- private StreamTtsService streamTtsService; // 注入TTS服务
|
|
|
+ private ObjectProvider<StreamTtsService> streamTtsServiceProvider; // 使用ObjectProvider获取原型bean
|
|
|
+
|
|
|
@Resource
|
|
|
private AiTtsMapper ttsMapper;
|
|
|
|
|
|
@@ -190,6 +196,12 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
|
|
|
// 创建一个Processor用于合并文本和音频流
|
|
|
// 创建一个共享的响应对象
|
|
|
+ // 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
|
|
|
+
|
|
|
+ // 在sendChatMessageStream方法中获取实例
|
|
|
+ StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
|
|
|
+ streamTtsService.startTts(aiTtsDO);
|
|
|
+
|
|
|
AtomicReference<CommonResult<AiChatMessageSendRespVO>> sharedResponse = new AtomicReference<>();
|
|
|
|
|
|
DirectProcessor<CommonResult<AiChatMessageSendRespVO>> processor = DirectProcessor.create();
|
|
|
@@ -202,7 +214,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
StringBuffer contentBuffer = new StringBuffer();
|
|
|
StringBuffer contentTTSBuffer = new StringBuffer();
|
|
|
// 添加句子结束符正则表达式
|
|
|
- Pattern sentencePattern = Pattern.compile("[。!?;,\n\r]"); // 增加换行符支持
|
|
|
+ Pattern sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
|
|
|
|
|
|
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
|
AtomicReference<ScheduledFuture<?>> ttsTask = new AtomicReference<>();
|
|
|
@@ -233,14 +245,14 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
ttsTask.get().cancel(false); // 取消之前的延迟任务
|
|
|
}
|
|
|
// 延迟500ms执行,合并短时间内到达的文本片段
|
|
|
-// ttsTask.set(scheduler.schedule(() -> {
|
|
|
+ ttsTask.set(scheduler.schedule(() -> {
|
|
|
Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
|
|
|
if (matcher.find()) {
|
|
|
- processCompleteSentence(contentTTSBuffer, matcher);
|
|
|
+ processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
|
|
|
} else if (contentTTSBuffer.length() > 20) { // 最长20字未结束也处理
|
|
|
- processCompleteSentence(contentTTSBuffer, contentTTSBuffer.length());
|
|
|
+ processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
|
|
|
}
|
|
|
-// }, 500, TimeUnit.MILLISECONDS));
|
|
|
+ }, 500, TimeUnit.MILLISECONDS));
|
|
|
|
|
|
CommonResult<AiChatMessageSendRespVO> result = success(new AiChatMessageSendRespVO()
|
|
|
.setEventType("TEXT")
|
|
|
@@ -251,7 +263,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
return result;
|
|
|
}).doOnComplete(() -> {
|
|
|
|
|
|
- processRemainingText(contentTTSBuffer); // 处理剩余文本
|
|
|
+ processRemainingText(streamTtsService, contentTTSBuffer); // 处理剩余文本
|
|
|
|
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
|
TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
|
@@ -283,6 +295,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
// ==== 添加finally块清理 ====
|
|
|
.doFinally(signalType -> {
|
|
|
streamTtsService.setAudioDataCallback(null);
|
|
|
+
|
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
|
|
|
|
// 创建音频流
|
|
|
@@ -318,7 +331,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
}
|
|
|
|
|
|
// 处理完整句子
|
|
|
- private void processCompleteSentence(StringBuffer buffer, Matcher matcher) {
|
|
|
+ private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, Matcher matcher) {
|
|
|
String sentence = buffer.substring(0, matcher.end());
|
|
|
System.out.println("==============[处理完整句子[[buffer: " + sentence);
|
|
|
streamTtsService.sendText(sentence);
|
|
|
@@ -327,7 +340,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
}
|
|
|
|
|
|
// 处理指定长度文本
|
|
|
- private void processCompleteSentence(StringBuffer buffer, int length) {
|
|
|
+ private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, int length) {
|
|
|
String sentence = buffer.substring(0, length);
|
|
|
System.out.println("==============[处理指定长度文本[[buffer: " + sentence);
|
|
|
streamTtsService.sendText(sentence);
|
|
|
@@ -336,7 +349,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
}
|
|
|
|
|
|
// 处理剩余文本
|
|
|
- private void processRemainingText(StringBuffer buffer) {
|
|
|
+ private void processRemainingText(StreamTtsService streamTtsService, StringBuffer buffer) {
|
|
|
if (!buffer.isEmpty()) {
|
|
|
System.out.println("TTS合成剩余文本: " + buffer);
|
|
|
streamTtsService.sendText(buffer.toString());
|