|
|
@@ -174,9 +174,18 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
AiModelDO model = modalService.validateModel(conversation.getModelId());
|
|
|
StreamingChatModel chatModel = modalService.getChatModel(model.getId());
|
|
|
//角色
|
|
|
- AiChatRoleDO chatRole = chatRoleService.getChatRole(conversation.getRoleId());
|
|
|
+ AiChatRoleDO chatRole = null;
|
|
|
+ if (conversation.getRoleId() != null) {
|
|
|
+ chatRole = chatRoleService.getChatRole(conversation.getRoleId());
|
|
|
+ }
|
|
|
//发声人
|
|
|
- AiTtsDO aiTtsDO = ttsMapper.selectById(chatRole.getTtsId());
|
|
|
+ AiTtsDO aiTtsDO = null;
|
|
|
+ if (chatRole != null) {
|
|
|
+ aiTtsDO = ttsMapper.selectById(chatRole.getTtsId());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 添加useTts标志,判断是否使用TTS服务
|
|
|
+ boolean useTts = aiTtsDO != null;
|
|
|
|
|
|
// 2. 知识库找回
|
|
|
List<AiKnowledgeSegmentSearchRespBO> knowledgeSegments = recallKnowledgeSegment(sendReqVO.getContent(),
|
|
|
@@ -197,27 +206,40 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
Flux<ChatResponse> streamResponse = chatModel.stream(prompt);
|
|
|
|
|
|
// 4.3 初始化TTS服务 - 创建新的实例而非使用共享实例
|
|
|
- // 在sendChatMessageStream方法中获取实例
|
|
|
- StreamTtsService streamTtsService = streamTtsServiceProvider.getObject();
|
|
|
- streamTtsService.startTts(aiTtsDO);
|
|
|
+ StreamTtsService streamTtsService;
|
|
|
+ ScheduledExecutorService scheduler;
|
|
|
+ AtomicReference<ScheduledFuture<?>> ttsTask;
|
|
|
+ StringBuffer contentTTSBuffer;
|
|
|
+ Pattern sentencePattern;
|
|
|
+
|
|
|
+ if (useTts) {
|
|
|
+ // 只有当需要使用TTS服务时才创建实例
|
|
|
+ streamTtsService = streamTtsServiceProvider.getObject();
|
|
|
+ streamTtsService.startTts(aiTtsDO);
|
|
|
+
|
|
|
+ contentTTSBuffer = new StringBuffer();
|
|
|
+ sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
|
|
|
+ scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
|
+ ttsTask = new AtomicReference<>();
|
|
|
+ } else {
|
|
|
+ streamTtsService = null;
|
|
|
+ sentencePattern = null;
|
|
|
+ scheduler = null;
|
|
|
+ ttsTask = null;
|
|
|
+ contentTTSBuffer = null;
|
|
|
+ }
|
|
|
|
|
|
// 4.4 流式返回并处理TTS
|
|
|
StringBuffer contentBuffer = new StringBuffer();
|
|
|
- StringBuffer contentTTSBuffer = new StringBuffer();
|
|
|
- // 添加句子结束符正则表达式
|
|
|
- Pattern sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
|
|
|
-
|
|
|
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
|
- AtomicReference<ScheduledFuture<?>> ttsTask = new AtomicReference<>();
|
|
|
|
|
|
- Flux<CommonResult<AiChatMessageSendRespVO>> textStream = streamResponse.map(chunk -> {
|
|
|
+ Flux<CommonResult<AiChatMessageSendRespVO>> textStream = streamResponse.map(chunk -> {
|
|
|
// 处理知识库的返回,只有首次才有
|
|
|
List<AiChatMessageRespVO.KnowledgeSegment> segments = null;
|
|
|
if (StrUtil.isEmpty(contentBuffer)) {
|
|
|
Map<Long, AiKnowledgeDocumentDO> documentMap = TenantUtils.executeIgnore(() ->
|
|
|
knowledgeDocumentService.getKnowledgeDocumentMap(
|
|
|
convertSet(knowledgeSegments, AiKnowledgeSegmentSearchRespBO::getDocumentId)));
|
|
|
- segments = BeanUtils.toBean(knowledgeSegments, AiChatMessageRespVO.KnowledgeSegment.class, segment -> {
|
|
|
+ segments = BeanUtils.toBean(knowledgeSegments, AiChatMessageRespVO.KnowledgeSegment.class, segment -> {
|
|
|
AiKnowledgeDocumentDO document = documentMap.get(segment.getDocumentId());
|
|
|
segment.setDocumentName(document != null ? document.getName() : null);
|
|
|
});
|
|
|
@@ -228,22 +250,26 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
newContent = StrUtil.nullToDefault(newContent, ""); // 避免 null 的情况
|
|
|
|
|
|
contentBuffer.append(newContent);
|
|
|
- contentTTSBuffer.append(newContent);
|
|
|
-// System.out.println("==============newContent: " + newContent);
|
|
|
|
|
|
- // 发送新内容到TTS服务进行语音合成
|
|
|
- if (ttsTask.get() != null) {
|
|
|
- ttsTask.get().cancel(false); // 取消之前的延迟任务
|
|
|
- }
|
|
|
- // 延迟500ms执行,合并短时间内到达的文本片段
|
|
|
- ttsTask.set(scheduler.schedule(() -> {
|
|
|
- Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
|
|
|
- if (matcher.find()) {
|
|
|
- processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
|
|
|
- } else if (contentTTSBuffer.length() > 50) { // 最长50字未结束也处理
|
|
|
- processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
|
|
|
+ // 只有当需要使用TTS服务时才处理TTS相关逻辑
|
|
|
+ if (useTts) {
|
|
|
+ contentTTSBuffer.append(newContent);
|
|
|
+ log.debug("TTS新内容: {}", newContent);
|
|
|
+
|
|
|
+ // 发送新内容到TTS服务进行语音合成
|
|
|
+ if (ttsTask.get() != null) {
|
|
|
+ ttsTask.get().cancel(false); // 取消之前的延迟任务
|
|
|
}
|
|
|
- }, 500, TimeUnit.MILLISECONDS));
|
|
|
+ // 延迟500ms执行,合并短时间内到达的文本片段
|
|
|
+ ttsTask.set(scheduler.schedule(() -> {
|
|
|
+ Matcher matcher = sentencePattern.matcher(contentTTSBuffer);
|
|
|
+ if (matcher.find()) {
|
|
|
+ processCompleteSentence(streamTtsService, contentTTSBuffer, matcher);
|
|
|
+ } else if (contentTTSBuffer.length() > 50) { // 最长50字未结束也处理
|
|
|
+ processCompleteSentence(streamTtsService, contentTTSBuffer, contentTTSBuffer.length());
|
|
|
+ }
|
|
|
+ }, 500, TimeUnit.MILLISECONDS));
|
|
|
+ }
|
|
|
|
|
|
CommonResult<AiChatMessageSendRespVO> result = success(new AiChatMessageSendRespVO()
|
|
|
.setEventType("TEXT")
|
|
|
@@ -253,95 +279,121 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
return result;
|
|
|
}).doOnComplete(() -> {
|
|
|
|
|
|
- processRemainingText(streamTtsService, contentTTSBuffer); // 处理剩余文本
|
|
|
+ // 只有当需要使用TTS服务时才处理TTS相关逻辑
|
|
|
+ if (useTts) {
|
|
|
+ processRemainingText(streamTtsService, contentTTSBuffer); // 处理剩余文本
|
|
|
|
|
|
- // 忽略租户,因为 Flux 异步无法透传租户
|
|
|
- TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
|
- new AiChatMessageDO().setId(assistantMessage.getId()).setContent(contentBuffer.toString())));
|
|
|
+ if (ttsTask.get() != null) {
|
|
|
+ ttsTask.get().cancel(false);
|
|
|
+ }
|
|
|
+ scheduler.shutdown(); // 关闭调度器
|
|
|
|
|
|
- if (ttsTask.get() != null) {
|
|
|
- ttsTask.get().cancel(false);
|
|
|
+ // 通知TTS服务文本发送完成
|
|
|
+ streamTtsService.stopTts();
|
|
|
}
|
|
|
- scheduler.shutdown(); // 关闭调度器
|
|
|
|
|
|
- // 通知TTS服务文本发送完成
|
|
|
- streamTtsService.stopTts();
|
|
|
+ // 忽略租户,因为 Flux 异步无法透传租户
|
|
|
+ TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
|
+ new AiChatMessageDO().setId(assistantMessage.getId()).setContent(contentBuffer.toString())));
|
|
|
}).doOnError(throwable -> {
|
|
|
log.error("[sendChatMessageStream][userId({}) sendReqVO({}) 发生异常]", userId, sendReqVO, throwable);
|
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
|
TenantUtils.executeIgnore(() -> chatMessageMapper.updateById(
|
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
|
|
|
// 发生错误时停止TTS服务
|
|
|
- streamTtsService.stopTts();
|
|
|
+ if (useTts && streamTtsService != null) {
|
|
|
+ streamTtsService.stopTts();
|
|
|
+ }
|
|
|
})
|
|
|
// ==== 添加finally块清理 ====
|
|
|
.doFinally(signalType -> {
|
|
|
// 通知TTS服务文本发送完成
|
|
|
- streamTtsService.stopTts();
|
|
|
+ if (useTts && streamTtsService != null) {
|
|
|
+ streamTtsService.stopTts();
|
|
|
+ }
|
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
|
|
|
|
- // 创建音频流
|
|
|
- Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.create(sink2 -> {
|
|
|
- AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
- streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
|
- try {
|
|
|
- byte[] processedAudio;
|
|
|
- if (isFirstChunk.getAndSet(false)) {
|
|
|
- // 仅首包添加WAV头
|
|
|
- processedAudio = WavHeader.addWavHeader(audioBytes, 16000, 16, 1);
|
|
|
- log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
|
|
|
- } else {
|
|
|
- // 后续包直接使用原始PCM数据
|
|
|
- processedAudio = audioBytes;
|
|
|
+ // 创建音频流 - 只有当需要使用TTS服务时才创建音频流
|
|
|
+ Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.empty();
|
|
|
+
|
|
|
+ if (useTts) {
|
|
|
+ audioStream = Flux.create(sink2 -> {
|
|
|
+ AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
+ streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
|
+ try {
|
|
|
+ byte[] processedAudio;
|
|
|
+ if (isFirstChunk.getAndSet(false)) {
|
|
|
+ // 仅首包添加WAV头
|
|
|
+ processedAudio = WavHeader.addWavHeader(audioBytes, 16000, 16, 1);
|
|
|
+ log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
|
|
|
+ } else {
|
|
|
+ // 后续包直接使用原始PCM数据
|
|
|
+ processedAudio = audioBytes;
|
|
|
+ }
|
|
|
+ String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
|
|
|
+ AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
|
+ audioResp.setEventType("AUDIO");
|
|
|
+ audioResp.setAudioData(base64Audio);
|
|
|
+ sink2.next(success(audioResp));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[TTS处理异常] 音频编码失败", e);
|
|
|
+ sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
|
}
|
|
|
-// byte[] processedAudio = addWavHeader(audioBytes, 24000, 16, 1);
|
|
|
- String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
|
|
|
- AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
|
- audioResp.setEventType("AUDIO");
|
|
|
- audioResp.setAudioData(base64Audio);
|
|
|
- sink2.next(success(audioResp));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[TTS处理异常] 音频编码失败", e);
|
|
|
- sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
|
- }
|
|
|
+ });
|
|
|
+ streamTtsService.setOnCompleteCallback(sink2::complete);
|
|
|
});
|
|
|
- streamTtsService.setOnCompleteCallback(sink2::complete);
|
|
|
- });
|
|
|
+ }
|
|
|
|
|
|
// 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
|
|
|
return Flux.merge(textStream, audioStream)
|
|
|
.doFinally(signalType -> {
|
|
|
// 双重保险:无论哪个流先完成,最终都清理资源
|
|
|
- streamTtsService.setAudioDataCallback(null);
|
|
|
- streamTtsService.setOnCompleteCallback(null);
|
|
|
+ if (useTts && streamTtsService != null) {
|
|
|
+ streamTtsService.setAudioDataCallback(null);
|
|
|
+ streamTtsService.setOnCompleteCallback(null);
|
|
|
+ }
|
|
|
+ // 确保调度器被关闭
|
|
|
+ if (useTts && scheduler != null && !scheduler.isShutdown()) {
|
|
|
+ scheduler.shutdownNow();
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
|
// 处理完整句子
|
|
|
private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, Matcher matcher) {
|
|
|
+ if (streamTtsService == null || buffer == null || matcher == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
String sentence = buffer.substring(0, matcher.end());
|
|
|
- System.out.println("==============[处理完整句子[[buffer: " + sentence);
|
|
|
+ log.debug("[处理完整句子][buffer: {}", sentence);
|
|
|
streamTtsService.sendText(sentence);
|
|
|
buffer.delete(0, matcher.end());
|
|
|
- log.info("TTS合成完整句: {}", sentence); // 替换System.out为日志
|
|
|
+ log.info("TTS合成完整句: {}", sentence);
|
|
|
}
|
|
|
|
|
|
// 处理指定长度文本
|
|
|
private void processCompleteSentence(StreamTtsService streamTtsService, StringBuffer buffer, int length) {
|
|
|
+ if (streamTtsService == null || buffer == null || length <= 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
String sentence = buffer.substring(0, length);
|
|
|
- System.out.println("==============[处理指定长度文本[[buffer: " + sentence);
|
|
|
+ log.debug("[处理指定长度文本][buffer: {}", sentence);
|
|
|
streamTtsService.sendText(sentence);
|
|
|
buffer.delete(0, length);
|
|
|
- log.info("TTS合成长文本: {}", sentence); // 替换System.out为日志
|
|
|
+ log.info("TTS合成长文本: {}", sentence);
|
|
|
}
|
|
|
|
|
|
// 处理剩余文本
|
|
|
private void processRemainingText(StreamTtsService streamTtsService, StringBuffer buffer) {
|
|
|
- if (!buffer.isEmpty()) {
|
|
|
- System.out.println("TTS合成剩余文本: " + buffer);
|
|
|
- streamTtsService.sendText(buffer.toString());
|
|
|
- buffer.setLength(0);
|
|
|
+ if (streamTtsService == null || buffer == null || buffer.isEmpty()) {
|
|
|
+ return;
|
|
|
}
|
|
|
+
|
|
|
+ log.info("TTS合成剩余文本: {}", buffer);
|
|
|
+ streamTtsService.sendText(buffer.toString());
|
|
|
+ buffer.setLength(0);
|
|
|
}
|
|
|
|
|
|
private List<AiKnowledgeSegmentSearchRespBO> recallKnowledgeSegment(String content,
|
|
|
@@ -507,4 +559,4 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
return chatMessageMapper.selectPage(pageReqVO);
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+}
|