最近做了一個功能模塊,就是有大量的文本文件,需要錄入數據庫,之前的邏輯是for循環實現的,所以當文件非常多的時候,就會非常吃力,而且效率低,所以就想到了用線程池來解決這個問題。首先,我們的思路是,先判斷有多少個文件,如果10個文件一下,那單線程就可以解決,沒必要開多個線程。10個到100個文件,我們就可以開10個線程來處理這些任務,100個文件以上,就開100個線程。廢話不多說,直接上代碼。
1.創建線程
public static void main(String[] args) throws Exception { ApplicationContext ac = new ClassPathXmlApplicationContext("conf/spring-config.xml"); ReaderMapper readermapper = ac.getBean(ReaderMapper.class); //查詢出所有等待讀取文件 List<FileName> f_list = readermapper.selectTxt(); int f_size = f_list.size();//文件數目 if(f_size>=1 && f_size<10){ ExecutorService pool = Executors.newSingleThreadExecutor(); //創建單線程池 MyRunnable1 t1 = new MyRunnable1(f_list, 0, f_size); pool.submit(t1); pool.shutdown(); //結束線程池 }else if(f_size>=10 && f_size<100){ ExecutorService pool = Executors.newFixedThreadPool(10); //創建線程池 //取余,把余數給最后一個線程 int m = f_list.size()%10; //每個線程分配多少個任務 int s = (f_list.size()-m)/10; //創建前九個個線程 for(int i = 0; i < 9; i++){ MyRunnable1 t1 = new MyRunnable1(f_list, s*i, s*(i+1)); pool.submit(t1); } //創建第10個線程 MyRunnable1 t2 = new MyRunnable1(f_list, s*9, s*10+m); pool.submit(t2); pool.shutdown(); //結束線程池 }else if(f_size>=100){ ExecutorService pool = Executors.newFixedThreadPool(100); //創建線程池 //取余,把余數給最后一個線程 int m = f_list.size()%100; //每個線程分配多少個任務 int s = (f_list.size()-m)/100; //創建前99個個線程 for(int i = 0; i < 99; i++){ MyRunnable1 t1 = new MyRunnable1(f_list, s*i, s*(i+1)); pool.submit(t1); } //創建第100個線程 MyRunnable1 t2 = new MyRunnable1(f_list, s*99, s*100+m); pool.submit(t2); pool.shutdown(); //結束線程池 } }
2.執行相應的線程
為了保證各個任務不沖突,我的邏輯是,給他們每個線程分配對應的任務,然后各自執行自己的,從查出來list 中,讀取自己對應的起始位置。
public class MyRunnable1 implements Runnable { private List<FileName> f_list; private int start; private int end; public MyRunnable1(List<FileName> f_List, int start, int end){ super(); this.f_list = f_List; this.start = start; this.end = end; } public void run() { ApplicationContext ac = new ClassPathXmlApplicationContext("conf/spring-config.xml"); ReaderMapper readermapper = ac.getBean(ReaderMapper.class); //執行任務 for (int n = this.start; n <this.end; n++){ //創建流 File file = new File(f_list.get(n).getPath1()); BufferedReader bufr = null; FileReader fr; String line = null; String[] name = null;// 定義當前行String數組 File_test ft = new File_test(); int lineCount = 0; //計數器,統計行數 if(file.isFile()){ try { fr = new FileReader(file); bufr = new BufferedReader(fr); } catch (FileNotFoundException e) { e.printStackTrace(); } try { while((line = bufr.readLine()) != null){ name = line.split("##"); lineCount++;//計數器,統計行數 // 上傳文件解析不正確,可能為數據不全 if (name.length != 3) { //報錯時候修改狀態 readermapper.updateState(f_list.get(n).getPath1()); System.err.println("文件 "+f_list.get(n).getName() +"第"+lineCount+"行出錯了!!" ); break; }else{ ft.setOne(name[0]); ft.setTwo(name[1]); ft.setThree(name[2]); ft.setTextname(f_list.get(n).getPath1()); //信息加入另一個表 readermapper.insert(ft); //修改讀取狀態 readermapper.updateTxt(f_list.get(n).getPath1()); } } } catch (IOException e) { e.printStackTrace(); } try { bufr.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("任務"+Thread.currentThread().getName()+"完成"); } } }