STT 数据流处理

This commit is contained in:
2025-10-19 23:48:52 +08:00
parent f5f5afb649
commit 0d0c6c32f0
2 changed files with 121 additions and 106 deletions

4
.idea/misc.xml generated
View File

@@ -13,5 +13,9 @@
</set> </set>
</option> </option>
</component> </component>
<component name="PWA">
<option name="enabled" value="true" />
<option name="wasEnabledAtLeastOnce" value="true" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_21" project-jdk-name="21" project-jdk-type="JavaSDK" /> <component name="ProjectRootManager" version="2" languageLevel="JDK_21" project-jdk-name="21" project-jdk-type="JavaSDK" />
</project> </project>

View File

@@ -6,6 +6,7 @@ import cn.hutool.json.JSONUtil;
import com.vetti.common.ai.elevenLabs.ElevenLabsClient; import com.vetti.common.ai.elevenLabs.ElevenLabsClient;
import com.vetti.common.ai.gpt.OpenAiStreamClient; import com.vetti.common.ai.gpt.OpenAiStreamClient;
import com.vetti.common.ai.gpt.service.OpenAiStreamListenerService; import com.vetti.common.ai.gpt.service.OpenAiStreamListenerService;
import com.vetti.common.ai.whisper.WhisperClient;
import com.vetti.common.config.RuoYiConfig; import com.vetti.common.config.RuoYiConfig;
import com.vetti.common.utils.spring.SpringUtils; import com.vetti.common.utils.spring.SpringUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -13,17 +14,16 @@ import okhttp3.*;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioInputStream; import javax.sound.sampled.AudioInputStream;
import javax.sound.sampled.AudioSystem; import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.UnsupportedAudioFileException;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Base64; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -70,6 +70,12 @@ public class ChatWebSocketHandler {
*/ */
private final Map<String, WebSocket> cacheWebSocket = new ConcurrentHashMap<>(); private final Map<String, WebSocket> cacheWebSocket = new ConcurrentHashMap<>();
/**
* 为每个会话维护分片缓存(线程安全,支持多用户)
*/
private final ConcurrentHashMap<String, List<byte[]>> fragmentCache = new ConcurrentHashMap<>();
// 语音文件保存目录 // 语音文件保存目录
private static final String VOICE_STORAGE_DIR = "/voice_files/"; private static final String VOICE_STORAGE_DIR = "/voice_files/";
@@ -97,6 +103,7 @@ public class ChatWebSocketHandler {
cacheClientTts.put(clientId,new String()); cacheClientTts.put(clientId,new String());
//初始化STT流式语音转换文本的socket链接 //初始化STT流式语音转换文本的socket链接
createWhisperRealtimeSocket(clientId); createWhisperRealtimeSocket(clientId);
} }
// 接收文本消息 // 接收文本消息
@@ -109,16 +116,30 @@ public class ChatWebSocketHandler {
Map<String,String> mapResult = JSONUtil.toBean(JSONUtil.parseObj(message),Map.class); Map<String,String> mapResult = JSONUtil.toBean(JSONUtil.parseObj(message),Map.class);
String resultFlag = mapResult.get("msg"); String resultFlag = mapResult.get("msg");
if("done".equals(resultFlag)){ if("done".equals(resultFlag)){
log.info("1、开始处理时间:{}",System.currentTimeMillis()/1000);
//开始合并语音流
List<byte[]> fragments = fragmentCache.get(clientId);
// 合并所有分片为完整语音数据
byte[] fullVoiceData = mergeFragments(fragments);
// 生成唯一文件名
String fileName = clientId + "_" + System.currentTimeMillis() + ".webm";
String pathUrl = RuoYiConfig.getProfile()+VOICE_STORAGE_DIR + fileName;
log.info("文件路径为:{}", pathUrl);
log.info("文件流的大小为:{}",fullVoiceData.length);
saveAsWebM(fullVoiceData,pathUrl);
//开始转换
WhisperClient whisperClient = SpringUtils.getBean(WhisperClient.class);
String cacheResultText = whisperClient.handleVoiceToText(pathUrl);
//发送消息 //发送消息
WebSocket webSocket = cacheWebSocket.get(clientId); // WebSocket webSocket = cacheWebSocket.get(clientId);
// webSocket.send("{\"type\": \"input_audio_buffer.commit\"}"); // webSocket.send("{\"type\": \"input_audio_buffer.commit\"}");
// webSocket.send("{\"type\": \"response.create\"}"); // webSocket.send("{\"type\": \"response.create\"}");
// if(webSocket != null){ // if(webSocket != null){
// webSocket.close(1000,null); // webSocket.close(1000,null);
// } // }
//语音结束,开始进行回答解析 //语音结束,开始进行回答解析
String cacheResultText = cacheClientTts.get(clientId); // String cacheResultText = cacheClientTts.get(clientId);
log.info("返回的结果为:{}",cacheResultText); log.info("返回的结果为:{}",cacheResultText);
if(StrUtil.isEmpty(cacheResultText)){ if(StrUtil.isEmpty(cacheResultText)){
cacheResultText = "Hello , How are you?"; cacheResultText = "Hello , How are you?";
@@ -179,6 +200,51 @@ public class ChatWebSocketHandler {
} }
} }
// // 接收二进制消息(流数据)
// @OnMessage
// public void onBinaryMessage(Session session, @PathParam("clientId") String clientId, ByteBuffer byteBuffer) {
// log.info("1、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// log.info("客户端ID为:{}", clientId);
// // 处理二进制流数据
// byte[] bytes = new byte[byteBuffer.remaining()];
// //从缓冲区中读取数据并存储到指定的字节数组中
// byteBuffer.get(bytes);
// log.info("2、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// // 生成唯一文件名
// String fileName = clientId + "_" + System.currentTimeMillis() + ".wav";
// String pathUrl = RuoYiConfig.getProfile()+VOICE_STORAGE_DIR + fileName;
// log.info("文件路径为:{}", pathUrl);
// log.info("3、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// try{
// log.info("文件流的大小为:{}",bytes.length);
// saveAsWebM(bytes,pathUrl);
// //接收到数据流后直接就进行SST处理
// //发送消息
// WebSocket webSocket = cacheWebSocket.get(clientId);
// log.info("获取的socket对象为:{}",webSocket);
// if(webSocket != null){
//// 1. 启动音频缓冲
//// webSocket.send("{\"type\": \"input_audio_buffer.start\"}");
// log.info("3.1 开始发送数据音频流啦");
// // 将音频数据转换为 Base64 编码的字符串
// //进行转换
// // 转换音频格式
// AudioFormat format = new AudioFormat(SAMPLE_RATE, BITS_PER_SAMPLE, 1, true, false);
// byte[] outputAudioBytes = convertAudio(bytes, format);
// String base64Audio = Base64.getEncoder().encodeToString(outputAudioBytes);
// String message = "{ \"type\": \"input_audio_buffer.append\", \"audio\": \"" + base64Audio + "\" }";
// webSocket.send(message);
// log.info("4、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// // 3. 提交音频并请求转录
//// webSocket.send("{\"type\": \"input_audio_buffer.commit\"}");
//// webSocket.send("{\"type\": \"response.create\"}");
// }
// }catch (Exception e){
// e.printStackTrace();
// }
//
// }
// 接收二进制消息(流数据) // 接收二进制消息(流数据)
@OnMessage @OnMessage
public void onBinaryMessage(Session session, @PathParam("clientId") String clientId, ByteBuffer byteBuffer) { public void onBinaryMessage(Session session, @PathParam("clientId") String clientId, ByteBuffer byteBuffer) {
@@ -188,40 +254,15 @@ public class ChatWebSocketHandler {
byte[] bytes = new byte[byteBuffer.remaining()]; byte[] bytes = new byte[byteBuffer.remaining()];
//从缓冲区中读取数据并存储到指定的字节数组中 //从缓冲区中读取数据并存储到指定的字节数组中
byteBuffer.get(bytes); byteBuffer.get(bytes);
log.info("2、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// 生成唯一文件名
String fileName = clientId + "_" + System.currentTimeMillis() + ".wav";
String pathUrl = RuoYiConfig.getProfile()+VOICE_STORAGE_DIR + fileName;
log.info("文件路径为:{}", pathUrl);
log.info("3、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
try{
log.info("文件流的大小为:{}",bytes.length);
saveAsWebM(bytes,pathUrl);
//接收到数据流后直接就进行SST处理
//发送消息
WebSocket webSocket = cacheWebSocket.get(clientId);
log.info("获取的socket对象为:{}",webSocket);
if(webSocket != null){
// 1. 启动音频缓冲
// webSocket.send("{\"type\": \"input_audio_buffer.start\"}");
log.info("3.1 开始发送数据音频流啦");
// 将音频数据转换为 Base64 编码的字符串
//进行转换
// 转换音频格式
AudioFormat format = new AudioFormat(SAMPLE_RATE, BITS_PER_SAMPLE, 1, true, false);
byte[] outputAudioBytes = convertAudio(bytes, format);
String base64Audio = Base64.getEncoder().encodeToString(outputAudioBytes);
String message = "{ \"type\": \"input_audio_buffer.append\", \"audio\": \"" + base64Audio + "\" }";
webSocket.send(message);
log.info("4、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
// 3. 提交音频并请求转录
// webSocket.send("{\"type\": \"input_audio_buffer.commit\"}");
// webSocket.send("{\"type\": \"response.create\"}");
}
}catch (Exception e){
e.printStackTrace();
}
// 1. 获取当前会话的缓存
List<byte[]> fragments = fragmentCache.get(clientId);
if (fragments == null) {
fragments = new ArrayList<>();
fragmentCache.put(clientId, fragments);
}
fragments.add(bytes);
fragmentCache.put(clientId, fragments);
} }
// 连接关闭时调用 // 连接关闭时调用
@@ -237,27 +278,27 @@ public class ChatWebSocketHandler {
throwable.printStackTrace(); throwable.printStackTrace();
} }
public static byte[] convertAudio(byte[] inputAudioBytes, AudioFormat targetFormat) throws Exception { // public static byte[] convertAudio(byte[] inputAudioBytes, AudioFormat targetFormat) throws Exception {
// 将 byte[] 转换为 AudioInputStream // // 将 byte[] 转换为 AudioInputStream
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputAudioBytes); // ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(inputAudioBytes);
AudioInputStream inputAudioStream = new AudioInputStream(byteArrayInputStream, targetFormat, inputAudioBytes.length); // AudioInputStream inputAudioStream = new AudioInputStream(byteArrayInputStream, targetFormat, inputAudioBytes.length);
//
// 创建目标格式的 AudioInputStream // // 创建目标格式的 AudioInputStream
AudioInputStream outputAudioStream = AudioSystem.getAudioInputStream(targetFormat, inputAudioStream); // AudioInputStream outputAudioStream = AudioSystem.getAudioInputStream(targetFormat, inputAudioStream);
//
// 获取输出音频的 byte[] // // 获取输出音频的 byte[]
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); // ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024]; // byte[] buffer = new byte[1024];
int bytesRead; // int bytesRead;
//
// 从 AudioInputStream 读取数据并写入 ByteArrayOutputStream // // 从 AudioInputStream 读取数据并写入 ByteArrayOutputStream
while ((bytesRead = outputAudioStream.read(buffer)) != -1) { // while ((bytesRead = outputAudioStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead); // byteArrayOutputStream.write(buffer, 0, bytesRead);
} // }
//
// 返回转换后的 byte[] // // 返回转换后的 byte[]
return byteArrayOutputStream.toByteArray(); // return byteArrayOutputStream.toByteArray();
} // }
/** /**
* 将字节数组保存为WebM文件 * 将字节数组保存为WebM文件
@@ -405,56 +446,26 @@ public class ChatWebSocketHandler {
} }
} }
private void handleVoice(String inputPath,String outputPath){
double trimMs = 270; // 要去掉的尾部时长(毫秒)
try {
// 1. 解析音频格式和总长度
AudioInputStream audioIn = AudioSystem.getAudioInputStream(new File(inputPath));
AudioFormat format = audioIn.getFormat();
long totalBytes = audioIn.getFrameLength() * format.getFrameSize(); // 总字节数
// 2. 计算300毫秒对应的字节数 /**
float sampleRate = format.getSampleRate(); // 采样率Hz * 合并分片数组为完整字节数组
int frameSize = format.getFrameSize(); // 每帧字节数(位深/8 * 声道数) */
double trimSeconds = trimMs / 1000.0; // 转换为秒 private byte[] mergeFragments(List<byte[]> fragments) {
long trimBytes = (long) (sampleRate * trimSeconds * frameSize); // 要去掉的字节数 // 计算总长度
int totalLength = 0;
// 3. 计算需要保留的字节数(避免负数) for (byte[] fragment : fragments) {
long keepBytes = Math.max(0, totalBytes - trimBytes); totalLength += fragment.length;
if (keepBytes == 0) {
System.out.println("音频长度小于300毫秒无法截断");
return;
}
File file = new File(outputPath);
// 创建空文件
boolean isCreated = file.createNewFile();
if (isCreated) {
System.out.println("空文件创建成功:" + file.getAbsolutePath());
} else {
System.out.println("文件已存在:" + file.getAbsolutePath());
}
// 4. 读取并保留前半部分去掉最后300毫秒
try (InputStream in = new FileInputStream(inputPath);
OutputStream out = new FileOutputStream(outputPath)) {
byte[] buffer = new byte[4096];
long totalRead = 0;
int bytesRead;
while (totalRead < keepBytes && (bytesRead = in.read(buffer)) != -1) {
long remaining = keepBytes - totalRead;
int writeBytes = (remaining < bytesRead) ? (int) remaining : bytesRead;
out.write(buffer, 0, writeBytes);
totalRead += writeBytes;
}
System.out.println("处理完成,去掉了最后" + trimMs + "毫秒,保留了" + totalRead + "字节");
}
} catch (UnsupportedAudioFileException | IOException e) {
e.printStackTrace();
} }
// 拼接所有分片
byte[] result = new byte[totalLength];
int offset = 0;
for (byte[] fragment : fragments) {
System.arraycopy(fragment, 0, result, offset, fragment.length);
offset += fragment.length;
}
return result;
} }
} }