java實現文件分片傳輸


需求:

最近開發了一個及時聊天系統,用到了一個公司自己開發的MQ,這個MQ單條消息只支持512KB,所以在傳輸大於512KB文件就會有問題,所以傳輸文件時要分批次傳遞。

來說一下方案:

1.封裝消息頭,當然這個消息頭的長度要是固定的,如果位數不夠需要補位。

2.把消息頭數據和限制大小文件數據都放在一個byte數組中,進行組裝數據,文件數據需要拆分多次,每次都要攜帶消息頭。

3.接收端拆分byte數組,把文件部分保存到文件。

 

簡單描述一下我的思路:

1.發送者發送消息,我這里是放到MQ里了,這里從一個文件里讀文件,每次都1024個字節,讀出來后組裝頭信息,把數據發送到MQ中。

2.接受者接收消息,我在MQ中讀取數據,把數據拿到后,拆分數據包,根據頭信息創建文件,記得用可以追加的文件流,這樣每次都跟據頭信息去保存或追加文件。

 

來上代碼:

1.發送者

/**
     * 發送者
     */
    private static void sender(SynchronousQueue<byte[]> queue) {
        int limitLen = 1024;
        String fileName = "1.txt";
        String filePath = "D:\\1.txt";
        File file = new File(filePath);
        //僅顯示文佳大小使用
        byte[] message = FileUtil.File2byte(file);
        System.out.println("文件總大小:"+message.length);
try {
            FileInputStream fis = new FileInputStream(file);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] b = new byte[limitLen];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
                //發送消息
                senderItem(b, fileName, filePath, queue);
            }
            fis.close();
            bos.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void senderItem(byte[] message, String fileName, String filePath, SynchronousQueue<byte[]> queue) {
        //全部數據byte數組
        byte[] byteFull = new byte[500 + message.length];
        //定義消息頭:文件名|全路徑|本條消息大小|文件大小
        String header = fileName + "|" + filePath + "|" + byteFull.length + "|" + message.length;
        byte[] headerByte = header.getBytes();

        //這里先把頭部數據存進全部數據的字節數組
        for (int i = 0; i < headerByte.length; i++) {
            byteFull[i] = headerByte[i];
        }

        //由於消息位為固定大小(500),現在的消息大小不夠500,這里解析時就會有問題,所引消息需要補位。
        //計算補位數
        int size = 500 - headerByte.length;
        //為頭陪補位
        for (; size < 500; size++) {
            byteFull[size] = '\0';
        }

        //文件內容
        for (int i = 0; i < message.length; i++) {
            byteFull[i + 500] = message[i];
        }

        //發送消息
        try {
            queue.put(byteFull);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 

2.接收者

private static void receiver(SynchronousQueue<byte[]> queue) throws InterruptedException {

        byte[] byteFull = queue.take();
        //聲明文件的數組
        byte[] fileByte = new byte[byteFull.length - 500];
        //獲取文件的內容
        for (int i = 0; i < fileByte.length; i++) {
            fileByte[i] = byteFull[500 + i];
        }

        byte[] header = new byte[500];
        System.arraycopy(byteFull, 0, header, 0, 500);

        String full = new String(header);
        String[] value = full.split("\\|");

        String fileName = value[0];
        String filePath = value[1];
        //消息長度
        Integer fullLen = Integer.parseInt(value[2].trim());
        //文件總長度
        Integer fileLen = Integer.parseInt(value[3].trim());


        String msg = value[3].trim();
        byte[] newFile = new byte[fullLen - 500];
        System.arraycopy(byteFull, 500, newFile, 0, fullLen - 500);
        System.out.println("new file len is:" + (fullLen - 500));
        System.out.println("newPath:" + (fullLen - 500));

        FileUtil.saveFileByBytes(newFile, "D:\\temp\\", fileName);
        System.out.println("receive msg is:" + msg);

    }

3.測試

private static class Writer extends Thread {
        SynchronousQueue<byte[]> queue;

        public Writer(SynchronousQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                sender(queue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    private static class Reader extends Thread {
        SynchronousQueue<byte[]> queue;

        public Reader(SynchronousQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    receiver(queue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<byte[]> queue = new SynchronousQueue<>();
        new Writer(queue).start();
        new Reader(queue).start();
    }

 測試結果:存了3次。

文件總大小:2595
new file len is:1024
newPath:1024
receive msg is:1024
new file len is:1024
newPath:1024
receive msg is:1024
new file len is:1024
newPath:1024
receive msg is:1024

 

幫助類FileUtil

package org.test.nio.demo5;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;

/**
 * 文件操作
 *
 * @author zhennan
 * @version 1.0
 * @date 2022/3/20 15:33
 */
public class FileUtil {
    /**
     * 將文件轉換成Byte數組
     *
     * @param pathStr
     * @return
     */
    public static byte[] getBytesByFile(String pathStr) {
        File file = new File(pathStr);
        try {
            FileInputStream fis = new FileInputStream(file);
            ByteArrayOutputStream bos = new ByteArrayOutputStream(1000);
            byte[] b = new byte[1000];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
            }
            fis.close();
            byte[] data = bos.toByteArray();
            bos.close();
            return data;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 將Byte數組轉換成文件
     *
     * @param bytes
     * @param filePath
     * @param fileName
     */
    public static void saveFileByBytes(byte[] bytes, String filePath, String fileName) {
        BufferedOutputStream bos = null;
        FileOutputStream fos = null;
        File file = null;
        try {
            File dir = new File(filePath);
            if (!dir.exists() && dir.isDirectory()) {// 判斷文件目錄是否存在
                dir.mkdirs();
            }
            file = new File(filePath + File.separator + fileName);
            fos = new FileOutputStream(file, true);
            bos = new BufferedOutputStream(fos);
            bos.write(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bos != null) {
                try {
                    bos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * file轉byte
     *
     * @param tradeFile
     * @return
     */
    public static byte[] File2byte(File tradeFile) {
        byte[] buffer = null;
        try {
            FileInputStream fis = new FileInputStream(tradeFile);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] b = new byte[1024];
            int n;
            while ((n = fis.read(b)) != -1) {
                bos.write(b, 0, n);
            }
            fis.close();
            bos.close();
            buffer = bos.toByteArray();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer;
    }

    /**
     * byte轉string
     *
     * @param bytes
     * @return
     */
    public static String byte2String(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] != 0) {
                sb.append((char) bytes[i]);
            } else {
                break;
            }
        }
        return sb.toString();
    }

    /**
     * byte轉char
     *
     * @param bytes
     * @return
     */
    public static char[] getChars(byte[] bytes) {
        Charset cs = Charset.forName("GB2312");
        ByteBuffer bb = ByteBuffer.allocate(bytes.length);
        bb.put(bytes);
        bb.flip();
        CharBuffer cb = cs.decode(bb);
        return cb.array();
    }

    /**
     * char轉byte
     */
    public static byte[] getBytes(char[] chars) {
        Charset cs = Charset.forName("GB2312");
        CharBuffer cb = CharBuffer.allocate(chars.length);
        cb.put(chars);
        cb.flip();
        ByteBuffer bb = cs.encode(cb);
        return bb.array();

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM