package ccb.huge;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
 * Entry point for experiments that read a large file from several threads.
 *
 * <p>Every experiment ("mode") is a private static method. {@link #main(String[])} runs one of
 * them; the calls to the other modes are left commented out so they can be switched by hand.
 */
public class Entrance {
    /** File handed to the experiments by {@link #main(String[])}. */
    static final String filePath = "d:/ccb/test.txt";

    /** File that {@code Monkey} actually reads; it replaces whatever file the caller passes in. */
    private static final String MONKEY_FILE_PATH = "d:/ccb/new.dat";

    /** Upper bound on the number of reader threads started by {@code Monkey}. */
    private static final int MONKEY_THREAD_COUNT = 6;

    /**
     * Runs the currently selected experiment.
     *
     * @param args ignored
     * @throws InterruptedException declared for the modes that may block on worker threads
     */
    public static void main(String[] args) throws InterruptedException {
        File file = new File(filePath);
        long fc = file.length();
        /* Multi-thread mode 1 */
        // Giraffe3(file);
        /* Multi-thread mode 2 */
        // Giraffe2(file);
        /* Mode 3: thread pool */
        // Giraffe1(file);
        /* Multi-thread mode 4 */
        Monkey(file, fc);
        /* Multi-thread mode 5 */
        // Koala();
    }

    /** Mode 1: starts three {@code GiraffeThread}s on the same file, held in an array. */
    private static void Giraffe3(File file) {
        GiraffeThread[] giraffe = new GiraffeThread[3];
        for (int i = 0; i < giraffe.length; i++) {
            giraffe[i] = new GiraffeThread(file);
            giraffe[i].start();
        }
    }

    /** Mode 2: starts three {@code GiraffeThread}s on the same file, one variable per thread. */
    private static void Giraffe2(File file) {
        Thread thread1 = new GiraffeThread(file);
        Thread thread2 = new GiraffeThread(file);
        Thread thread3 = new GiraffeThread(file);
        thread1.start();
        thread2.start();
        thread3.start();
    }

    /** Mode 3: runs two {@code GiraffeThread} instances as tasks on a fixed pool of two threads. */
    private static void Giraffe1(File file) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            exec.execute(new GiraffeThread(file));
        }
        exec.shutdown();
    }

    /**
     * Mode 4: splits a file into byte ranges and reads the ranges concurrently, one
     * {@code MonkeyThread} per range.
     *
     * <p>The file is cut into at most {@link #MONKEY_THREAD_COUNT} chunks of equal size (rounded
     * up); only the final chunk may be shorter. No extra thread is started when the length divides
     * evenly. Each thread receives the real length of the final chunk in its remainder argument,
     * so the last range is read in full whether or not the length divides evenly.
     *
     * <p>NOTE(review): both parameters are ignored. The experiment has always read
     * {@code d:/ccb/new.dat} regardless of the arguments, and that is preserved here.
     *
     * @param file ignored, see above
     * @param fc ignored, see above
     * @throws IllegalArgumentException if the file is too large for the {@code int} offsets that
     *     {@code MonkeyThread} accepts
     */
    private static void Monkey(File file, long fc) {
        File target = new File(MONKEY_FILE_PATH);
        if (!target.isFile()) {
            System.err.println("Monkey: not a readable file: " + target);
            return;
        }
        long length = target.length();
        if (length == 0) {
            // Nothing to split, and a zero chunk size would never advance the loop below.
            return;
        }
        // All sizing is done in long; the original cast to int before dividing.
        long chunkLength = (length + MONKEY_THREAD_COUNT - 1) / MONKEY_THREAD_COUNT;
        if (length + chunkLength > Integer.MAX_VALUE) {
            // start + size must also fit in an int inside MonkeyThread.
            throw new IllegalArgumentException(
                    "File too large for int offsets (" + length + " bytes): " + target);
        }
        int total = (int) length;
        int size = (int) chunkLength;
        for (int start = 0; start < total; start += size) {
            int tail = Math.min(size, total - start);
            new MonkeyThread(target, start, tail, size, total).start();
        }
    }

    /**
     * Mode 5: times {@code KoalaThread} tasks on a fixed pool of four threads and prints each
     * task's reported duration plus the running total.
     *
     * <p>Every file in the list is submitted {@code fileList.length} times (4 files, 16 tasks), as
     * in the original experiment.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for a result
     */
    private static void Koala() throws InterruptedException {
        String[] fileList = {"d:/ccb/test.txt", "d:/ccb/ONL.dat.result", "d:/ccb/new.dat", "d:/ccb/sixno.txt",};
        // A cached pool (Executors.newCachedThreadPool()) would create a new thread per task.
        ExecutorService exec = Executors.newFixedThreadPool(4);
        List<FutureTask<Long>> taskList = new ArrayList<>();
        for (String path : fileList) {
            for (int repeat = 0; repeat < fileList.length; repeat++) {
                FutureTask<Long> ft = new FutureTask<>(new KoalaThread(new File(path)));
                taskList.add(ft);
                exec.submit(ft);
            }
        }
        System.out.println(
                "主線程--" + Thread.currentThread().getId() +
                "程結束!" + Thread.currentThread().getName());
        // No sleep is needed before collecting: FutureTask.get() blocks until the task is done.
        long totalResult = 0; // long, so the sum of long durations is not silently narrowed
        try {
            for (FutureTask<Long> ft : taskList) {
                try {
                    long result = ft.get();
                    totalResult += result;
                    System.out.printf("線程執行時間:%s, 合計時間:%s\n", result, totalResult);
                    System.out.println("-----------------------------");
                    System.out.println(ft.toString());
                    System.out.println("-----------------------------");
                } catch (ExecutionException e) {
                    // One failed task must not stop the remaining results from being collected.
                    e.printStackTrace();
                }
            }
        } finally {
            // Also reached when get() is interrupted, so the pool threads do not keep the JVM alive.
            exec.shutdown();
        }
        System.out.println("所有線程執行完畢!共計耗時:" + totalResult);
    }

    /**
     * Computes the offsets at which a file of the given length is split for a given thread count.
     * The original note says this is meant for use with a {@code RandomAccessReader.read()}
     * method, which is not part of this class.
     *
     * <p>The result has {@code threadTotal + 2} entries: {@code 0}, then {@code threadTotal}
     * multiples of {@code fileLength / threadTotal}, then {@code fileLength} itself, so the last
     * two entries delimit the remainder. The computed values are also printed to standard output.
     *
     * @param fileLength length of the file in bytes; must be within {@code 0..Integer.MAX_VALUE}
     * @param threadTotal number of equally sized chunks; must be positive
     * @return the split offsets in ascending order
     * @throws IllegalArgumentException if either argument is out of range
     */
    private static int[] getPerStart(long fileLength, int threadTotal) {
        if (threadTotal <= 0) {
            throw new IllegalArgumentException("threadTotal must be positive: " + threadTotal);
        }
        if (fileLength < 0 || fileLength > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("fileLength out of int range: " + fileLength);
        }
        int mo = (int) (fileLength % threadTotal);
        int start = (int) (fileLength / threadTotal);
        int[] begin = new int[threadTotal + 2];
        for (int i = 1; i <= threadTotal; i++) {
            begin[i] = i * start;
        }
        // The end of the remainder is the last full-chunk boundary plus the remainder. The
        // original read the slot it was writing (still 0) and therefore stored just mo.
        begin[threadTotal + 1] = begin[threadTotal] + mo;
        System.out.println(String.format("總%d, mo=%d, list=%s", fileLength, mo, Arrays.toString(begin)));
        return begin;
    }
}
裡面用到的各個線程類:
package ccb.huge;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * "Koala" experiment: measures how long a single thread needs to read one large file from start
 * to end.
 *
 * <p>While reading, every line containing the text {@code 12345} is collected; the collected
 * lines are not reported, only the elapsed time is.
 */
public class KoalaThread implements Callable<Long> {
    /** Marker text a line must contain to be collected. */
    private static final String TARGET = "12345";

    private final File file;

    /**
     * @param file the file to read line by line
     */
    public KoalaThread(File file) {
        this.file = file;
    }

    /**
     * Reads the whole file and prints the name and id of the executing thread.
     *
     * <p>Failures while opening, reading or closing the file are printed and not rethrown, so a
     * duration is returned even when the read did not complete.
     *
     * @return elapsed wall-clock time in milliseconds
     */
    @Override
    public Long call() throws Exception {
        long startedAt = System.currentTimeMillis();
        List<String> targetList = new ArrayList<>();
        // try-with-resources closes the reader on every path, replacing the manual finally block.
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.contains(TARGET)) {
                    targetList.add(line);
                }
            }
            System.out.printf("線程:%s -- %s\n",
                    Thread.currentThread().getName(), Thread.currentThread().getId());
        } catch (Exception e) {
            // Best effort: this is a timing experiment, so report the problem and still return.
            e.printStackTrace();
        }
        return System.currentTimeMillis() - startedAt;
    }
}
另外一個:
package ccb.huge;
import java.io.File;
import java.io.RandomAccessFile;
/**
 * "Monkey" experiment: a thread that reads one byte range of a file and prints it.
 *
 * <p>Several instances are started over the same file, each with its own start offset, so that
 * together they cover the file.
 *
 * <p>Every instance opens its own {@link RandomAccessFile}, so file positions are not shared
 * between threads and the reads run without a lock. Only the printing is synchronized (on the
 * {@link File} object), so that instances sharing one {@code File} keep their header, content
 * and trailing blank line together.
 *
 * <p>Offsets and lengths are {@code int}, so only files smaller than 2 GiB can be addressed.
 * Bytes are decoded with the platform default charset; a range boundary that falls inside a
 * multi-byte character prints a garbled character at that boundary.
 */
public class MonkeyThread extends Thread {
    private final File file;
    private final int start;
    private final int end;
    private final int size;
    private final int fileLen;

    /**
     * @param file the file to read
     * @param start offset of the first byte to read
     * @param mo remainder of the file length over the thread count; kept for source compatibility,
     *     the number of bytes in the final range is now derived from {@code fileLen - start}
     * @param size nominal number of bytes per range
     * @param fileLen total length of the file in bytes
     */
    public MonkeyThread(File file, int start, int mo, int size, int fileLen) {
        this.file = file;
        this.start = start;
        this.end = start + size;
        this.size = size;
        this.fileLen = fileLen;
    }

    /**
     * Reads up to {@code size} bytes starting at {@code start}, never past {@code fileLen}, and
     * prints a header line followed by the decoded bytes and a blank line. I/O failures are
     * printed and end the run without output.
     */
    @Override
    public void run() {
        // Clamp to what is left in the file. The original read the remainder count in the last
        // range, which skipped the final full chunk whenever the remainder was 0.
        long remaining = (long) fileLen - start;
        int wanted = (int) Math.max(0L, Math.min((long) size, remaining));
        boolean lastRange = (long) start + size >= fileLen;

        byte[] buff = new byte[wanted];
        int len = 0;
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            raf.seek(start);
            // A single read() may deliver fewer bytes than requested and returns -1 at end of file.
            while (len < wanted) {
                int got = raf.read(buff, len, wanted - len);
                if (got < 0) {
                    break;
                }
                len += got;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        int readCount = 1; // each thread performs exactly one read pass
        synchronized (file) {
            System.out.println(String.format("(%s)%s:第 %s 次讀取,start=%s end=%s fileLen=%s,內容如下: ",
                    Thread.currentThread().getId(),
                    Thread.currentThread().getName(), readCount, start, end, fileLen));
            if (lastRange) {
                System.out.println("-----------mo=" + wanted);
            }
            System.out.println(new String(buff, 0, len));
            System.out.println();
        }
    }
}
最後一個:
package ccb.huge;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

/**
 * "Giraffe" experiment: tests reading a file from several threads.
 *
 * <p>Each instance reads the whole file line by line and prints the lines in batches, each batch
 * preceded by a header carrying the executing thread's name and id and a batch counter.
 *
 * <p>The complete read-and-print loop runs while holding the monitor of the {@link File} object,
 * so instances that share one {@code File} take turns and their output is not interleaved.
 */
public class GiraffeThread extends Thread {
    /** Number of lines printed per batch. */
    private static final int BATCH_SIZE = 5;

    /** The file to read. */
    private final File file;

    /**
     * @param file the file to read; also used as the lock that serializes readers
     */
    public GiraffeThread(File file) {
        this.file = file;
    }

    /**
     * Reads the file and prints it in batches of {@link #BATCH_SIZE} lines, followed by a
     * completion message. A final batch with fewer lines is printed as well; previously those
     * trailing lines were dropped. Failures are printed and not rethrown.
     */
    @Override
    public void run() {
        // Taken from the executing thread, which is a pool thread when this object is handed to
        // an executor as a plain Runnable.
        Thread current = Thread.currentThread();
        long id = current.getId();
        String name = current.getName();

        // Both resources are closed automatically, in reverse order, on every path.
        try (FileReader reader = new FileReader(file);
                BufferedReader buff = new BufferedReader(reader)) {
            List<String> lines = new ArrayList<>();
            String line;
            int n = 0; // number of batches printed so far
            synchronized (file) {
                while ((line = buff.readLine()) != null) {
                    lines.add(line);
                    if (lines.size() == BATCH_SIZE) {
                        printBatch(name, id, ++n, lines);
                        lines.clear();
                    }
                }
                if (!lines.isEmpty()) {
                    // Line count was not a multiple of the batch size: flush the remainder.
                    printBatch(name, id, ++n, lines);
                    lines.clear();
                }
            }
            System.out.println(name + "(" + id + ") 任務完畢!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints the batch header followed by every line of the batch. */
    private static void printBatch(String name, long id, int batchNumber, List<String> lines) {
        System.out.println(String.format("%s(%s)-- 讀取次數:%s ", name, id, batchNumber));
        for (String tmpLine : lines) {
            System.out.println(tmpLine);
        }
    }
}