diff --git a/vetti-common/src/main/java/com/vetti/common/ai/elevenLabs/ElevenLabsStreamClient.java b/vetti-common/src/main/java/com/vetti/common/ai/elevenLabs/ElevenLabsStreamClient.java index a4c13b4..20711ed 100644 --- a/vetti-common/src/main/java/com/vetti/common/ai/elevenLabs/ElevenLabsStreamClient.java +++ b/vetti-common/src/main/java/com/vetti/common/ai/elevenLabs/ElevenLabsStreamClient.java @@ -4,7 +4,6 @@ import cn.hutool.json.JSONUtil; import com.google.gson.Gson; import com.vetti.common.ai.elevenLabs.vo.VoiceSettings; import com.vetti.common.ai.elevenLabs.vo.VoicesResponse; -import com.vetti.common.config.RuoYiConfig; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -19,7 +18,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.websocket.Session; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -64,48 +65,50 @@ public class ElevenLabsStreamClient { HttpEntity responseEntity = response.getEntity(); if (responseEntity != null) { try (InputStream inputStream = responseEntity.getContent();) { + // 锁(数组是为了内部类可写) + boolean[] sending = new boolean[]{false}; //用来合并零散的碎片 - ByteArrayOutputStream smallChunkBuffer = new ByteArrayOutputStream(); // + ByteArrayOutputStream smallChunkBuffer = new ByteArrayOutputStream(); byte[] buffer = new byte[4096]; int bytesRead; - int n = 0; while ((bytesRead = inputStream.read(buffer)) != -1) { + try{ + // 如果上一包还在发送 → 阻塞等待 + while (sending[0]) { + // 轻量等待,不耗 CPU + Thread.sleep(1); + } + }catch (Exception e){} //语音流合并到2KB左右进行发送 if(smallChunkBuffer.size() >= 3072){ + // 🔥开始发送 → 上锁 + sending[0] = true; log.info("语音流大于"+smallChunkBuffer.size()+"啦,发送完成!!!"); byte[] merged = smallChunkBuffer.toByteArray(); smallChunkBuffer.reset(); - session.getAsyncRemote().sendBinary(ByteBuffer.wrap(merged)); - try { - Thread.sleep(50); - }catch (Exception e){} + session.getAsyncRemote().sendBinary(ByteBuffer.wrap(merged), result -> { + if (!result.isOK()) { + log.info("发送失败: " + result.getException()); + } + // 🔥发送完毕 → 解锁 + sending[0] = false; + }); } - //发送三次告诉前端要合成一次语音 -// if(n == 2){ -// Map dataText = new HashMap<>(); -// dataText.put("type","voiceMiddleEnd"); -// dataText.put("content",""); -// session.getBasicRemote().sendText(JSONUtil.toJsonStr(dataText)); -// //重置一下 -// n = 0; -// } // 零散的碎片 → 加入缓冲区,不立即发送 smallChunkBuffer.write(buffer, 0, bytesRead); - n++; } //都加完缓冲区,最最后一次发送 if(smallChunkBuffer.size() > 2){ - log.info("最后一次发送,语音流大于"+smallChunkBuffer.size()+"啦,发送完成!!!"); + // 如果上一包还在发送 → 阻塞等待 byte[] merged = smallChunkBuffer.toByteArray(); smallChunkBuffer.reset(); - session.getAsyncRemote().sendBinary(ByteBuffer.wrap(merged)); - try { - Thread.sleep(50); - }catch (Exception e){} + session.getAsyncRemote().sendBinary(ByteBuffer.wrap(merged), result -> { + log.info("最后一次发送,语音流大于"+smallChunkBuffer.size()+"啦,发送完成!!!"); + }); } //返回结束点 try { - Thread.sleep(50); + Thread.sleep(20); }catch (Exception e){} Map dataText = new HashMap<>(); dataText.put("type","voiceEnd");