|
@@ -212,16 +212,45 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
AtomicReference<ScheduledFuture<?>> ttsTask;
|
|
AtomicReference<ScheduledFuture<?>> ttsTask;
|
|
|
StringBuffer contentTTSBuffer;
|
|
StringBuffer contentTTSBuffer;
|
|
|
Pattern sentencePattern;
|
|
Pattern sentencePattern;
|
|
|
|
|
+ Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.empty();
|
|
|
|
|
|
|
|
if (useTts) {
|
|
if (useTts) {
|
|
|
// 只有当需要使用TTS服务时才创建实例
|
|
// 只有当需要使用TTS服务时才创建实例
|
|
|
streamTtsService = streamTtsServiceProvider.getObject();
|
|
streamTtsService = streamTtsServiceProvider.getObject();
|
|
|
- streamTtsService.startTts(aiTtsDO);
|
|
|
|
|
-
|
|
|
|
|
contentTTSBuffer = new StringBuffer();
|
|
contentTTSBuffer = new StringBuffer();
|
|
|
sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
|
|
sentencePattern = Pattern.compile("[。!?;\n\r]"); // 增加换行符支持
|
|
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
|
ttsTask = new AtomicReference<>();
|
|
ttsTask = new AtomicReference<>();
|
|
|
|
|
+
|
|
|
|
|
+ // 先创建音频流并设置回调,再启动TTS服务
|
|
|
|
|
+ audioStream = Flux.create(sink2 -> {
|
|
|
|
|
+ AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
|
|
+ streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ byte[] processedAudio;
|
|
|
|
|
+ if (isFirstChunk.getAndSet(false)) {
|
|
|
|
|
+ // 仅首包添加WAV头
|
|
|
|
|
+ processedAudio = WavHeader.addWavHeader(audioBytes, SampleRateEnum.SAMPLE_RATE_16K.value, 16, 1);
|
|
|
|
|
+ log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 后续包直接使用原始PCM数据
|
|
|
|
|
+ processedAudio = audioBytes;
|
|
|
|
|
+ }
|
|
|
|
|
+ String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
|
|
|
|
|
+ AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
|
|
|
+ audioResp.setEventType("AUDIO");
|
|
|
|
|
+ audioResp.setAudioData(base64Audio);
|
|
|
|
|
+ sink2.next(success(audioResp));
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("[TTS处理异常] 音频编码失败", e);
|
|
|
|
|
+ sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ streamTtsService.setOnCompleteCallback(sink2::complete);
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 启动TTS服务
|
|
|
|
|
+ streamTtsService.startTts(aiTtsDO);
|
|
|
} else {
|
|
} else {
|
|
|
streamTtsService = null;
|
|
streamTtsService = null;
|
|
|
sentencePattern = null;
|
|
sentencePattern = null;
|
|
@@ -290,7 +319,11 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
scheduler.shutdown(); // 关闭调度器
|
|
scheduler.shutdown(); // 关闭调度器
|
|
|
|
|
|
|
|
// 通知TTS服务文本发送完成
|
|
// 通知TTS服务文本发送完成
|
|
|
- streamTtsService.stopTts();
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ streamTtsService.stopTts();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("停止TTS服务失败", e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
// 忽略租户,因为 Flux 异步无法透传租户
|
|
@@ -303,48 +336,29 @@ public class AiChatMessageServiceImpl implements AiChatMessageService {
|
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
|
|
new AiChatMessageDO().setId(assistantMessage.getId()).setContent(throwable.getMessage())));
|
|
|
// 发生错误时停止TTS服务
|
|
// 发生错误时停止TTS服务
|
|
|
if (useTts && streamTtsService != null) {
|
|
if (useTts && streamTtsService != null) {
|
|
|
- streamTtsService.stopTts();
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ streamTtsService.stopTts();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("停止TTS服务失败", e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
})
|
|
})
|
|
|
// ==== 添加finally块清理 ====
|
|
// ==== 添加finally块清理 ====
|
|
|
.doFinally(signalType -> {
|
|
.doFinally(signalType -> {
|
|
|
// 通知TTS服务文本发送完成
|
|
// 通知TTS服务文本发送完成
|
|
|
if (useTts && streamTtsService != null) {
|
|
if (useTts && streamTtsService != null) {
|
|
|
- streamTtsService.stopTts();
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ streamTtsService.stopTts();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("停止TTS服务失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 确保调度器被关闭
|
|
|
|
|
+ if (useTts && scheduler != null && !scheduler.isShutdown()) {
|
|
|
|
|
+ scheduler.shutdownNow();
|
|
|
}
|
|
}
|
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
}).onErrorResume(error -> Flux.just(error(ErrorCodeConstants.CHAT_STREAM_ERROR)));
|
|
|
|
|
|
|
|
- // 创建音频流 - 只有当需要使用TTS服务时才创建音频流
|
|
|
|
|
- Flux<CommonResult<AiChatMessageSendRespVO>> audioStream = Flux.empty();
|
|
|
|
|
-
|
|
|
|
|
- if (useTts) {
|
|
|
|
|
- audioStream = Flux.create(sink2 -> {
|
|
|
|
|
- AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
|
|
- streamTtsService.setAudioDataCallback(audioBytes -> {
|
|
|
|
|
- try {
|
|
|
|
|
- byte[] processedAudio;
|
|
|
|
|
- if (isFirstChunk.getAndSet(false)) {
|
|
|
|
|
- // 仅首包添加WAV头
|
|
|
|
|
- processedAudio = WavHeader.addWavHeader(audioBytes, SampleRateEnum.SAMPLE_RATE_16K.value, 16, 1);
|
|
|
|
|
- log.info("首包音频带WAV头,长度={} bytes", processedAudio.length);
|
|
|
|
|
- } else {
|
|
|
|
|
- // 后续包直接使用原始PCM数据
|
|
|
|
|
- processedAudio = audioBytes;
|
|
|
|
|
- }
|
|
|
|
|
- String base64Audio = Base64.getEncoder().encodeToString(processedAudio);
|
|
|
|
|
- AiChatMessageSendRespVO audioResp = new AiChatMessageSendRespVO();
|
|
|
|
|
- audioResp.setEventType("AUDIO");
|
|
|
|
|
- audioResp.setAudioData(base64Audio);
|
|
|
|
|
- sink2.next(success(audioResp));
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("[TTS处理异常] 音频编码失败", e);
|
|
|
|
|
- sink2.error(new RuntimeException("TTS音频处理失败: " + e.getMessage(), e));
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
- streamTtsService.setOnCompleteCallback(sink2::complete);
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
|
|
// 使用merge而非mergeSequential,确保任一流完成不阻塞其他流
|
|
|
return Flux.merge(textStream, audioStream)
|
|
return Flux.merge(textStream, audioStream)
|
|
|
.doFinally(signalType -> {
|
|
.doFinally(signalType -> {
|