多線程讀寫同一個文件分多種情況:
-
多線程同時讀同一個文件,在這種情況下並不會造成沖突
-
多線程同時寫同一個文件,會造成寫數據丟失
-
多線程同時對同一個文件進行寫和讀,會造成臟讀
如果要處理多線程讀寫文件造成的數據不一致的問題,第一個想到的就是加鎖。在java.concurrent.locks
中ReadWriteLock
分別定義了樂觀鎖讀鎖和悲觀鎖寫鎖,將以上的情況都考慮到了,可以很好地處理多線程讀寫同一個文件的情況。但是既然加鎖,必然會導致多線程在讀寫文件時效率較低,在不同情形下似乎有更好的解決方案:
-
通過
ReadWriteLock
為文件讀寫過程加鎖,防止數據與預想不一致,同時降低了多線程處理的效率 -
如果多線程頻繁寫入少量數據,可創建一個類緩存需要寫入的數據,並且按時批量寫入數據,減少頻繁操作文件及加鎖操作帶來的問題。
-
可通過RandomAccessFile規划好不同位置,多線程同時操作不同位置的寫入
有關RandomAccessFile,之前在
1 實踐一 多線程寫文件RandomAccessFile不加鎖
RandomAccessFile可選模式是r
,rw
,rws
,或者rwd
,寫入時不會清空原數據,會在指定位置覆蓋原本內容寫入新內容。
所以:
-
清空RandomAccessFile打開的文件的方法為
rw.setLength(0)
-
為了防止多線程間寫入內容互相覆蓋需要規划好寫入的位置,插入的話會更加麻煩
-
規划好寫入的位置有兩種,a)如果每行寫入的字數相同很簡單可以計算得到足夠的位置 b)預留足夠的位置
-
預留足夠的位置的結果其實會有些瑕疵,結果如下所示:
/* 多線程寫入文件指定位置 */ class RWrite implements Runnable{ public int pos; public String text; public String fileName; public RWrite(int pos,String text,String fileName){ this.pos = pos; this.text = text; this.fileName = fileName; } @Override public void run(){ try { RandomAccessFile rw = new RandomAccessFile(this.fileName,"rw"); rw.seek(pos); rw.writeBytes(text); rw.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } public void test(){ ExecutorService pool = Executors.newFixedThreadPool(10); int linwsize = 20;//必須預留足夠的一行的空間,否則會導致覆蓋,但是預留空間過大,也會出現空的字符 int lines = 50;//需要寫入100行 String fileName = "a.csv"; RandomAccessFile rw = null; try { rw = new RandomAccessFile(fileName,"rw"); rw.setLength(0);//清空文件 /*寫入標題欄*/ rw.writeBytes("index,text\n"); rw.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } for(int i=1;i<=lines;i++){ pool.execute(new RWrite(i*linwsize,i+", text"+i+"\n",fileName)); } pool.shutdown(); }
2 實踐二 將寫入文件交給一個線程進行
類似於生產者消費者模式,只是只有一個消費者。
本來只想簡單地寫一下的,但是因為在某處將0寫成了9調試了許久。
/* 將寫入工作交給一個線程 */ /* 加工數據的Task。生產者 */ class DataTask implements Runnable{ BlockingQueue<String> data_put; private int start; private int end; public DataTask(BlockingQueue<String> data_put,int start,int end){ this.data_put = data_put; this.start = start; this.end = end; } @Override public void run(){ /*System.out.println(String.format("%s-%s:%d-%d開始", System.currentTimeMillis(), Thread.currentThread().getName(), this.start,this.end));*/ for(int i=start;i<=end;i++){ try { /* 加工數據需要時間,隨機 */ Thread.sleep(10+new Random().nextInt(20)); String s = String.format("%s-%s:%d-%d[%d]\n", System.currentTimeMillis(), Thread.currentThread().getName(), this.start,this.end,i); data_put.put(i+"\n"); } catch (InterruptedException e) { e.printStackTrace(); return; } } System.out.println(String.format("%s-%s:%d-%d結束", System.currentTimeMillis(), Thread.currentThread().getName(), this.start,this.end)); } } /* 緩存提交的數據,並寫入 */ class WriteTask implements Runnable{ private BlockingQueue<String> data_in = new ArrayBlockingQueue<>(10); private byte[] buffer = new byte[1024]; private int th = (int)(1024*0.8); int length=0; private String fileName; public WriteTask(String fileName){ this.fileName = fileName; try { /* 清空要寫入數據的文件 */ FileOutputStream fileOutputStream = new FileOutputStream(fileName); fileOutputStream.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public BlockingQueue<String> get_Queue(){ return data_in; } private void write(){ if(length==0) return; try { //System.out.println(length); //System.out.println(new String(buffer)); System.out.println("開始寫入……"); FileOutputStream fileOutputStream = new FileOutputStream(fileName,true); fileOutputStream.write(buffer,0,length); fileOutputStream.close(); System.out.println(length+"寫入完成。"); length = 0; } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private void close(){ //System.out.println(new String(buffer)); this.write(); // System.out.println(length); //data_in = null; } @Override public void run(){ while (true){ try { byte[] tmp= data_in.take().getBytes(); System.arraycopy(tmp,0,buffer,length,tmp.length); length = length+tmp.length; if(length>=th){ this.write(); } } catch (InterruptedException e) { //e.printStackTrace(); break; } } } } public void test3(){ ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); String fileName = "b.csv"; WriteTask writeTask = new WriteTask(fileName); pool.execute(writeTask); int num = 20; int writenum = 100; for(int i=0;i<num;i++){ //System.out.println(i*writenum+"---"+((i+1)*writenum-1)); pool.execute(new DataTask(writeTask.get_Queue(),i*writenum,((i+1)*writenum-1))); } pool.shutdown(); while (true){ try { pool.awaitTermination(500,TimeUnit.MILLISECONDS); if(pool.getActiveCount()==1){ writeTask.close(); Thread.sleep(10); pool.shutdownNow(); } if(pool.getActiveCount()==0){ break; } } catch (InterruptedException e) { e.printStackTrace(); break; } } }
3 實踐三 多線程復制文件RandomAccessFile不加鎖
使用相同的偏移量,進行讀寫,也能防止多線程寫入時發生沖突。
復制文件包含讀寫操作,好處是不必自己規划偏移量。
/* 多線程復制文件 */ /* 多線程寫入文件指定位置 */ class RCopy implements Runnable{ public int pos; public int len; public String readFile; public String writeFlie; public RCopy(String readFile,String writeFlie,int pos,int len){ this.pos = pos; this.len = len; this.readFile = readFile; this.writeFlie = writeFlie; } @Override public void run(){ byte[] bytes = new byte[len]; try { RandomAccessFile rr = new RandomAccessFile(this.readFile,"r"); RandomAccessFile rw = new RandomAccessFile(this.writeFlie,"rw"); rr.seek(pos); rw.seek(pos); rr.read(bytes); rw.write(bytes); rr.close(); rw.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } public void test2(){ ExecutorService pool = Executors.newFixedThreadPool(10); String readFile = "b.csv"; String writeFlie = "a.csv"; long totalLen = 0; int len = 1024; /* 每個task寫入的大小 */ try { //讀取需要復制文件的大小 RandomAccessFile file = new RandomAccessFile(readFile,"r"); totalLen = file.length(); System.out.println("length:"+totalLen); //清空需要寫入的文件 file = new RandomAccessFile(writeFlie,"rw"); file.setLength(0); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } int tasknum = 11; for(int i=0;i<totalLen;i = i+len){ int alen = len; if(i+len>totalLen) alen = (int)totalLen-i; //System.out.println(i+":"+alen); pool.execute(new RCopy(readFile,writeFlie,i,alen)); } pool.shutdown(); }