|
@@ -57,6 +57,7 @@ import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
import java.util.regex.Pattern;
|
|
@@ -194,22 +195,11 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
Prompt prompt = buildPrompt(conversation, historyMessages, knowledgeSegments, model, sendReqVO);
|
|
Prompt prompt = buildPrompt(conversation, historyMessages, knowledgeSegments, model, sendReqVO);
|
|
|
Flux<ChatResponse> streamResponse = chatModel.stream(prompt);
|
|
Flux<ChatResponse> streamResponse = chatModel.stream(prompt);
|
|
|
|
|
|
|
|
- // 创建一个Processor用于合并文本和音频流
|
|
|
|
|
- // 创建一个共享的响应对象
|
|
|
|
|
// 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
|
|
// 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
|
|
|
-
|
|
|
|
|
// 在sendChatMessageStream方法中获取实例
|
|
// 在sendChatMessageStream方法中获取实例
|
|
|
StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
|
|
StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
|
|
|
streamTtsService.startTts(aiTtsDO);
|
|
streamTtsService.startTts(aiTtsDO);
|
|
|
|
|
|
|
|
- AtomicReference<CommonResult<AiChatMessageSendRespVO>> sharedResponse = new AtomicReference<>();
|
|
|
|
|
-
|
|
|
|
|
- DirectProcessor<CommonResult<AiChatMessageSendRespVO>> processor = DirectProcessor.create();
|
|
|
|
|
- FluxSink<CommonResult<AiChatMessageSendRespVO>> sink = processor.sink();
|
|
|
|
|
-
|
|
|
|
|
- // 4.3 初始化TTS服务
|
|
|
|
|
- streamTtsService.startTts(aiTtsDO);
|
|
|
|
|
-
|
|
|
|
|
// 4.4 流式返回并处理TTS
|
|
// 4.4 流式返回并处理TTS
|
|
|
StringBuffer contentBuffer = new StringBuffer();
|
|
StringBuffer contentBuffer = new StringBuffer();
|
|
|
StringBuffer contentTTSBuffer = new StringBuffer();
|
|
StringBuffer contentTTSBuffer = new StringBuffer();
|
|
@@ -249,7 +239,7 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
|
|
Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
|
|
|
if (matcher.find()) {
|
|
if (matcher.find()) {
|
|
|
processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
|
|
processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
|
|
|
- } else if (contentTTSBuffer.length() > 20) { // 最长20字未结束也处理
|
|
|
|
|
|
|
+ } else if (contentTTSBuffer.length() > 50) { // 最长50字未结束也处理
|
|
|
processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
|
|
processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
|
|
|
}
|
|
}
|
|
|
}, 500, TimeUnit.MILLISECONDS));
|
|
}, 500, TimeUnit.MILLISECONDS));
|
|
@@ -259,7 +249,6 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
.setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class))
|
|
.setSend(BeanUtils.toBean(userMessage, AiChatMessageSendRespVO.Message.class))
|
|
|
.setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class)
|
|
.setReceive(BeanUtils.toBean(assistantMessage, AiChatMessageSendRespVO.Message.class)
|
|
|
.setContent(newContent).setSegments(segments)));
|
|
.setContent(newContent).setSegments(segments)));
|
|
|
- sharedResponse.set(result);
|
|
|
|
|
return result;
|
|
return result;
|
|
|
}).doOnComplete(() -> {
|
|
}).doOnComplete(() -> {
|
|
|
|
|
|
|
@@ -269,7 +258,6 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(contentBuffer.toString())));
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(contentBuffer.toString())));
|
|
|
|
|
|
|
|
-
|
|
|
|
|
if (ttsTask.get() != null) {
|
|
if (ttsTask.get() != null) {
|
|
|
ttsTask.get().cancel(false);
|
|
ttsTask.get().cancel(false);
|
|
|
}
|
|
}
|
|
@@ -277,9 +265,6 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
|
|
|
|
|
// 通知TTS服务文本发送完成
|
|
// 通知TTS服务文本发送完成
|
|
|
streamTtsService.stopTts();
|
|
streamTtsService.stopTts();
|
|
|
- // ==== 添加回调清理 ====
|
|
|
|
|
- streamTtsService.setAudioDataCallback(null);
|
|
|
|
|
- sink.complete(); // 完成流
|
|
|
|
|
}).doOnError(throwable -> {
|
|
}).doOnError(throwable -> {
|
|
|
log.error("[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]", userId, sendReqVO, throwable);
|
|
log.error("[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]", userId, sendReqVO, throwable);
|
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
@@ -287,38 +272,39 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
|
|
|
// 发生错误时停止TTS服务
|
|
// 发生错误时停止TTS服务
|
|
|
streamTtsService.stopTts();
|
|
streamTtsService.stopTts();
|
|
|
- // ==== 添加回调清理 ====
|
|
|
|
|
- streamTtsService.setAudioDataCallback(null);
|
|
|
|
|
- // =====================
|
|
|
|
|
- sink.error(throwable); // 传递错误
|
|
|
|
|
})
|
|
})
|
|
|
// ==== 添加finally块清理 ====
|
|
// ==== 添加finally块清理 ====
|
|
|
.doFinally(signalType -> {
|
|
.doFinally(signalType -> {
|
|
|
- streamTtsService.setAudioDataCallback(null);
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // 通知TTS服务文本发送完成
|
|
|
|
|
+ streamTtsService.stopTts();
|
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
|
|
|
|
|
|
// 创建音频流
|
|
// 创建音频流
|
|
|
Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.create(sink2 -> {
|
|
Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.create(sink2 -> {
|
|
|
|
|
+ AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
|
try {
|
|
try {
|
|
|
- // 确保TTS输出WAV格式(带文件头)
|
|
|
|
|
- byte[] wavAudioWithHeader = addWavHeader(audioBytes, 24000, 16, 1);
|
|
|
|
|
- String base64Audio = Base64.getEncoder().encodeToString(wavAudioWithHeader);
|
|
|
|
|
|
|
+ byte[] processedAudio;
|
|
|
|
|
+ if (isFirstChunk.getAndSet(false)) {
|
|
|
|
|
+ // 仅首包添加WAV头
|
|
|
|
|
+ processedAudio = addWavHeader(audioBytes, 16000, 16, 1);
|
|
|
|
|
+ log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 后续包直接使用原始PCM数据
|
|
|
|
|
+ processedAudio = audioBytes;
|
|
|
|
|
+ }
|
|
|
|
|
+// byte[] processedAudio = addWavHeader(audioBytes, 24000, 16, 1);
|
|
|
|
|
+ String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
|
|
|
AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
|
audioResp.setEventType("AUDIO");
|
|
audioResp.setEventType("AUDIO");
|
|
|
audioResp.setAudioData(base64Audio);
|
|
audioResp.setAudioData(base64Audio);
|
|
|
sink2.next(success(audioResp));
|
|
sink2.next(success(audioResp));
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("[TTS处理异常][userId({}) sendReqVO({})] 音频编码失败", userId, sendReqVO, e); // 更详细的日志
|
|
|
|
|
|
|
+ log.error("[TTS处理异常] 音频编码失败", e);
|
|
|
sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
- // 设置TTS完成回调,终止音频流
|
|
|
|
|
- streamTtsService.setOnCompleteCallback(() -> {
|
|
|
|
|
- sink2.complete();
|
|
|
|
|
- log.info("音频流已完成");
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ streamTtsService.setOnCompleteCallback(() -> {sink2.complete();});
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
// 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
|
|
// 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
|