STT流式输入业务逻辑处理

This commit is contained in:
2025-10-18 23:02:42 +08:00
parent 9ed89d2015
commit 1bce9c4fa3
5 changed files with 429 additions and 13 deletions

View File

@@ -1,6 +1,7 @@
package com.vetti.socket;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.vetti.common.ai.elevenLabs.ElevenLabsClient;
import com.vetti.common.ai.gpt.OpenAiStreamClient;
@@ -9,9 +10,15 @@ import com.vetti.common.ai.whisper.WhisperClient;
import com.vetti.common.config.RuoYiConfig;
import com.vetti.common.utils.spring.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.TargetDataLine;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@@ -19,9 +26,11 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
* 语音面试 web处理器
@@ -31,11 +40,41 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class ChatWebSocketHandler {
@Value("${whisper.apiUrl}")
private String API_URL;
@Value("${whisper.model}")
private String MODEL;
@Value("${whisper.apiKey}")
private String apiKey;
@Value("${whisper.language}")
private String language;
/**
* 16kHz
*/
private static final int SAMPLE_RATE = 16000;
/**
* 4 KB 每次读取
*/
private static final int BUFFER_SIZE = 4096;
/**
* 每样本 16 位
*/
private static final int BITS_PER_SAMPLE = 16;
/**
* 缓存客户端流式解析的语音文本数据
*/
private final Map<String,String> cacheClientTts = new ConcurrentHashMap<>();
/**
* 缓存客户端调用OpenAi中的websocket-STT 流式传输数据
*/
private final Map<String, WebSocket> cacheWebSocket = new ConcurrentHashMap<>();
// 语音文件保存目录
private static final String VOICE_STORAGE_DIR = "/voice_files/";
@@ -61,6 +100,8 @@ public class ChatWebSocketHandler {
public void onOpen(Session session, @PathParam("clientId") String clientId) {
log.info("WebSocket 链接已建立:{}", clientId);
cacheClientTts.put(clientId,new String());
//初始化STT流式语音转换文本的socket链接
createWhisperRealtimeSocket(clientId);
}
// 接收文本消息
@@ -148,20 +189,18 @@ public class ChatWebSocketHandler {
log.info("3、开始接收数据流时间:{}",System.currentTimeMillis()/1000);
try{
//接收到数据流后直接就进行SST处理
//拿到文件进行文字转换
saveAsWebM(bytes,pathUrl);
WhisperClient whisperClient = SpringUtils.getBean(WhisperClient.class);
String resultText = whisperClient.handleVoiceToText(pathUrl);
log.info("STT:{}",resultText);
//进行客户端文本数据存储
String cacheString = cacheClientTts.get(clientId);
if(StrUtil.isNotEmpty(cacheString)){
cacheString = cacheString+resultText;
}else {
cacheString = resultText;
//发送消息
WebSocket webSocket = cacheWebSocket.get(clientId);
if(webSocket != null){
log.info("3.1 开始发送数据音频流啦");
// 将音频数据转换为 Base64 编码的字符串
String base64Audio = Base64.getEncoder().encodeToString(bytes);
String message = "{ \"type\": \"input_audio_buffer.append\", \"audio\": \"" + base64Audio + "\" }";
webSocket.send(message);
// 3. 提交音频并请求转录
webSocket.send("{\"type\": \"input_audio_buffer.commit\"}");
webSocket.send("{\"type\": \"response.create\"}");
}
cacheClientTts.put(clientId,cacheString);
}catch (Exception e){
e.printStackTrace();
}
@@ -243,5 +282,87 @@ public class ChatWebSocketHandler {
return null;
}
/**
* 创建STT WebSocket 客户端链接
* @param clientId 客户端ID
*/
private void createWhisperRealtimeSocket(String clientId){
try{
OkHttpClient client = new OkHttpClient();
CountDownLatch latch = new CountDownLatch(1);
// 设置 WebSocket 请求
Request request = new Request.Builder()
.url(API_URL)
.addHeader("Authorization", "Bearer " + apiKey)
.addHeader("OpenAI-Beta", "realtime=v1")
.build();
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
System.out.println("✅ WebSocket 连接成功");
//发送配置
JSONObject config = new JSONObject();
JSONObject sessionConfig = new JSONObject();
JSONObject transcription = new JSONObject();
JSONObject turnDetection = new JSONObject();
// 配置转录参数
transcription.put("model", "gpt-4o-mini-transcribe");
transcription.put("language", language); // 中文
// 配置断句检测
turnDetection.put("type", "server_vad");
turnDetection.put("prefix_padding_ms", 300);
turnDetection.put("silence_duration_ms", 10);
// 组装完整配置
sessionConfig.put("input_audio_transcription", transcription);
sessionConfig.put("turn_detection", turnDetection);
config.put("type", "transcription_session.update");
config.put("session", sessionConfig);
webSocket.send(config.toString());
// 1. 启动音频缓冲
webSocket.send("{\"type\": \"input_audio_buffer.start\"}");
//存储客户端webSocket对象,对数据进行隔离处理
cacheWebSocket.put(clientId,webSocket);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
System.out.println("📩 收到转录结果: " + text);
//对数据进行解析
if(StrUtil.isNotEmpty(text)){
Map<String,String> mapResultData = JSONUtil.toBean(text,Map.class);
if("conversation.item.input_audio_transcription.delta".equals(mapResultData.get("type"))){
String resultText = mapResultData.get("delta");
//进行客户端文本数据存储
String cacheString = cacheClientTts.get(clientId);
if(StrUtil.isNotEmpty(cacheString)){
cacheString = cacheString+resultText;
}else {
cacheString = resultText;
}
cacheClientTts.put(clientId,cacheString);
}
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
System.err.println("❌ 连接失败: " + t.getMessage());
latch.countDown();
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
System.out.println("⚠️ 连接即将关闭: " + reason);
webSocket.close(1000, null);
latch.countDown();
}
});
// 等待 WebSocket 关闭
latch.await();
}catch (Exception e){
e.printStackTrace();
}
}
}