|
|
@@ -68,15 +68,6 @@ public class WebAiServiceImpl {
|
|
|
@Resource
|
|
|
private StreamingDouBaoTtsService streamingDouBaoTtsService;
|
|
|
|
|
|
- // 豆包TTS的sink引用,用于发送音频数据
|
|
|
- private AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef;
|
|
|
-
|
|
|
- // 标记是否是首次发送豆包TTS音频数据
|
|
|
- private final AtomicBoolean isFirstDouBaoAudio = new AtomicBoolean(true);
|
|
|
-
|
|
|
- // 豆包TTS任务计数器
|
|
|
- private final AtomicInteger douBaoTtsTaskCount = new AtomicInteger(0);
|
|
|
-
|
|
|
/**
|
|
|
* 发送指定回答的SSE流式响应
|
|
|
* 确保TEXT类型文本数据优先且可靠地发送到前端,同时提供AUDIO音频流
|
|
|
@@ -110,10 +101,6 @@ public class WebAiServiceImpl {
|
|
|
if (isDouBaoTts) {
|
|
|
// 初始化豆包TTS服务
|
|
|
streamingDouBaoTtsService.startTts(aiTtsDO);
|
|
|
- // 重置豆包TTS的sink引用
|
|
|
- this.douBaoSinkRef = new AtomicReference<>();
|
|
|
- // 重置豆包TTS的首次音频标记
|
|
|
- isFirstDouBaoAudio.set(true);
|
|
|
} else {
|
|
|
// 初始化阿里云TTS服务
|
|
|
streamingAliyunTtsService.startTts(aiTtsDO);
|
|
|
@@ -164,6 +151,12 @@ public class WebAiServiceImpl {
|
|
|
StringBuilder contentTTSBuffer = new StringBuilder(contentAnswer);
|
|
|
AtomicBoolean ttsStopped = new AtomicBoolean(false);
|
|
|
AtomicBoolean allTextProcessed = new AtomicBoolean(false);
|
|
|
+ // 豆包TTS的sink引用,用于发送音频数据
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef = new AtomicReference<>();
|
|
|
+ // 标记是否是首次发送豆包TTS音频数据
|
|
|
+ AtomicBoolean isFirstDouBaoAudio = new AtomicBoolean(true);
|
|
|
+ // 豆包TTS任务计数器
|
|
|
+ AtomicInteger douBaoTtsTaskCount = new AtomicInteger(0);
|
|
|
|
|
|
try {
|
|
|
// 发送文本数据(带type)
|
|
|
@@ -172,7 +165,7 @@ public class WebAiServiceImpl {
|
|
|
// 处理音频流
|
|
|
if (isDouBaoTts) {
|
|
|
// 为豆包TTS设置sink
|
|
|
- this.douBaoSinkRef.set(sink);
|
|
|
+ douBaoSinkRef.set(sink);
|
|
|
} else {
|
|
|
// 为阿里云TTS设置音频数据回调
|
|
|
AtomicBoolean isFirstChunk = new AtomicBoolean(true); // 首包标志位
|
|
|
@@ -202,7 +195,7 @@ public class WebAiServiceImpl {
|
|
|
Pattern sentencePattern = Pattern.compile("[。!?;\n\r]");
|
|
|
if (isDouBaoTts) {
|
|
|
processTextSegmentsForDouBao(aiTtsDO, contentTTSBuffer, sentencePattern,
|
|
|
- scheduler, ttsTask, ttsStopped, allTextProcessed, sink);
|
|
|
+ scheduler, ttsTask, ttsStopped, allTextProcessed, sink, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
} else {
|
|
|
processTextSegments(streamingAliyunTtsService, contentTTSBuffer, sentencePattern,
|
|
|
scheduler, ttsTask, ttsStopped, allTextProcessed, sink);
|
|
|
@@ -522,10 +515,11 @@ public class WebAiServiceImpl {
|
|
|
private void createAndSubscribeToDouBaoAudioStream(FluxSink<CommonResult<AiChatMessageSendRespVO>> mainSink,
|
|
|
ScheduledExecutorService scheduler,
|
|
|
AtomicReference<ScheduledFuture<?>> ttsTask,
|
|
|
- AtomicBoolean ttsStopped, AiTtsDO aiTtsDO) {
|
|
|
+ AtomicBoolean ttsStopped, AiTtsDO aiTtsDO,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef) {
|
|
|
Flux.<CommonResult<AiChatMessageSendRespVO>>create(audioSink -> {
|
|
|
// 保存豆包TTS的sink引用
|
|
|
- this.douBaoSinkRef.set(audioSink);
|
|
|
+ douBaoSinkRef.set(audioSink);
|
|
|
}).subscribe(
|
|
|
chunk -> {
|
|
|
if (!mainSink.isCancelled()) {
|
|
|
@@ -565,31 +559,33 @@ public class WebAiServiceImpl {
|
|
|
private void processTextSegmentsForDouBao(AiTtsDO aiTtsDO, StringBuilder buffer,
|
|
|
Pattern sentencePattern, ScheduledExecutorService scheduler,
|
|
|
AtomicReference<ScheduledFuture<?>> ttsTask, AtomicBoolean ttsStopped,
|
|
|
- AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
|
|
|
+ AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicBoolean isFirstDouBaoAudio, AtomicInteger douBaoTtsTaskCount) {
|
|
|
if (buffer.isEmpty()) {
|
|
|
log.info("文本为空,无需处理");
|
|
|
- handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink);
|
|
|
+ handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink, douBaoSinkRef, douBaoTtsTaskCount);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 立即处理文本
|
|
|
Matcher matcher = sentencePattern.matcher(buffer);
|
|
|
if (matcher.find()) {
|
|
|
- processDouBaoCompleteSentence(aiTtsDO, buffer, matcher);
|
|
|
+ processDouBaoCompleteSentence(aiTtsDO, buffer, matcher, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
// 继续调度处理剩余文本
|
|
|
scheduleDouBaoNextProcessing(aiTtsDO, buffer, sentencePattern, scheduler,
|
|
|
- ttsTask, ttsStopped, allTextProcessed, sink);
|
|
|
+ ttsTask, ttsStopped, allTextProcessed, sink, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
} else if (buffer.length() > 50) { // 最长50字未结束也处理
|
|
|
- processDouBaoCompleteSentence(aiTtsDO, buffer, buffer.length());
|
|
|
+ processDouBaoCompleteSentence(aiTtsDO, buffer, buffer.length(), douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
// 继续调度处理剩余文本
|
|
|
scheduleDouBaoNextProcessing(aiTtsDO, buffer, sentencePattern, scheduler,
|
|
|
- ttsTask, ttsStopped, allTextProcessed, sink);
|
|
|
+ ttsTask, ttsStopped, allTextProcessed, sink, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
} else {
|
|
|
// 文本较短且未结束,直接处理全部
|
|
|
log.info("豆包TTS合成短文本: {}", buffer.toString());
|
|
|
- processDouBaoTts(aiTtsDO, buffer.toString());
|
|
|
+ processDouBaoTts(aiTtsDO, buffer.toString(), douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
buffer.setLength(0);
|
|
|
- handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink);
|
|
|
+ handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink, douBaoSinkRef, douBaoTtsTaskCount);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -599,7 +595,9 @@ public class WebAiServiceImpl {
|
|
|
private void scheduleDouBaoNextProcessing(AiTtsDO aiTtsDO, StringBuilder buffer,
|
|
|
Pattern sentencePattern, ScheduledExecutorService scheduler,
|
|
|
AtomicReference<ScheduledFuture<?>> ttsTask, AtomicBoolean ttsStopped,
|
|
|
- AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
|
|
|
+ AtomicBoolean allTextProcessed, FluxSink<CommonResult<AiChatMessageSendRespVO>> sink,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicBoolean isFirstDouBaoAudio, AtomicInteger douBaoTtsTaskCount) {
|
|
|
if (!buffer.isEmpty()) {
|
|
|
// 延迟200ms执行,合并短时间内处理的文本片段
|
|
|
if (ttsTask.get() != null) {
|
|
|
@@ -607,11 +605,11 @@ public class WebAiServiceImpl {
|
|
|
}
|
|
|
ttsTask.set(scheduler.schedule(() -> {
|
|
|
processTextSegmentsForDouBao(aiTtsDO, buffer, sentencePattern, scheduler,
|
|
|
- ttsTask, ttsStopped, allTextProcessed, sink);
|
|
|
+ ttsTask, ttsStopped, allTextProcessed, sink, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
}, 200, TimeUnit.MILLISECONDS));
|
|
|
} else {
|
|
|
// 所有文本处理完毕
|
|
|
- handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink);
|
|
|
+ handleDouBaoTextComplete(aiTtsDO, scheduler, ttsStopped, allTextProcessed, sink, douBaoSinkRef, douBaoTtsTaskCount);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -620,7 +618,9 @@ public class WebAiServiceImpl {
|
|
|
*/
|
|
|
private void handleDouBaoTextComplete(AiTtsDO aiTtsDO, ScheduledExecutorService scheduler,
|
|
|
AtomicBoolean ttsStopped, AtomicBoolean allTextProcessed,
|
|
|
- FluxSink<CommonResult<AiChatMessageSendRespVO>> sink) {
|
|
|
+ FluxSink<CommonResult<AiChatMessageSendRespVO>> sink,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicInteger douBaoTtsTaskCount) {
|
|
|
allTextProcessed.set(true);
|
|
|
log.info("所有文本处理完毕,准备通知TTS服务文本已发送完毕");
|
|
|
|
|
|
@@ -634,8 +634,8 @@ public class WebAiServiceImpl {
|
|
|
// 再等待一段时间,确保所有音频数据都已发送
|
|
|
Thread.sleep(500);
|
|
|
// 关闭豆包TTS音频流
|
|
|
- if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
|
|
|
- this.douBaoSinkRef.get().complete();
|
|
|
+ if (douBaoSinkRef != null && douBaoSinkRef.get() != null) {
|
|
|
+ douBaoSinkRef.get().complete();
|
|
|
log.info("豆包TTS音频流已关闭");
|
|
|
// 停止豆包TTS服务
|
|
|
streamingDouBaoTtsService.stopTts();
|
|
|
@@ -680,9 +680,11 @@ public class WebAiServiceImpl {
|
|
|
/**
|
|
|
* 处理豆包TTS完整句子
|
|
|
*/
|
|
|
- private void processDouBaoCompleteSentence(AiTtsDO aiTtsDO, StringBuilder buffer, Matcher matcher) {
|
|
|
+ private void processDouBaoCompleteSentence(AiTtsDO aiTtsDO, StringBuilder buffer, Matcher matcher,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicBoolean isFirstDouBaoAudio, AtomicInteger douBaoTtsTaskCount) {
|
|
|
String sentence = buffer.substring(0, matcher.end());
|
|
|
- processDouBaoTts(aiTtsDO, sentence);
|
|
|
+ processDouBaoTts(aiTtsDO, sentence, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
buffer.delete(0, matcher.end());
|
|
|
log.info("豆包TTS合成完整句: {}", sentence);
|
|
|
}
|
|
|
@@ -690,9 +692,11 @@ public class WebAiServiceImpl {
|
|
|
/**
|
|
|
* 处理豆包TTS指定长度文本
|
|
|
*/
|
|
|
- private void processDouBaoCompleteSentence(AiTtsDO aiTtsDO, StringBuilder buffer, int length) {
|
|
|
+ private void processDouBaoCompleteSentence(AiTtsDO aiTtsDO, StringBuilder buffer, int length,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicBoolean isFirstDouBaoAudio, AtomicInteger douBaoTtsTaskCount) {
|
|
|
String sentence = buffer.substring(0, length);
|
|
|
- processDouBaoTts(aiTtsDO, sentence);
|
|
|
+ processDouBaoTts(aiTtsDO, sentence, douBaoSinkRef, isFirstDouBaoAudio, douBaoTtsTaskCount);
|
|
|
buffer.delete(0, length);
|
|
|
log.info("豆包TTS合成长文本: {}", sentence);
|
|
|
}
|
|
|
@@ -700,7 +704,9 @@ public class WebAiServiceImpl {
|
|
|
/**
|
|
|
* 处理豆包TTS合成
|
|
|
*/
|
|
|
- private void processDouBaoTts(AiTtsDO aiTtsDO, String text) {
|
|
|
+ private void processDouBaoTts(AiTtsDO aiTtsDO, String text,
|
|
|
+ AtomicReference<FluxSink<CommonResult<AiChatMessageSendRespVO>>> douBaoSinkRef,
|
|
|
+ AtomicBoolean isFirstDouBaoAudio, AtomicInteger douBaoTtsTaskCount) {
|
|
|
if (text == null || text.trim().isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
@@ -739,9 +745,9 @@ public class WebAiServiceImpl {
|
|
|
log.info("豆包TTS合成成功,准备发送音频数据");
|
|
|
|
|
|
// 将音频数据发送到前端
|
|
|
- if (this.douBaoSinkRef != null && this.douBaoSinkRef.get() != null) {
|
|
|
+ if (douBaoSinkRef != null && douBaoSinkRef.get() != null) {
|
|
|
try {
|
|
|
- this.douBaoSinkRef.get().next(success(audioResp));
|
|
|
+ douBaoSinkRef.get().next(success(audioResp));
|
|
|
log.info("豆包TTS音频数据发送成功");
|
|
|
} catch (Exception e) {
|
|
|
log.error("发送豆包TTS音频数据失败", e);
|