轉自:AIfred
問題:
對一個 10GB 的數據文件排序,而計算機內存僅有 4GB
思路:
將整個文件讀入內存排序顯然不行。可以將這個 10GB 的大文件分區為 100 個 100MB 的小文件,把這些小文件的數據依次讀入內存、排序、再輸出,於是我們便得到了 100 個各自有序的小文件。接下來再將這 100 個小文件兩兩歸並,便得到了一個有序的大文件,完成了排序操作。在實際中,如果僅僅使用普通串行算法實現,整個程序的效率非常低。盡可能多的並行化會大幅提升效率。
用到知識點:
1. 多線程讀寫二進制文件
2. 並行歸並排序(sort部分並行,merge部分並行)
3. 解決線程沖突(C++11的原子操作)
分區排序
對待排序的大文件,首先需要定義每個分區最多的元素數量partition_size。例如待排序文件中有5500個元素,設partition_size = 1000, 需要創建6個分區,數量分別為5*1000,1*500。然后分別對這個6個分區單獨內排序即可。
4-11:對第i
個分區,首先將待排序文件中對應該分區部分的元素並行讀入內存
26-31:對第i
個分區,將排序好的對應分區的元素並行寫入文件
由於不能將整個文件全部讀入內存,只能通過fsetpos
和 fread
函數來讀取該位置的元素的值。
1 void partition_and_sort(string in_file, long long n) { 2 int* arr = new int[PARTITION_SIZE]; // 一個分區大小的臨時數組 3 // 分區循環 4 for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) { 5 // each_get[i]: 分區內一個線程處理大小, all_get: 分區大小 6 int each_get[4] = {}, all_get = 0; 7 clock_t start_time = clock(); 8 // 多線程讀取一個分區 9 parallel_for(0, MAX_THREADS, [&](long long x) { 10 FILE* fin = fopen(in_file.c_str(), "rb"); 11 // 在文件中定位到該線程讀取的位置 12 fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int); 13 if (fsetpos(fin, &pos) == 0) 14 // 讀取 EACH_NUM個值,返回個數 15 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin); 16 fclose(fin); 17 }); 18 clock_t end_time = clock(); 19 for (int i = 0; i < MAX_THREADS; i++) all_get += each_get[i]; 20 cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". " 21 << "Time usage = " << end_time - start_time << "ms.\n"; 22 start_time = clock(); 23 parallel_qsort(arr, arr + all_get); // 並行快速排序 24 end_time = clock(); 25 cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n"; 26 string tmp_file = "temp\\part" + to_string(parts++) + ".dat"; 27 FILE *fout = fopen(tmp_file.c_str(), "wb"); 28 chsize(fileno(fout), all_get * sizeof(int)); 29 fclose(fout); 30 start_time = clock(); 31 // 對一個分區並行寫入文件 32 parallel_for(0, MAX_THREADS, [&](long long x) { 33 FILE* fout = fopen(tmp_file.c_str(), "rb+"); 34 fpos_t pos = EACH_NUM * x * sizeof(int); 35 if (fsetpos(fout, &pos) == 0) 36 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout); 37 fclose(fout); 38 }); 39 end_time = clock(); 40 cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". " 41 << "Time usage = " << end_time - start_time << "ms.\n"; 42 } 43 delete[] arr; 44 }
並行快排:算法導論版快排,一個for循環維護 i , j 區間,[0, i]其值<=key,(i, j)其值>key。
1 void parallel_qsort(int *begin, int *end) { 2 if (begin >= end - 1) return; 3 int *key = rand() % (end - begin) + begin; // 選 pivot 4 swap(*key, *begin); 5 int *i = begin, *j = begin; 6 for (key = begin; j < end; j++) { 7 if (*j < *key) { 8 i++; 9 swap(*i, *j); 10 } 11 } 12 swap(*begin, *i); 13 if (i - begin > dx && end - i > dx) { // 如果兩個區間長度均大於dx,並行 14 parallel_for(0, 2, [&](int x) { 15 if (x) parallel_qsort(begin, i); 16 else parallel_qsort(i + 1, end); 17 }); 18 } else { // 區間長度較小則不並行 19 parallel_qsort(begin, i); 20 parallel_qsort(i + 1, end); 21 } 22 }
歸並:
是外排序核心,整個程序的效率高低取決於該階段的算法。由於待歸並的數據規模較大,使用並行歸並算法可以顯著提升效率。
並行一般需要分治的思想,需要先找到各對樞軸,將原數據划分成均勻的幾部分,再啟動多個線程進行歸並。
段組分解的基本思想
下面就來講解一種最簡單的基於二分查找的段組分解算法的基本思想。
先看一個實例,有以下兩組有序數據,如何將它們進行並行歸並呢?
- 第一組:
15 25 33 47 58 59 62 64
- 第二組:
12 18 27 31 36 38 42 80
考慮將第1
組數據以中間位置數據47
作為樞軸數據平分成兩段,如下所示:
- 第一組:
(15 25 33 47)(58 59 62 64)
然后再在第2
組數據中使用對半查找方法查找一個剛好小於等於樞軸數據47
的數據42
,以42
為樞軸將第2
組數據分成兩段如下:
- 第二組:
(12 18 27 31 36 38 42)(80)
此時可以發現,第一組的第1
段數據和第二組的第1
段數據均小於第1
組的第2
段或第二組的第2
段數據。
將第一組的第1
段數據和第二組的第1
段數據進行歸並,得到以下數據序列:
- 第一段歸並:
(12 15 18 25 27 31 33 36 38 42 47)
將第一組的第2
段數據和第二組的第2
段數據進行歸並,
- 第二段歸並:
(58 59 62 64 80)
很容易發現,上面兩段歸並后的數據連起來后,自然就形成了一個有序系列。由於上面兩次歸並操作都是獨立的,因此它可以並行地進行。這種可以歸並的兩個成對數據段就是前面說過的段組。
47
在第一組數據中的位置是3
;42
在第二組數據中的位置是6
。位置對(3,6)
將上面兩組數據分成了兩個可以並行歸並的段組。
從上面的例子可以看出,在第一組數據的樞軸數據47
確定以后,在第二組數據中使用折半查找方法找到剛好小於等於樞軸數據47
的數據42
,時間復雜度為O(logn)O(logn)。為了讓效率最優化,每個線程的負載應該盡可能均衡,這就要求選取的位置對能夠將原數據分成元素數量相對均衡的段組。於是我們也應使用折半查找確定第一組數據的樞軸的合適位置。
嵌套二分求樞軸:時間復雜度O((logn)^2)
下面程序假設為4個線程,找3個樞軸將兩個要合並的數組分別分成4段,每一對片段合並后的長度一致,使負載盡可能均衡。
由於不能一次全部將整個文件裝進內存,此處涉及磁盤操作,但由於折半查找的效率較高,訪問磁盤的次數並不多,開銷相對較小。
1 void get_fpos() { 2 long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4; 3 for (long long i = 1; i < MAX_THREADS; i++) {//找第i個樞軸 4 long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1; 5 int *get1 = new int; 6 while (r1 - l1 > 1) { 7 pos1 = (l1 + r1) / 2;//二分查找,先假定file1的樞軸 8 fpos = pos1 * sizeof(int); 9 fsetpos(fin1, &fpos); 10 fread(get1, sizeof(int), 1, fin1); 11 long long l2 = 0, r2 = n2; 12 int* get2 = new int; 13 while (r2 - l2 > 0) {//再用二分查找確定file2的樞軸 14 pos2 = (l2 + r2) / 2; 15 fpos = pos2 * sizeof(int); 16 fsetpos(fin2, &fpos); 17 fread(get2, sizeof(int), 1, fin2); 18 if (*get1 <= *get2) 19 r2 = pos2; 20 else 21 l2 = pos2 + 1; 22 } 23 delete get2; get2 = NULL; 24 pos2 = r2; 25 //如果這兩個樞軸將file1和file2划分的不夠均勻,則對pos1進行調整 26 if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i) 27 l1 = pos1 + 1; 28 else 29 r1 = pos1 - 1; 30 } 31 delete get1; get1 = NULL; 32 seek_pos[i][1] = pos1 * sizeof(int);//記錄file1樞軸位置 33 seek_pos[i][2] = pos2 * sizeof(int);//記錄file2樞軸位置 34 seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2];//輸出文件樞軸位置 35 } 36 fclose(fin1); fclose(fin2); 37 //邊界細節 38 seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0; 39 seek_pos[MAX_THREADS][1] = n1 * sizeof(int); 40 seek_pos[MAX_THREADS][2] = n2 * sizeof(int); 41 seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2]; 42 }
完成歸並操作的merge_file
函數需要傳入兩個string
類型的參數,表示需要歸並的兩組數據的文件名,返回歸並后的數據的文件名:
string merge_file(string in_file1, string in_file2) {}
在這個函數中,我們首先需要獲取到兩個輸入文件的大小(從而得到元素數量):
1 void get_size() { 2 FILE* fin1 = fopen(in_file1.c_str(), "rb"); 3 FILE* fin2 = fopen(in_file2.c_str(), "rb"); 4 fseek(fin1, 0, SEEK_END); // 將文件指針設為文末 5 fseek(fin2, 0, SEEK_END); 6 fpos_t size1 = 0, size2 = 0; 7 fgetpos(fin1, &size1); // 獲得文件大小 8 fgetpos(fin2, &size2); 9 }
於是size1 / sizeof(int)
便是第一個文件中的元素數量,size2 / sizeof(int)
便是第二個文件中的元素數量。
下面創建輸出文件,並預先調整其大小(為了並行寫入):
1 void create_output_file() { 2 string out_file = "temp\\part" + to_string(parts++) + ".dat"; 3 FILE *fout = fopen(out_file.c_str(), "wb"); 4 _chsize_s(fileno(fout), (n1 + n2) * sizeof(int)); 5 }
parts
是一個全局變量,用來統計已經創建的文件數量並作為新文件的文件名。(比如前一階段 partition
操作將原數據分割為5
個part
,分別命名為part0.dat
, part1.dat
… part4.dat
, 那么在歸並階段形成的新文件將繼續命名為 part5.dat
, part6.dat
… 比如 part0.dat
和part1.dat
歸並得到part5.dat
, part3.dat
和 part4.dat
歸並得到part6.dat
…)由於此時可能有多個線程在執行merge_file
函數,同時訪問全局變量可能會發生訪問沖突,故parts
必須定義為atomic_int
類型,實現原子操作。
歸並操作:
1 //多線程歸並操作,每個線程只負責歸並對應的段組 2 parallel_for(0, MAX_THREADS, [&](int x) { 3 FILE* fin1 = fopen(in_file1.c_str(), "rb"); 4 FILE* fin2 = fopen(in_file2.c_str(), "rb"); 5 FILE* fout = fopen(out_file.c_str(), "rb+"); 6 //根據樞軸位置確定讀入起點 7 fsetpos(fin1, &seek_pos[x][1]); 8 fsetpos(fin2, &seek_pos[x][2]); 9 fsetpos(fout, &seek_pos[x][0]); 10 //輸入輸出緩沖,優化讀寫性能 11 int *buf0 = new int[BUFFER_SIZE];//輸出文件緩沖 12 int *buf1 = new int[BUFFER_SIZE];//fin1緩沖 13 int *buf2 = new int[BUFFER_SIZE];//fin2緩沖 14 int i = 0, j = 0, k = 0;//i, j, k分別是buf1,buf2,buf0的數組下標 15 //該線程應該從file1中讀取all1個數據,從file2中讀取all2個數據 16 long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int); 17 long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int); 18 //先讀取到緩沖區 19 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 20 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 21 while (all1 > 0 && all2 > 0) {//歸並排序 22 if (buf1[i] < buf2[j]) { 23 buf0[k++] = buf1[i++]; all1--; 24 if (i == BUFFER_SIZE) {//如果緩沖區讀完了,就更新緩沖區,重置i 25 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 26 i = 0; 27 } 28 } else { 29 buf0[k++] = buf2[j++]; all2--; 30 if (j == BUFFER_SIZE) { 31 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 32 j = 0; 33 } 34 } 35 if (k == BUFFER_SIZE) {//如果緩沖區寫滿了,就全部寫入文件,重置k 36 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 37 k = 0; 38 } 39 } 40 while (all1 > 0) {//歸並流程-如果file1中還有剩余數據,直接追加輸出 41 buf0[k++] = buf1[i++]; all1--; 42 if (i == BUFFER_SIZE) { 43 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 44 i = 0; 45 } 46 if (k == BUFFER_SIZE) { 47 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 48 k = 0; 49 } 50 } 51 while (all2 > 0) {//歸並流程-如果file2中還有剩余數據,直接追加輸出 52 buf0[k++] = buf2[j++]; all2--; 53 if (j == BUFFER_SIZE) { 54 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 55 j = 0; 56 } 57 if (k == BUFFER_SIZE) { 58 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 59 k = 0; 60 } 61 } 62 fwrite(buf0, sizeof(int), k, fout);//寫入輸出文件 63 fclose(fin1); 64 fclose(fin2); 65 fclose(fout); 66 delete[] buf0; 67 delete[] buf1; 68 delete[] buf2; 69 });
在歸並操作中,一個非常重要的優化便是輸入輸出緩沖區。輸入緩沖即預先從文件一次性讀入BUFFER_SIZE
個數據,之后頻繁的讀取操作便從緩沖區獲取,當緩沖區讀完時(即i
, j
指向了 buf1
或buf2
的尾端),再一次性從文件中讀取 BUFFER_SIZE
個數據,重置i
或j
;輸出緩沖即將數據先寫入緩沖區,待緩沖區填滿后(即k
指向buf0
的尾端)再一次性寫入文件。緩沖區的設立,避免了頻繁的IO操作,配合多線程可以使讀寫磁盤速率達到最大,性能大幅提升。
merge_file
函數編寫完畢,我們需要遞歸調用它,直到所有的小分區都兩兩歸並完成。遞歸的過程可以並行處理(不過由於merge_file
函數本身已做並行化處理,此處串行遞歸亦可,效率不會太差):
1 string merge(int l, int r) { 2 if (l == r) return "temp\\part" + to_string(l) + ".dat"; 3 int mid = (l + r) / 2; 4 string file1, file2; 5 parallel_for(0, 2, [&](int x) {//此處使用串行遞歸亦可,不過效率略低一點 6 if (x == 0) file1 = merge(l, mid); 7 if (x == 1) file2 = merge(mid + 1, r); 8 }); 9 return merge_file(file1, file2); 10 }
整合:
1 #include <iostream> 2 #include <stdio.h> 3 #include <cstring> 4 #include <string> 5 #include <atomic> 6 #include <Windows.h> 7 #include <ppl.h> 8 #include <io.h> 9 #include <time.h> 10 #define MAX_THREADS 4//最多線程數量 11 using namespace std; 12 using namespace concurrency; 13 const int dx = 20;//並行快速排序的dx優化 14 const long long PARTITION_SIZE = 100000000;//分區大小 15 const long long BUFFER_SIZE = 10000000;//輸入輸出緩沖區大小 16 const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS); 17 atomic_int parts; 18 void parallel_qsort(int *begin, int *end) {//並行快速排序 19 if (begin >= end - 1) return; 20 int *key = rand() % (end - begin) + begin; 21 swap(*key, *begin); 22 int *i = begin, *j = begin; 23 for (key = begin; j < end; j++) { 24 if (*j < *key) { 25 i++; 26 swap(*i, *j); 27 } 28 } 29 swap(*begin, *i); 30 if (i - begin > dx && end - i > dx) {//dx優化 31 parallel_for(0, 2, [&](int x) { 32 if (x) parallel_qsort(begin, i); 33 else parallel_qsort(i + 1, end); 34 }); 35 } else { 36 parallel_qsort(begin, i); 37 parallel_qsort(i + 1, end); 38 } 39 } 40 void partition_and_sort(string in_file, long long n) { 41 int* arr = new int[PARTITION_SIZE]; 42 for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {//准備建立第i個分區 43 int each_get[4] = {}, all_get = 0; 44 //並行讀取int數據至arr[]中,每個線程計划讀取EACH_NUM個,一共計划讀取PARTITION_SIZE個 45 //每個線程實際讀取each_get[x]個,實際一共讀取all_get個 46 clock_t start_time = clock(); 47 parallel_for(0, MAX_THREADS, [&](long long x) { 48 FILE* fin = fopen(in_file.c_str(), "rb"); 49 fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int); 50 if (fsetpos(fin, &pos) == 0) 51 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin); 52 fclose(fin); 53 }); 54 clock_t end_time = clock(); 55 for (int i = 0; i < MAX_THREADS; i++) all_get += each_get[i]; 56 cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". " 57 << "Time usage = " << end_time - start_time << "ms.\n"; 58 //對arr進行並行快速排序 59 start_time = clock(); 60 parallel_qsort(arr, arr + all_get); 61 end_time = clock(); 62 cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n"; 63 //創建分區文件並調整大小 64 string tmp_file = "temp\\part" + to_string(parts++) + ".dat"; 65 FILE *fout = fopen(tmp_file.c_str(), "wb"); 66 chsize(fileno(fout), all_get * sizeof(int)); 67 fclose(fout); 68 //並行將arr全部寫入分區文件 69 start_time = clock(); 70 parallel_for(0, MAX_THREADS, [&](long long x) { 71 FILE* fout = fopen(tmp_file.c_str(), "rb+"); 72 fpos_t pos = EACH_NUM * x * sizeof(int); 73 if (fsetpos(fout, &pos) == 0) 74 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout); 75 fclose(fout); 76 }); 77 end_time = clock(); 78 cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". " 79 << "Time usage = " << end_time - start_time << "ms.\n"; 80 } 81 delete[] arr; 82 } 83 string merge_file(string in_file1, string in_file2) {//將兩個文件歸並 84 string out_file = "temp\\part" + to_string(parts++) + ".dat"; 85 //首先獲取兩個文件的長度 86 FILE* fin1 = fopen(in_file1.c_str(), "rb"); 87 FILE* fin2 = fopen(in_file2.c_str(), "rb"); 88 clock_t seekpos_start_time = clock(); 89 fseek(fin1, 0, SEEK_END); 90 fseek(fin2, 0, SEEK_END); 91 fpos_t size1 = 0, size2 = 0; 92 fgetpos(fin1, &size1); 93 fgetpos(fin2, &size2); 94 long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4; 95 for (long long i = 1; i < MAX_THREADS; i++) {//找第i個樞軸 96 long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1; 97 int *get1 = new int; 98 while (r1 - l1 > 1) { 99 pos1 = (l1 + r1) / 2;//二分查找,先假定file1的樞軸 100 fpos = pos1 * sizeof(int); 101 fsetpos(fin1, &fpos); 102 fread(get1, sizeof(int), 1, fin1); 103 long long l2 = 0, r2 = n2; 104 int* get2 = new int; 105 while (r2 - l2 > 0) {//再用二分查找確定file2的樞軸 106 pos2 = (l2 + r2) / 2; 107 fpos = pos2 * sizeof(int); 108 fsetpos(fin2, &fpos); 109 fread(get2, sizeof(int), 1, fin2); 110 if (*get1 <= *get2) 111 r2 = pos2; 112 else 113 l2 = pos2 + 1; 114 } 115 delete get2; get2 = NULL; 116 pos2 = r2; 117 //如果這兩個樞軸將file1和file2划分的不夠均勻,則對pos1進行調整 118 if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i) 119 l1 = pos1 + 1; 120 else 121 r1 = pos1 - 1; 122 } 123 delete get1; get1 = NULL; 124 seek_pos[i][1] = pos1 * sizeof(int);//記錄file1樞軸位置 125 seek_pos[i][2] = pos2 * sizeof(int);//記錄file2樞軸位置 126 seek_pos[i][0] = seek_pos[i][1] + seek_pos[i][2];//輸出文件樞軸位置 127 } 128 fclose(fin1); fclose(fin2); 129 //邊界細節 130 seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0; 131 seek_pos[MAX_THREADS][1] = n1 * sizeof(int); 132 seek_pos[MAX_THREADS][2] = n2 * sizeof(int); 133 seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2]; 134 clock_t seekpos_end_time = clock(); 135 //調整輸出文件大小 136 clock_t chsize_start_time = clock(); 137 FILE *fout = fopen(out_file.c_str(), "wb"); 138 _chsize_s(fileno(fout), (n1 + n2) * sizeof(int)); 139 fclose(fout); 140 clock_t chsize_end_time = clock(); 141 //多線程歸並操作,每個線程只負責歸並對應的段組 142 clock_t merge_start_time = clock(); 143 parallel_for(0, MAX_THREADS, [&](int x) { 144 FILE* fin1 = fopen(in_file1.c_str(), "rb"); 145 FILE* fin2 = fopen(in_file2.c_str(), "rb"); 146 FILE* fout = fopen(out_file.c_str(), "rb+"); 147 //根據樞軸位置確定讀入起點 148 fsetpos(fin1, &seek_pos[x][1]); 149 fsetpos(fin2, &seek_pos[x][2]); 150 fsetpos(fout, &seek_pos[x][0]); 151 //輸入輸出緩沖,優化讀寫性能 152 int *buf0 = new int[BUFFER_SIZE]; 153 int *buf1 = new int[BUFFER_SIZE]; 154 int *buf2 = new int[BUFFER_SIZE]; 155 int i = 0, j = 0, k = 0;//i, j, k分別是buf1,buf2,buf0的數組下標 156 //該線程應該從file1中讀取all1個數據,從file2中讀取all2個數據 157 long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int); 158 long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int); 159 //先讀取到緩沖區 160 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 161 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 162 while (all1 > 0 && all2 > 0) {//歸並排序 163 if (buf1[i] < buf2[j]) { 164 buf0[k++] = buf1[i++]; all1--; 165 if (i == BUFFER_SIZE) {//如果緩沖區讀完了,就更新緩沖區,重置i 166 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 167 i = 0; 168 } 169 } else { 170 buf0[k++] = buf2[j++]; all2--; 171 if (j == BUFFER_SIZE) { 172 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 173 j = 0; 174 } 175 } 176 if (k == BUFFER_SIZE) {//如果緩沖區寫滿了,就全部寫入文件,重置k 177 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 178 k = 0; 179 } 180 } 181 while (all1 > 0) {//歸並流程-如果file1中還有剩余數據,直接追加輸出 182 buf0[k++] = buf1[i++]; all1--; 183 if (i == BUFFER_SIZE) { 184 fread(buf1, sizeof(int), BUFFER_SIZE, fin1); 185 i = 0; 186 } 187 if (k == BUFFER_SIZE) { 188 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 189 k = 0; 190 } 191 } 192 while (all2 > 0) {//歸並流程-如果file2中還有剩余數據,直接追加輸出 193 buf0[k++] = buf2[j++]; all2--; 194 if (j == BUFFER_SIZE) { 195 fread(buf2, sizeof(int), BUFFER_SIZE, fin2); 196 j = 0; 197 } 198 if (k == BUFFER_SIZE) { 199 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout); 200 k = 0; 201 } 202 } 203 fwrite(buf0, sizeof(int), k, fout);//寫入輸出文件 204 fclose(fin1); 205 fclose(fin2); 206 fclose(fout); 207 delete[] buf0; 208 delete[] buf1; 209 delete[] buf2; 210 }); 211 clock_t merge_end_time = clock(); 212 Sleep(100); 213 //輸入文件的數據已歸並至新文件中,不再需要,刪除。 214 system(("del " + in_file1).c_str()); 215 system(("del " + in_file2).c_str()); 216 cout << "\nPart \"" << in_file1 << "\" and \"" << in_file2 << "\" merged, " 217 << "result saved to \"" << out_file << "\".\n" 218 << "Time usage: seek_pos: " << seekpos_end_time - seekpos_start_time << "ms, " 219 << "chsize: " << chsize_end_time - chsize_start_time << "ms, " 220 << "parallel_merge: " << merge_end_time - merge_start_time << "ms.\n"; 221 return out_file; 222 } 223 string merge(int l, int r) {//遞歸歸並操作 224 if (l == r) return "temp\\part" + to_string(l) + ".dat"; 225 int mid = (l + r) / 2; 226 string file1, file2; 227 parallel_for(0, 2, [&](int x) {//此處使用串行遞歸亦可,不過效率略低一點 228 if (x == 0) file1 = merge(l, mid); 229 if (x == 1) file2 = merge(mid + 1, r); 230 }); 231 return merge_file(file1, file2); 232 } 233 int main() { 234 string in_file, out_file; 235 cout << "Enter data file name: "; 236 cin >> in_file; 237 FILE* fin = fopen(in_file.c_str(), "rb"); 238 if (fin == NULL) { 239 cout << "Could not open that file.\n"; 240 main(); 241 } 242 clock_t start_time = clock(); 243 //獲取輸入文件的大小 244 fseek(fin, 0, SEEK_END); 245 fpos_t pos = 0; 246 fgetpos(fin, &pos); 247 fclose(fin); 248 //創建臨時文件目錄 249 system("mkdir temp"); 250 //將待排序大文件分區,並對各個小分區進行快速排序 251 partition_and_sort(in_file, pos / 4); // pos是字節數,所以要/4統計int個數 252 cout << "\nStart merging...\n"; 253 //將各個小分區歸並 254 out_file = merge(0, parts - 1); 255 system(("move " + out_file + " ans.dat").c_str()); 256 system("rd temp"); 257 clock_t end_time = clock(); 258 cout << "External sorting complete, result saved to \"ans.dat\".\n" 259 << "Time usage = " << end_time - start_time << "ms.\n"; 260 system("pause"); 261 return 0; 262 }
windows下排序 1e10 個int
數據( 37.2GB ),耗時約 30min,經過驗證,結果正確。
總結
外排序的算法不難,單線程實現很簡單:分區階段便是讀取文本文件、排序、再寫到文本文件中;歸並階段也是簡單地用單線程讀寫文本文件,效率極低。后來,多線程讀寫二進制文件,快速排序和歸並排序也實現了高效的並行,整個程序幾乎全程並行處理,效率翻了幾番。最后便是一些細節的優化,比如原子操作,讀寫緩沖區等等。看似簡單的一個排序算法,其中包涵的知識竟是如此豐富。