java实现文件分片传输


需求:

最近开发了一个及时聊天系统,用到了一个公司自己开发的MQ,这个MQ单条消息只支持512KB,所以在传输大于512KB文件就会有问题,所以传输文件时要分批次传递。

来说一下方案:

1.封装消息头,当然这个消息头的长度要是固定的,如果位数不够需要补位。

2.把消息头数据和限制大小文件数据都放在一个byte数组中,进行组装数据,文件数据需要拆分多次,每次都要携带消息头。

3.接收端拆分byte数组,把文件部分保存到文件。

 

简单描述一下我的思路:

1.发送者发送消息,我这里是放到MQ里了,这里从一个文件里读文件,每次都1024个字节,读出来后组装头信息,把数据发送到MQ中。

2.接受者接收消息,我在MQ中读取数据,把数据拿到后,拆分数据包,根据头信息创建文件,记得用可以追加的文件流,这样每次都跟据头信息去保存或追加文件。

 

来上代码:

1.发送者

/**
     * 发送者
     */
    private static void sender(SynchronousQueue<byte[]> queue) {
        int limitLen = 1024;
        String fileName = "1.txt";
        String filePath = "D:\\1.txt";
        File file = new File(filePath);
        //仅显示文佳大小使用
        byte[] message = FileUtil.File2byte(file);
        System.out.println("文件总大小:"+message.length);
try {
            FileInputStream fis = new FileInputStream(file);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] b = new byte[limitLen];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
                //发送消息
                senderItem(b, fileName, filePath, queue);
            }
            fis.close();
            bos.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void senderItem(byte[] message, String fileName, String filePath, SynchronousQueue<byte[]> queue) {
        //全部数据byte数组
        byte[] byteFull = new byte[500 + message.length];
        //定义消息头:文件名|全路径|本条消息大小|文件大小
        String header = fileName + "|" + filePath + "|" + byteFull.length + "|" + message.length;
        byte[] headerByte = header.getBytes();

        //这里先把头部数据存进全部数据的字节数组
        for (int i = 0; i < headerByte.length; i++) {
            byteFull[i] = headerByte[i];
        }

        //由于消息位为固定大小(500),现在的消息大小不够500,这里解析时就会有问题,所引消息需要补位。
        //计算补位数
        int size = 500 - headerByte.length;
        //为头陪补位
        for (; size < 500; size++) {
            byteFull[size] = '\0';
        }

        //文件内容
        for (int i = 0; i < message.length; i++) {
            byteFull[i + 500] = message[i];
        }

        //发送消息
        try {
            queue.put(byteFull);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 

2.接收者

private static void receiver(SynchronousQueue<byte[]> queue) throws InterruptedException {

        byte[] byteFull = queue.take();
        //声明文件的数组
        byte[] fileByte = new byte[byteFull.length - 500];
        //获取文件的内容
        for (int i = 0; i < fileByte.length; i++) {
            fileByte[i] = byteFull[500 + i];
        }

        byte[] header = new byte[500];
        System.arraycopy(byteFull, 0, header, 0, 500);

        String full = new String(header);
        String[] value = full.split("\\|");

        String fileName = value[0];
        String filePath = value[1];
        //消息长度
        Integer fullLen = Integer.parseInt(value[2].trim());
        //文件总长度
        Integer fileLen = Integer.parseInt(value[3].trim());


        String msg = value[3].trim();
        byte[] newFile = new byte[fullLen - 500];
        System.arraycopy(byteFull, 500, newFile, 0, fullLen - 500);
        System.out.println("new file len is:" + (fullLen - 500));
        System.out.println("newPath:" + (fullLen - 500));

        FileUtil.saveFileByBytes(newFile, "D:\\temp\\", fileName);
        System.out.println("receive msg is:" + msg);

    }

3.测试

private static class Writer extends Thread {
        SynchronousQueue<byte[]> queue;

        public Writer(SynchronousQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                sender(queue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    private static class Reader extends Thread {
        SynchronousQueue<byte[]> queue;

        public Reader(SynchronousQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    receiver(queue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<byte[]> queue = new SynchronousQueue<>();
        new Writer(queue).start();
        new Reader(queue).start();
    }

 测试结果:存了3次。

文件总大小:2595
new file len is:1024
newPath:1024
receive msg is:1024
new file len is:1024
newPath:1024
receive msg is:1024
new file len is:1024
newPath:1024
receive msg is:1024

 

帮助类FileUtil

package org.test.nio.demo5;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;

/**
 * 文件操作
 *
 * @author zhennan
 * @version 1.0
 * @date 2022/3/20 15:33
 */
public class FileUtil {
    /**
     * 将文件转换成Byte数组
     *
     * @param pathStr
     * @return
     */
    public static byte[] getBytesByFile(String pathStr) {
        File file = new File(pathStr);
        try {
            FileInputStream fis = new FileInputStream(file);
            ByteArrayOutputStream bos = new ByteArrayOutputStream(1000);
            byte[] b = new byte[1000];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
            }
            fis.close();
            byte[] data = bos.toByteArray();
            bos.close();
            return data;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 将Byte数组转换成文件
     *
     * @param bytes
     * @param filePath
     * @param fileName
     */
    public static void saveFileByBytes(byte[] bytes, String filePath, String fileName) {
        BufferedOutputStream bos = null;
        FileOutputStream fos = null;
        File file = null;
        try {
            File dir = new File(filePath);
            if (!dir.exists() && dir.isDirectory()) {// 判断文件目录是否存在
                dir.mkdirs();
            }
            file = new File(filePath + File.separator + fileName);
            fos = new FileOutputStream(file, true);
            bos = new BufferedOutputStream(fos);
            bos.write(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bos != null) {
                try {
                    bos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * file转byte
     *
     * @param tradeFile
     * @return
     */
    public static byte[] File2byte(File tradeFile) {
        byte[] buffer = null;
        try {
            FileInputStream fis = new FileInputStream(tradeFile);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] b = new byte[1024];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
            }
            fis.close();
            bos.close();
            buffer = bos.toByteArray();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer;
    }

    /**
     * byte转string
     *
     * @param bytes
     * @return
     */
    public static String byte2String(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] != 0) {
                sb.append((char) bytes[i]);
            } else {
                break;
            }
        }
        return sb.toString();
    }

    /**
     * byte转char
     *
     * @param bytes
     * @return
     */
    public static char[] getChars(byte[] bytes) {
        Charset cs = Charset.forName("GB2312");
        ByteBuffer bb = ByteBuffer.allocate(bytes.length);
        bb.put(bytes);
        bb.flip();
        CharBuffer cb = cs.decode(bb);
        return cb.array();
    }

    /**
     * char转byte
     */
    public static byte[] getBytes(char[] chars) {
        Charset cs = Charset.forName("GB2312");
        CharBuffer cb = CharBuffer.allocate(chars.length);
        cb.put(chars);
        cb.flip();
        ByteBuffer bb = cs.encode(cb);
        return bb.array();

    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM