From 54053728abb20ba9aee01996e21bbdce17d2c219 Mon Sep 17 00:00:00 2001 From: wangxiangshun Date: Sun, 5 Oct 2025 13:08:40 +0800 Subject: [PATCH] =?UTF-8?q?websocket=20=E7=94=9F=E6=88=90=E5=AE=8C?= =?UTF-8?q?=E6=95=B4=E7=9A=84=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vetti/socket/VoiceWebSocketHandler.java | 226 ++++++++---------- .../com/vetti/socket/vo/FileMetadata.java | 20 ++ .../vetti/socket/vo/FileTransferState.java | 27 +++ 3 files changed, 153 insertions(+), 120 deletions(-) create mode 100644 vetti-admin/src/main/java/com/vetti/socket/vo/FileMetadata.java create mode 100644 vetti-admin/src/main/java/com/vetti/socket/vo/FileTransferState.java diff --git a/vetti-admin/src/main/java/com/vetti/socket/VoiceWebSocketHandler.java b/vetti-admin/src/main/java/com/vetti/socket/VoiceWebSocketHandler.java index d9e24ca..86b2898 100644 --- a/vetti-admin/src/main/java/com/vetti/socket/VoiceWebSocketHandler.java +++ b/vetti-admin/src/main/java/com/vetti/socket/VoiceWebSocketHandler.java @@ -1,156 +1,142 @@ package com.vetti.socket; -import com.vetti.socket.vo.VoicePartMessage; -import org.springframework.stereotype.Component; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; +import com.vetti.socket.vo.FileMetadata; +import com.vetti.socket.vo.FileTransferState; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.*; import org.springframework.web.socket.handler.TextWebSocketHandler; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.nio.ByteBuffer; +import java.nio.file.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; -import java.util.Base64; -@Component +@Slf4j public class VoiceWebSocketHandler extends TextWebSocketHandler { - - // 存储每个客户端的语音分片,key: clientId, value: 分片映射 - private final Map> clientVoiceParts = new ConcurrentHashMap<>(); - // 存储每个客户端的总分片数,key: clientId - private final Map clientTotalParts = new ConcurrentHashMap<>(); - // 用于并发控制的锁 - private final Map clientLocks = new ConcurrentHashMap<>(); - // JSON序列化工具 + // 存储客户端的文件传输状态:clientId -> FileTransferState + private final Map transferStates = new ConcurrentHashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); - // 语音文件保存目录 - private static final String VOICE_STORAGE_DIR = "voice_files/"; + private static final String STORAGE_DIR = "received_files/"; - public VoiceWebSocketHandler() { - // 初始化存储目录 - File dir = new File(VOICE_STORAGE_DIR); - if (!dir.exists()) { - dir.mkdirs(); + // 初始化存储目录 + static { + try { + Files.createDirectories(Paths.get(STORAGE_DIR)); + } catch (IOException e) { + throw new RuntimeException("无法创建文件存储目录", e); } } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - String clientId = getClientId(session); - if (clientId != null) { - // 初始化客户端数据结构 - clientVoiceParts.put(clientId, new TreeMap<>()); // TreeMap保证分片有序 - clientLocks.putIfAbsent(clientId, new ReentrantLock()); - System.out.println("客户端连接建立: " + clientId); - } + String clientId = (String) session.getAttributes().get("clientId"); + transferStates.put(clientId, new FileTransferState()); + System.out.println("客户端连接: " + clientId); } + // 处理文本消息(文件元数据) @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - String clientId = getClientId(session); - if (clientId == null) { - System.err.println("无法获取客户端ID"); - return; - } + String clientId = (String) session.getAttributes().get("clientId"); + FileMetadata metadata = objectMapper.readValue(message.getPayload(), FileMetadata.class); - try { - // 解析前端发送的JSON消息 - VoicePartMessage voiceMessage = objectMapper.readValue(message.getPayload(), VoicePartMessage.class); + // 初始化文件传输状态 + FileTransferState state = transferStates.get(clientId); + state.setFileName(metadata.getFileName()); + state.setTotalSize(metadata.getTotalSize()); + state.setTotalParts(metadata.getTotalParts()); + state.setOutputStream(new FileOutputStream(STORAGE_DIR + metadata.getFileName())); - // 处理语音分片 - if ("voice_part".equals(voiceMessage.getType())) { - processVoicePart(clientId, voiceMessage, session); + System.out.println("开始接收文件: " + metadata.getFileName() + " (" + metadata.getTotalParts() + "个分片)"); + + // 确认已收到元数据 + session.sendMessage(new TextMessage("{\"type\":\"metadata_ack\"}")); + } + + // 处理二进制消息(文件分片) + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message){ + try{ + log.info("开始-接收文件分片数据流"); + String clientId = (String) session.getAttributes().get("clientId"); + FileTransferState state = transferStates.get(clientId); + + if (state == null || state.getOutputStream() == null) { + session.sendMessage(new TextMessage("{\"type\":\"error\", \"message\":\"未收到文件元数据\"}")); + return; } - } catch (Exception e) { - System.err.println("处理消息出错: " + e.getMessage()); + log.info("进行中-接收文件分片数据流"); + // 解析分片数据 + ByteBuffer payload = message.getPayload(); + // int partNumber = payload.getInt(); // 前4字节是分片编号 + byte[] data = new byte[payload.remaining()]; + payload.get(data); + + // 写入文件 + state.getOutputStream().write(data); + state.incrementReceivedParts(); + + // 发送进度更新(每5个分片或最后一个分片) + if (state.getReceivedParts() % 5 == 0 || state.getReceivedParts() == state.getTotalParts()) { + double progress = (double) state.getReceivedParts() / state.getTotalParts() * 100; + session.sendMessage(new TextMessage( + "{\"type\":\"progress\", \"progress\":" + progress + "}" + )); + + // 检查是否接收完成 + if (state.getReceivedParts() == state.getTotalParts()) { + log.info("生成完整的文件-接收文件分片数据流"); + completeFileTransfer(session, state, clientId); + } + } + }catch (Exception e){ e.printStackTrace(); } } + // 完成文件传输 + private void completeFileTransfer(WebSocketSession session, FileTransferState state, String clientId) throws IOException { + // 关闭文件输出流 + state.getOutputStream().close(); + + // 验证文件大小 + File file = new File(STORAGE_DIR + state.getFileName()); + boolean fileValid = file.length() == state.getTotalSize(); + + // 发送完成消息 + String result = fileValid ? + "{\"type\":\"complete\", \"message\":\"文件接收完成\", \"filePath\":\"" + file.getAbsolutePath() + "\"}" : + "{\"type\":\"error\", \"message\":\"文件损坏,大小不匹配\"}"; + session.sendMessage(new TextMessage(result)); + + System.out.println("文件接收" + (fileValid ? "完成" : "失败") + ": " + state.getFileName()); + + // 清理状态 + transferStates.remove(clientId); + } + @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - String clientId = getClientId(session); - if (clientId != null) { - // 清理客户端资源 - clientVoiceParts.remove(clientId); - clientTotalParts.remove(clientId); - clientLocks.remove(clientId); - System.out.println("客户端连接关闭: " + clientId); - } - } + String clientId = (String) session.getAttributes().get("clientId"); + FileTransferState state = transferStates.remove(clientId); - /** - * 处理语音分片 - */ - private void processVoicePart(String clientId, VoicePartMessage message, WebSocketSession session) throws Exception { - ReentrantLock lock = clientLocks.get(clientId); - lock.lock(); // 加锁确保线程安全 - try { - // 保存总分片数 - clientTotalParts.put(clientId, message.getTotalParts()); - - // 解码Base64数据并存储分片 - byte[] voiceData = Base64.getDecoder().decode(message.getData()); - clientVoiceParts.get(clientId).put(message.getPartNumber(), voiceData); - - System.out.printf("接收客户端 %s 的分片 %d/%d%n", - clientId, message.getPartNumber() + 1, message.getTotalParts()); - - // 检查是否所有分片都已接收 - checkAndMergeParts(clientId, session); - } finally { - lock.unlock(); // 释放锁 - } - } - - /** - * 检查是否所有分片都已接收,如果是则合并 - */ - private void checkAndMergeParts(String clientId, WebSocketSession session) throws Exception { - Map parts = clientVoiceParts.get(clientId); - Integer totalParts = clientTotalParts.get(clientId); - - if (parts == null || totalParts == null) { - return; - } - - // 所有分片都已接收 - if (parts.size() == totalParts) { - System.out.println("所有分片接收完成,开始合并: " + clientId); - - // 生成唯一文件名 - String fileName = clientId + "_" + System.currentTimeMillis() + ".wav"; - Path outputPath = Paths.get(VOICE_STORAGE_DIR + fileName); - - // 合并分片 - try (FileOutputStream fos = new FileOutputStream(outputPath.toFile())) { - for (byte[] part : parts.values()) { - fos.write(part); - } + // 关闭可能存在的文件流 + if (state != null && state.getOutputStream() != null) { + try { + state.getOutputStream().close(); + // 删除未完成的文件 + Files.deleteIfExists(Paths.get(STORAGE_DIR + state.getFileName())); + } catch (IOException e) { + e.printStackTrace(); } - - System.out.println("语音文件合并完成,保存路径: " + outputPath); - - // 向客户端发送处理完成消息 - Map response = new HashMap<>(); - response.put("type", "complete"); - response.put("message", "语音接收完成"); - response.put("fileName", fileName); - session.sendMessage(new TextMessage(objectMapper.writeValueAsString(response))); - - // 清理已合并的分片数据 - clientVoiceParts.get(clientId).clear(); } + System.out.println("客户端断开连接: " + clientId); } - /** - * 从会话中获取客户端ID - */ - private String getClientId(WebSocketSession session) { - return (String) session.getAttributes().get("clientId"); + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + System.err.println("传输错误: " + exception.getMessage()); + session.close(CloseStatus.SERVER_ERROR); } } diff --git a/vetti-admin/src/main/java/com/vetti/socket/vo/FileMetadata.java b/vetti-admin/src/main/java/com/vetti/socket/vo/FileMetadata.java new file mode 100644 index 0000000..4b50e69 --- /dev/null +++ b/vetti-admin/src/main/java/com/vetti/socket/vo/FileMetadata.java @@ -0,0 +1,20 @@ +package com.vetti.socket.vo; + +/** + * 文件元数据类 + */ +public class FileMetadata { + + private String fileName; + private long totalSize; + private int totalParts; + + // getter和setter方法 + public String getFileName() { return fileName; } + public void setFileName(String fileName) { this.fileName = fileName; } + public long getTotalSize() { return totalSize; } + public void setTotalSize(long totalSize) { this.totalSize = totalSize; } + public int getTotalParts() { return totalParts; } + public void setTotalParts(int totalParts) { this.totalParts = totalParts; } + +} diff --git a/vetti-admin/src/main/java/com/vetti/socket/vo/FileTransferState.java b/vetti-admin/src/main/java/com/vetti/socket/vo/FileTransferState.java new file mode 100644 index 0000000..8555af4 --- /dev/null +++ b/vetti-admin/src/main/java/com/vetti/socket/vo/FileTransferState.java @@ -0,0 +1,27 @@ +package com.vetti.socket.vo; + +import java.io.FileOutputStream; + +/** + * 文件传输状态类 + */ +public class FileTransferState { + + private String fileName; + private long totalSize; + private int totalParts; + private int receivedParts = 0; + private FileOutputStream outputStream; + + // getter和setter方法 + public String getFileName() { return fileName; } + public void setFileName(String fileName) { this.fileName = fileName; } + public long getTotalSize() { return totalSize; } + public void setTotalSize(long totalSize) { this.totalSize = totalSize; } + public int getTotalParts() { return totalParts; } + public void setTotalParts(int totalParts) { this.totalParts = totalParts; } + public int getReceivedParts() { return receivedParts; } + public void incrementReceivedParts() { this.receivedParts++; } + public FileOutputStream getOutputStream() { return outputStream; } + public void setOutputStream(FileOutputStream outputStream) { this.outputStream = outputStream; } +}