實踐 1-2:多線程讀寫文件


多線程讀寫同一個文件分多種情況:

  • 多線程同時讀同一個文件,在這種情況下並不會造成沖突

  • 多線程同時寫同一個文件,會造成寫數據丟失

  • 多線程同時對同一個文件進行寫和讀,會造成臟讀

如果要處理多線程讀寫文件造成的數據不一致的問題,第一個想到的就是加鎖。在java.concurrent.locksReadWriteLock分別定義了樂觀鎖讀鎖和悲觀鎖寫鎖,將以上的情況都考慮到了,可以很好地處理多線程讀寫同一個文件的情況。但是既然加鎖,必然會導致多線程在讀寫文件時效率較低,在不同情形下似乎有更好的解決方案:

  1. 通過ReadWriteLock為文件讀寫過程加鎖,防止數據與預想不一致,同時降低了多線程處理的效率

  2. 如果多線程頻繁寫入少量數據,可創建一個類緩存需要寫入的數據,並且按時批量寫入數據,減少頻繁操作文件及加鎖操作帶來的問題。

  3. 可通過RandomAccessFile規划好不同位置,多線程同時操作不同位置的寫入

 

有關RandomAccessFile,之前在JAVA篇:Java IO (三)訪問文件--轉換流和文件流 對RandomAccessFile進行了簡單的了解。

 

1 實踐一 多線程寫文件RandomAccessFile不加鎖

RandomAccessFile可選模式是rrwrws,或者rwd,寫入時不會清空原數據,會在指定位置覆蓋原本內容寫入新內容。

 

所以:

  1. 清空RandomAccessFile打開的文件的方法為 rw.setLength(0)

  2. 為了防止多線程間寫入內容互相覆蓋需要規划好寫入的位置,插入的話會更加麻煩

  3. 規划好寫入的位置有兩種,a)如果每行寫入的字數相同很簡單可以計算得到足夠的位置 b)預留足夠的位置

  4. 預留足夠的位置的結果其實會有些瑕疵,結果如下所示:

     

     

     

  
  /* 多線程寫入文件指定位置 */
    class RWrite implements Runnable{
        public int pos;
        public String text;
        public String fileName;
​
        public RWrite(int pos,String text,String fileName){
            this.pos = pos;
            this.text = text;
            this.fileName = fileName;
​
        }
        @Override
        public void run(){
            try {
                RandomAccessFile rw = new RandomAccessFile(this.fileName,"rw");
                rw.seek(pos);
                rw.writeBytes(text);
                rw.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
​
​
    public void test(){
        ExecutorService pool = Executors.newFixedThreadPool(10);
        int linwsize = 20;//必須預留足夠的一行的空間,否則會導致覆蓋,但是預留空間過大,也會出現空的字符
        int lines = 50;//需要寫入100行
​
        String fileName = "a.csv";
​
        RandomAccessFile rw = null;
        try {
            rw = new RandomAccessFile(fileName,"rw");
            rw.setLength(0);//清空文件
            /*寫入標題欄*/
            rw.writeBytes("index,text\n");
            rw.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
​
​
        for(int i=1;i<=lines;i++){
            pool.execute(new RWrite(i*linwsize,i+", text"+i+"\n",fileName));
        }
        pool.shutdown();
​
    }
 

 

2 實踐二 將寫入文件交給一個線程進行

類似於生產者消費者模式,只是只有一個消費者。

本來只想簡單地寫一下的,但是因為在某處將0寫成了9調試了許久。

   /* 將寫入工作交給一個線程 */
    /* 加工數據的Task。生產者 */
    class DataTask implements Runnable{
        BlockingQueue<String> data_put;
        private int start;
        private int end;
​
        public DataTask(BlockingQueue<String> data_put,int start,int end){
            this.data_put = data_put;
            this.start = start;
            this.end = end;
        }
        @Override
        public void run(){
            /*System.out.println(String.format("%s-%s:%d-%d開始",
                    System.currentTimeMillis(),
                    Thread.currentThread().getName(),
                    this.start,this.end));*/
            for(int i=start;i<=end;i++){
                
                try {
                    /* 加工數據需要時間,隨機 */
                    Thread.sleep(10+new Random().nextInt(20));
                    String s = String.format("%s-%s:%d-%d[%d]\n",
                            System.currentTimeMillis(),
                            Thread.currentThread().getName(),
                            this.start,this.end,i);
                    data_put.put(i+"\n");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
​
                }
            }
            System.out.println(String.format("%s-%s:%d-%d結束",
                    System.currentTimeMillis(),
                    Thread.currentThread().getName(),
                    this.start,this.end));
​
        }
    }
    /* 緩存提交的數據,並寫入 */
    class WriteTask implements Runnable{
        private BlockingQueue<String> data_in = new ArrayBlockingQueue<>(10);
        private byte[] buffer = new byte[1024];
        private int th = (int)(1024*0.8);
        int length=0;
        private String fileName;
​
        public WriteTask(String fileName){
            this.fileName = fileName;
            try {
                /* 清空要寫入數據的文件 */
                FileOutputStream fileOutputStream = new FileOutputStream(fileName);
                fileOutputStream.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public BlockingQueue<String> get_Queue(){
            return data_in;
        }
        private void  write(){
            if(length==0) return;
            try {
                //System.out.println(length);
                //System.out.println(new String(buffer));
                System.out.println("開始寫入……");
                FileOutputStream fileOutputStream = new FileOutputStream(fileName,true);
                fileOutputStream.write(buffer,0,length);
                fileOutputStream.close();
                System.out.println(length+"寫入完成。");
                length = 0;
​
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
​
​
        }
        private void close(){
            //System.out.println(new String(buffer));
            this.write();
           // System.out.println(length);
            //data_in = null;
        }
        @Override
        public void run(){
            while (true){
                try {
                    byte[] tmp= data_in.take().getBytes();
                    System.arraycopy(tmp,0,buffer,length,tmp.length);
                    length = length+tmp.length;
​
                    if(length>=th){
                        this.write();
                    }
                } catch (InterruptedException e) {
                    //e.printStackTrace();
                    break;
​
                }
            }
        }
    }
​
    public void test3(){
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
​
        String fileName = "b.csv";
​
        WriteTask writeTask = new WriteTask(fileName);
        pool.execute(writeTask);
​
        int num = 20;
        int writenum = 100;
​
        for(int i=0;i<num;i++){
            //System.out.println(i*writenum+"---"+((i+1)*writenum-1));
            pool.execute(new DataTask(writeTask.get_Queue(),i*writenum,((i+1)*writenum-1)));
        }
        pool.shutdown();
​
        while (true){
            try {
                pool.awaitTermination(500,TimeUnit.MILLISECONDS);
                if(pool.getActiveCount()==1){
                    writeTask.close();
                    Thread.sleep(10);
                    pool.shutdownNow();
                }
                if(pool.getActiveCount()==0){
                    break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
​
​
​
    }

 

3 實踐三 多線程復制文件RandomAccessFile不加鎖

使用相同的偏移量,進行讀寫,也能防止多線程寫入時發生沖突。

復制文件包含讀寫操作,好處是不必自己規划偏移量。

   /* 多線程復制文件 */
    /* 多線程寫入文件指定位置 */
    class RCopy implements Runnable{
        public int pos;
        public int len;
        public String readFile;
        public String writeFlie;
​
        public RCopy(String readFile,String writeFlie,int pos,int len){
            this.pos = pos;
            this.len = len;
            this.readFile = readFile;
            this.writeFlie = writeFlie;
​
        }
        @Override
        public void run(){
            byte[] bytes = new byte[len];
            try {
                RandomAccessFile rr = new RandomAccessFile(this.readFile,"r");
                RandomAccessFile rw = new RandomAccessFile(this.writeFlie,"rw");
                rr.seek(pos);
                rw.seek(pos);
                rr.read(bytes);
                rw.write(bytes);
                rr.close();
                rw.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
​
    public void test2(){
        ExecutorService pool = Executors.newFixedThreadPool(10);
​
​
        String readFile = "b.csv";
        String writeFlie = "a.csv";
​
        long totalLen = 0;
        int len = 1024; /* 每個task寫入的大小 */try {
            //讀取需要復制文件的大小
            RandomAccessFile file = new RandomAccessFile(readFile,"r");
            totalLen = file.length();
            System.out.println("length:"+totalLen);
            //清空需要寫入的文件
            file = new RandomAccessFile(writeFlie,"rw");
            file.setLength(0);
​
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
​
        int tasknum = 11;
​
​
        for(int i=0;i<totalLen;i = i+len){
            int alen = len;
            if(i+len>totalLen) alen = (int)totalLen-i;
            //System.out.println(i+":"+alen);
            pool.execute(new RCopy(readFile,writeFlie,i,alen));
        }
        pool.shutdown();
​
    }

 

 

X 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM