需求:
最近開發了一個及時聊天系統,用到了一個公司自己開發的MQ,這個MQ單條消息只支持512KB,所以在傳輸大於512KB文件就會有問題,所以傳輸文件時要分批次傳遞。
來說一下方案:
1.封裝消息頭,當然這個消息頭的長度要是固定的,如果位數不夠需要補位。
2.把消息頭數據和限制大小文件數據都放在一個byte數組中,進行組裝數據,文件數據需要拆分多次,每次都要攜帶消息頭。
3.接收端拆分byte數組,把文件部分保存到文件。
簡單描述一下我的思路:
1.發送者發送消息,我這里是放到MQ里了,這里從一個文件里讀文件,每次都1024個字節,讀出來后組裝頭信息,把數據發送到MQ中。
2.接受者接收消息,我在MQ中讀取數據,把數據拿到后,拆分數據包,根據頭信息創建文件,記得用可以追加的文件流,這樣每次都跟據頭信息去保存或追加文件。
來上代碼:
1.發送者
/** * 發送者 */ private static void sender(SynchronousQueue<byte[]> queue) { int limitLen = 1024; String fileName = "1.txt"; String filePath = "D:\\1.txt"; File file = new File(filePath); //僅顯示文佳大小使用 byte[] message = FileUtil.File2byte(file); System.out.println("文件總大小:"+message.length); try { FileInputStream fis = new FileInputStream(file); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] b = new byte[limitLen]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); //發送消息 senderItem(b, fileName, filePath, queue); } fis.close(); bos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void senderItem(byte[] message, String fileName, String filePath, SynchronousQueue<byte[]> queue) { //全部數據byte數組 byte[] byteFull = new byte[500 + message.length]; //定義消息頭:文件名|全路徑|本條消息大小|文件大小 String header = fileName + "|" + filePath + "|" + byteFull.length + "|" + message.length; byte[] headerByte = header.getBytes(); //這里先把頭部數據存進全部數據的字節數組 for (int i = 0; i < headerByte.length; i++) { byteFull[i] = headerByte[i]; } //由於消息位為固定大小(500),現在的消息大小不夠500,這里解析時就會有問題,所引消息需要補位。 //計算補位數 int size = 500 - headerByte.length; //為頭陪補位 for (; size < 500; size++) { byteFull[size] = '\0'; } //文件內容 for (int i = 0; i < message.length; i++) { byteFull[i + 500] = message[i]; } //發送消息 try { queue.put(byteFull); } catch (Exception e) { e.printStackTrace(); } }
2.接收者
private static void receiver(SynchronousQueue<byte[]> queue) throws InterruptedException { byte[] byteFull = queue.take(); //聲明文件的數組 byte[] fileByte = new byte[byteFull.length - 500]; //獲取文件的內容 for (int i = 0; i < fileByte.length; i++) { fileByte[i] = byteFull[500 + i]; } byte[] header = new byte[500]; System.arraycopy(byteFull, 0, header, 0, 500); String full = new String(header); String[] value = full.split("\\|"); String fileName = value[0]; String filePath = value[1]; //消息長度 Integer fullLen = Integer.parseInt(value[2].trim()); //文件總長度 Integer fileLen = Integer.parseInt(value[3].trim()); String msg = value[3].trim(); byte[] newFile = new byte[fullLen - 500]; System.arraycopy(byteFull, 500, newFile, 0, fullLen - 500); System.out.println("new file len is:" + (fullLen - 500)); System.out.println("newPath:" + (fullLen - 500)); FileUtil.saveFileByBytes(newFile, "D:\\temp\\", fileName); System.out.println("receive msg is:" + msg); }
3.測試
private static class Writer extends Thread { SynchronousQueue<byte[]> queue; public Writer(SynchronousQueue<byte[]> queue) { this.queue = queue; } @Override public void run() { try { TimeUnit.SECONDS.sleep(1); sender(queue); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Reader extends Thread { SynchronousQueue<byte[]> queue; public Reader(SynchronousQueue<byte[]> queue) { this.queue = queue; } @Override public void run() { while (true) { try { receiver(queue); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { SynchronousQueue<byte[]> queue = new SynchronousQueue<>(); new Writer(queue).start(); new Reader(queue).start(); }
測試結果:存了3次。
文件總大小:2595 new file len is:1024 newPath:1024 receive msg is:1024 new file len is:1024 newPath:1024 receive msg is:1024 new file len is:1024 newPath:1024 receive msg is:1024
幫助類FileUtil
package org.test.nio.demo5; import java.io.*; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; /** * 文件操作 * * @author zhennan * @version 1.0 * @date 2022/3/20 15:33 */ public class FileUtil { /** * 將文件轉換成Byte數組 * * @param pathStr * @return */ public static byte[] getBytesByFile(String pathStr) { File file = new File(pathStr); try { FileInputStream fis = new FileInputStream(file); ByteArrayOutputStream bos = new ByteArrayOutputStream(1000); byte[] b = new byte[1000]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); } fis.close(); byte[] data = bos.toByteArray(); bos.close(); return data; } catch (Exception e) { e.printStackTrace(); } return null; } /** * 將Byte數組轉換成文件 * * @param bytes * @param filePath * @param fileName */ public static void saveFileByBytes(byte[] bytes, String filePath, String fileName) { BufferedOutputStream bos = null; FileOutputStream fos = null; File file = null; try { File dir = new File(filePath); if (!dir.exists() && dir.isDirectory()) {// 判斷文件目錄是否存在 dir.mkdirs(); } file = new File(filePath + File.separator + fileName); fos = new FileOutputStream(file, true); bos = new BufferedOutputStream(fos); bos.write(bytes); } catch (Exception e) { e.printStackTrace(); } finally { if (bos != null) { try { bos.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * file轉byte * * @param tradeFile * @return */ public static byte[] File2byte(File tradeFile) { byte[] buffer = null; try { FileInputStream fis = new FileInputStream(tradeFile); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] b = new byte[1024]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); } fis.close(); bos.close(); buffer = bos.toByteArray(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return buffer; } /** * byte轉string * * @param bytes * @return */ public static String byte2String(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < bytes.length; i++) { if (bytes[i] != 0) { sb.append((char) bytes[i]); } else { break; } } return sb.toString(); } /** * byte轉char * * @param bytes * @return */ public static char[] getChars(byte[] bytes) { Charset cs = Charset.forName("GB2312"); ByteBuffer bb = ByteBuffer.allocate(bytes.length); bb.put(bytes); bb.flip(); CharBuffer cb = cs.decode(bb); return cb.array(); } /** * char轉byte */ public static byte[] getBytes(char[] chars) { Charset cs = Charset.forName("GB2312"); CharBuffer cb = CharBuffer.allocate(chars.length); cb.put(chars); cb.flip(); ByteBuffer bb = cs.encode(cb); return bb.array(); } }