需求:
最近开发了一个及时聊天系统,用到了一个公司自己开发的MQ,这个MQ单条消息只支持512KB,所以在传输大于512KB文件就会有问题,所以传输文件时要分批次传递。
来说一下方案:
1.封装消息头,当然这个消息头的长度要是固定的,如果位数不够需要补位。
2.把消息头数据和限制大小文件数据都放在一个byte数组中,进行组装数据,文件数据需要拆分多次,每次都要携带消息头。
3.接收端拆分byte数组,把文件部分保存到文件。
简单描述一下我的思路:
1.发送者发送消息,我这里是放到MQ里了,这里从一个文件里读文件,每次都1024个字节,读出来后组装头信息,把数据发送到MQ中。
2.接受者接收消息,我在MQ中读取数据,把数据拿到后,拆分数据包,根据头信息创建文件,记得用可以追加的文件流,这样每次都跟据头信息去保存或追加文件。
来上代码:
1.发送者
/** * 发送者 */ private static void sender(SynchronousQueue<byte[]> queue) { int limitLen = 1024; String fileName = "1.txt"; String filePath = "D:\\1.txt"; File file = new File(filePath); //仅显示文佳大小使用 byte[] message = FileUtil.File2byte(file); System.out.println("文件总大小:"+message.length); try { FileInputStream fis = new FileInputStream(file); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] b = new byte[limitLen]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); //发送消息 senderItem(b, fileName, filePath, queue); } fis.close(); bos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void senderItem(byte[] message, String fileName, String filePath, SynchronousQueue<byte[]> queue) { //全部数据byte数组 byte[] byteFull = new byte[500 + message.length]; //定义消息头:文件名|全路径|本条消息大小|文件大小 String header = fileName + "|" + filePath + "|" + byteFull.length + "|" + message.length; byte[] headerByte = header.getBytes(); //这里先把头部数据存进全部数据的字节数组 for (int i = 0; i < headerByte.length; i++) { byteFull[i] = headerByte[i]; } //由于消息位为固定大小(500),现在的消息大小不够500,这里解析时就会有问题,所引消息需要补位。 //计算补位数 int size = 500 - headerByte.length; //为头陪补位 for (; size < 500; size++) { byteFull[size] = '\0'; } //文件内容 for (int i = 0; i < message.length; i++) { byteFull[i + 500] = message[i]; } //发送消息 try { queue.put(byteFull); } catch (Exception e) { e.printStackTrace(); } }
2.接收者
private static void receiver(SynchronousQueue<byte[]> queue) throws InterruptedException { byte[] byteFull = queue.take(); //声明文件的数组 byte[] fileByte = new byte[byteFull.length - 500]; //获取文件的内容 for (int i = 0; i < fileByte.length; i++) { fileByte[i] = byteFull[500 + i]; } byte[] header = new byte[500]; System.arraycopy(byteFull, 0, header, 0, 500); String full = new String(header); String[] value = full.split("\\|"); String fileName = value[0]; String filePath = value[1]; //消息长度 Integer fullLen = Integer.parseInt(value[2].trim()); //文件总长度 Integer fileLen = Integer.parseInt(value[3].trim()); String msg = value[3].trim(); byte[] newFile = new byte[fullLen - 500]; System.arraycopy(byteFull, 500, newFile, 0, fullLen - 500); System.out.println("new file len is:" + (fullLen - 500)); System.out.println("newPath:" + (fullLen - 500)); FileUtil.saveFileByBytes(newFile, "D:\\temp\\", fileName); System.out.println("receive msg is:" + msg); }
3.测试
private static class Writer extends Thread { SynchronousQueue<byte[]> queue; public Writer(SynchronousQueue<byte[]> queue) { this.queue = queue; } @Override public void run() { try { TimeUnit.SECONDS.sleep(1); sender(queue); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Reader extends Thread { SynchronousQueue<byte[]> queue; public Reader(SynchronousQueue<byte[]> queue) { this.queue = queue; } @Override public void run() { while (true) { try { receiver(queue); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { SynchronousQueue<byte[]> queue = new SynchronousQueue<>(); new Writer(queue).start(); new Reader(queue).start(); }
测试结果:存了3次。
文件总大小:2595 new file len is:1024 newPath:1024 receive msg is:1024 new file len is:1024 newPath:1024 receive msg is:1024 new file len is:1024 newPath:1024 receive msg is:1024
帮助类FileUtil
package org.test.nio.demo5; import java.io.*; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; /** * 文件操作 * * @author zhennan * @version 1.0 * @date 2022/3/20 15:33 */ public class FileUtil { /** * 将文件转换成Byte数组 * * @param pathStr * @return */ public static byte[] getBytesByFile(String pathStr) { File file = new File(pathStr); try { FileInputStream fis = new FileInputStream(file); ByteArrayOutputStream bos = new ByteArrayOutputStream(1000); byte[] b = new byte[1000]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); } fis.close(); byte[] data = bos.toByteArray(); bos.close(); return data; } catch (Exception e) { e.printStackTrace(); } return null; } /** * 将Byte数组转换成文件 * * @param bytes * @param filePath * @param fileName */ public static void saveFileByBytes(byte[] bytes, String filePath, String fileName) { BufferedOutputStream bos = null; FileOutputStream fos = null; File file = null; try { File dir = new File(filePath); if (!dir.exists() && dir.isDirectory()) {// 判断文件目录是否存在 dir.mkdirs(); } file = new File(filePath + File.separator + fileName); fos = new FileOutputStream(file, true); bos = new BufferedOutputStream(fos); bos.write(bytes); } catch (Exception e) { e.printStackTrace(); } finally { if (bos != null) { try { bos.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * file转byte * * @param tradeFile * @return */ public static byte[] File2byte(File tradeFile) { byte[] buffer = null; try { FileInputStream fis = new FileInputStream(tradeFile); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] b = new byte[1024]; int n; while ((n = fis.read(b)) != -1) { bos.write(b, 0, n); } fis.close(); bos.close(); buffer = bos.toByteArray(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return buffer; } /** * byte转string * * @param bytes * @return */ public static String byte2String(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < bytes.length; i++) { if (bytes[i] != 0) { sb.append((char) bytes[i]); } else { break; } } return sb.toString(); } /** * byte转char * * @param bytes * @return */ public static char[] getChars(byte[] bytes) { Charset cs = Charset.forName("GB2312"); ByteBuffer bb = ByteBuffer.allocate(bytes.length); bb.put(bytes); bb.flip(); CharBuffer cb = cs.decode(bb); return cb.array(); } /** * char转byte */ public static byte[] getBytes(char[] chars) { Charset cs = Charset.forName("GB2312"); CharBuffer cb = CharBuffer.allocate(chars.length); cb.put(chars); cb.flip(); ByteBuffer bb = cs.encode(cb); return bb.array(); } }
