轉自:http://netsmell.com/post/how-sort-10-billion-data.html?ref=myread
海量數據處理/外部歸並排序 - 分治.cppp
今天要給100億個數字排序,100億個 int 型數字放在文件里面大概有 37.2GB,非常大,內存一次裝不下了。那么肯定是要拆分成小的文件一個一個來處理,最終在合並成一個排好序的大文件。
實現思路
1.把這個37GB的大文件,用哈希分成1000個小文件,每個小文件平均38MB左右(理想情況),把100億個數字對1000取模,模出來的結果在0到999之間,每個結果對應一個文件,所以我這里取的哈希函數是 h = x % 1000,哈希函數取得”好”,能使沖突減小,結果分布均勻。
2.拆分完了之后,得到一些幾十MB的小文件,那么就可以放進內存里排序了,可以用快速排序,歸並排序,堆排序等等。
3.1000個小文件內部排好序之后,就要把這些內部有序的小文件,合並成一個大的文件,可以用二叉堆來做1000路合並的操作,每個小文件是一路,合並后的大文件仍然有序。
首先遍歷1000個文件,每個文件里面取第一個數字,組成 (數字, 文件號) 這樣的組合加入到堆里(假設是從小到大排序,用小頂堆),遍歷完后堆里有1000個 (數字,文件號) 這樣的元素
然后不斷從堆頂拿元素出來,每拿出一個元素,把它的文件號讀取出來,然后去對應的文件里,加一個元素進入堆,直到那個文件被讀取完。拿出來的元素當然追加到最終結果的文件里。
按照上面的操作,直到堆被取空了,此時最終結果文件里的全部數字就是有序的了。
最后我用c++寫了個實驗程序,具體代碼在這里可以看到。
如何拆分大文件?
一個32G的大文件,用fopen()打開不會全部加載到內存的,然后for循環遍歷啊,把每個數字對1000取模,會得到0到999種結果,然后每種結果在寫入到新的文件中,就拆分了
| // 對 2 億個數字進行排序, 約 10 G 的文件, 每個數字 int 能表示 | ||
| 3 | // 算法流程 | |
| 4 | // 將 10 G 的文件散列到 300 個文件中, 每個文件大約 35 MB | |
| 5 | // 對 35 MB 的小文件內部排序, 或者分發到多台計算機中, 並行處理 MapReduce | |
| 6 | // 最后使用最小堆, 進行 300 路歸並排序, 合成大文件 | |
| 7 | // 再寫一個算法判斷 2 億個數字是否有序 | |
| 8 | ||
| 9 | #include <stdio.h> | |
| 10 | #include <stdlib.h> | |
| 11 | #include <time.h> | |
| 12 | #include <io.h> | |
| 13 | #include <queue> | |
| 14 | ||
| 15 | #define FILE_NUM 300 // 哈希文件數 | |
| 16 | #define HASH(a) (a % FILE_NUM) | |
| 17 | ||
| 18 | int num = 6000000; // 2 億個數字, 手動改 | |
| 19 | char path[20] = "c:\\data.dat"; // 待排文件 | |
| 20 | char result[20] = "c:\\result.dat"; // 排序后文件 | |
| 21 | char tmpdir[100] = "c:\\hashfile"; // 臨時目錄 | |
| 22 | ||
| 23 | // 隨機生成 2 億個數字 | |
| 24 | int write_file(void) | |
| 25 | { | |
| 26 | FILE *out = NULL; | |
| 27 | int i; | |
| 28 | ||
| 29 | printf("\n正在生成 %d 個數字...\n\n", num); | |
| 30 | out = fopen(path, "wt"); | |
| 31 | if (out == NULL) return 0; | |
| 32 | ||
| 33 | unsigned int s, e; | |
| 34 | e = s = clock(); | |
| 35 | for (i=0; i<num; i++) | |
| 36 | { | |
| 37 | e = clock(); | |
| 38 | if (e - s > 1000) // 計算進度 | |
| 39 | { | |
| 40 | printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num); | |
| 41 | s = e; | |
| 42 | } | |
| 43 | fprintf(out, "%d\n", | |
| 44 | (rand() % 31623) * (rand() % 31623)); | |
| 45 | } | |
| 46 | fclose(out); | |
| 47 | return 1; | |
| 48 | } | |
| 49 | ||
| 50 | // 對 2 億個數字進行哈希, 分散到子文件中 | |
| 51 | // 入口參數: path, tmpdir | |
| 52 | int map(void) | |
| 53 | { | |
| 54 | FILE *in = NULL; | |
| 55 | FILE *tmp[FILE_NUM + 5]; | |
| 56 | char hashfile[512]; // 哈希文件地址 | |
| 57 | int data, add; | |
| 58 | int i; | |
| 59 | ||
| 60 | printf("\r正在哈希 %s\n\n", path); | |
| 61 | in = fopen(path, "rt"); | |
| 62 | if (in == NULL) return 0; | |
| 63 | for (i=0; i<FILE_NUM; i++) tmp[i] = NULL; | |
| 64 | ||
| 65 | // 開始哈希, 核心代碼要盡可能的加速 | |
| 66 | unsigned int s, e; | |
| 67 | e = s = clock(); | |
| 68 | i = 0; | |
| 69 | while (fscanf(in, "%d", &data) != EOF) | |
| 70 | { | |
| 71 | add = HASH(data); | |
| 72 | if (tmp[add] == NULL) | |
| 73 | { | |
| 74 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, add); | |
| 75 | tmp[add] = fopen(hashfile, "a"); | |
| 76 | } | |
| 77 | fprintf(tmp[add], "%d\n", data); | |
| 78 | ||
| 79 | i++; | |
| 80 | e = clock(); // 計算進度 | |
| 81 | if (e - s > 1000) | |
| 82 | { | |
| 83 | printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num); | |
| 84 | s = e; | |
| 85 | } | |
| 86 | } | |
| 87 | for (i=0; i<FILE_NUM; i++) | |
| 88 | if (tmp[i]) fclose(tmp[i]); | |
| 89 | fclose(in); | |
| 90 | ||
| 91 | return 1; | |
| 92 | } | |
| 93 | ||
| 94 | // 對 300 個文件逐個排序, 采用堆排序 STL 的優先隊列 | |
| 95 | void calc(void) | |
| 96 | { | |
| 97 | int fileexist(char *path); // 判斷文件存在 | |
| 98 | std::priority_queue<int> q; // 堆排序 | |
| 99 | char hashfile[512]; | |
| 100 | FILE *fp = NULL; | |
| 101 | int i, data; | |
| 102 | ||
| 103 | // 逐個處理 300 個文件, 或者將這些文件發送到其它計算機中並行處理 | |
| 104 | for (i=0; i<FILE_NUM; i++) | |
| 105 | { | |
| 106 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
| 107 | if (fileexist(hashfile)) | |
| 108 | { | |
| 109 | printf("\r正在排序 hash_%d.~tmp\t", i); | |
| 110 | ||
| 111 | // 小文件從磁盤加入內存中 | |
| 112 | fp = fopen(hashfile, "rt"); | |
| 113 | while (fscanf(fp, "%d", &data) != EOF) | |
| 114 | { | |
| 115 | q.push(data); | |
| 116 | // 優先隊列默認是大頂堆, 即降序排序 | |
| 117 | // 要升序需要重載 () 運算符 | |
| 118 | } | |
| 119 | fclose(fp); | |
| 120 | ||
| 121 | // 排序后再從內存寫回磁盤 | |
| 122 | fp = fopen(hashfile, "wt"); // 覆蓋模式寫 | |
| 123 | while (!q.empty()) | |
| 124 | { | |
| 125 | fprintf(fp, "%d\n", q.top()); | |
| 126 | q.pop(); | |
| 127 | } | |
| 128 | fclose(fp); | |
| 129 | } | |
| 130 | } | |
| 131 | } | |
| 132 | ||
| 133 | typedef struct node // 隊列結點 | |
| 134 | { | |
| 135 | int data; | |
| 136 | int id; // 哈希文件的編號 | |
| 137 | bool operator < (const node &a) const | |
| 138 | { return data < a.data; } | |
| 139 | }node; | |
| 140 | ||
| 141 | // 將 300 個有序文件合並成一個文件, K 路歸並排序 | |
| 142 | int reduce(void) | |
| 143 | { | |
| 144 | int fileexist(char *path); | |
| 145 | std::priority_queue<node> q; // 堆排序 | |
| 146 | FILE *file[FILE_NUM + 5]; | |
| 147 | FILE *out = NULL; | |
| 148 | char hashfile[512]; | |
| 149 | node tmp, p; | |
| 150 | int i, count = 0; | |
| 151 | ||
| 152 | printf("\r正在合並 %s\n\n", result); | |
| 153 | out = fopen(result, "wt"); | |
| 154 | if (out == NULL) return 0; | |
| 155 | for (i=0; i<FILE_NUM; i++) file[i] = NULL; | |
| 156 | for (i=0; i<FILE_NUM; i++) // 打開全部哈希文件 | |
| 157 | { | |
| 158 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
| 159 | if (fileexist(hashfile)) | |
| 160 | { | |
| 161 | file[i] = fopen(hashfile, "rt"); | |
| 162 | fscanf(file[i], "%d", &tmp.data); | |
| 163 | tmp.id = i; | |
| 164 | q.push(tmp); // 初始化隊列 | |
| 165 | count++; // 計數器 | |
| 166 | printf("\r入隊進度 %0.2f %%\t", (count * 100.0) / FILE_NUM); | |
| 167 | } | |
| 168 | } | |
| 169 | unsigned int s, e; | |
| 170 | e = s = clock(); | |
| 171 | while (!q.empty()) // 開始 K 路歸並 | |
| 172 | { | |
| 173 | tmp = q.top(); | |
| 174 | q.pop(); | |
| 175 | // 將堆頂的元素寫回磁盤, 再從磁盤中拿一個到內存 | |
| 176 | fprintf(out, "%d\n", tmp.data); | |
| 177 | if (fscanf(file[tmp.id], "%d", &p.data) != EOF) | |
| 178 | { | |
| 179 | p.id = tmp.id; | |
| 180 | q.push(p); | |
| 181 | count++; | |
| 182 | } | |
| 183 | ||
| 184 | e = clock(); // 計算進度 | |
| 185 | if (e - s > 1000) | |
| 186 | { | |
| 187 | printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num); | |
| 188 | s = e; | |
| 189 | } | |
| 190 | } | |
| 191 | for (i=0; i<FILE_NUM; i++) | |
| 192 | if (file[i]) fclose(file[i]); | |
| 193 | fclose(out); | |
| 194 | ||
| 195 | return 1; | |
| 196 | } | |
| 197 | ||
| 198 | int check(void) // 檢查是否降序排序 | |
| 199 | { | |
| 200 | FILE *in = NULL; | |
| 201 | int max = 0x7FFFFFFF; | |
| 202 | int data; | |
| 203 | int count = 0; | |
| 204 | ||
| 205 | printf("\r正在檢查文件正確性...\n\n"); | |
| 206 | in = fopen(result, "rt"); | |
| 207 | if (in == NULL) return 0; | |
| 208 | ||
| 209 | unsigned int s, e; | |
| 210 | e = s = clock(); | |
| 211 | while (fscanf(in, "%d", &data) != EOF) | |
| 212 | { | |
| 213 | if (data <= max) max = data; | |
| 214 | else | |
| 215 | { | |
| 216 | fclose(in); | |
| 217 | return 0; | |
| 218 | } | |
| 219 | count++; | |
| 220 | e = clock(); // 計算進度 | |
| 221 | if (e - s > 1000) | |
| 222 | { | |
| 223 | printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num); | |
| 224 | s = e; | |
| 225 | } | |
| 226 | } | |
| 227 | fclose(in); | |
| 228 | return 1; | |
| 229 | } | |
| 230 | ||
| 231 | // 判斷文件存在 | |
| 232 | int fileexist(char *path) | |
| 233 | { | |
| 234 | FILE *fp = NULL; | |
| 235 | ||
| 236 | fp = fopen(path, "rt"); | |
| 237 | if (fp) | |
| 238 | { | |
| 239 | fclose(fp); | |
| 240 | return 1; | |
| 241 | } | |
| 242 | else return 0; | |
| 243 | } | |
| 244 | ||
| 245 | int main(void) | |
| 246 | { | |
| 247 | char cmd_del[200]; // 刪除目錄 | |
| 248 | char cmd_att[200]; // 設置隱藏 | |
| 249 | char cmd_mkdir[200]; // 建立目錄 | |
| 250 | ||
| 251 | // 初始化 cmd 命令, 建立工作目錄 | |
| 252 | sprintf(cmd_del, "rmdir /s /q %s", tmpdir); | |
| 253 | sprintf(cmd_att, "attrib +h %s", tmpdir); | |
| 254 | sprintf(cmd_mkdir, "mkdir %s", tmpdir); | |
| 255 | if (access(path, 0) == 0) system(cmd_del); | |
| 256 | system(cmd_mkdir); // 建立工作目錄 | |
| 257 | system(cmd_att); // 隱藏目錄 | |
| 258 | ||
| 259 | // 隨機生成 2 億個數字 | |
| 260 | if (!write_file()) return 0; | |
| 261 | ||
| 262 | map(); // 對 2 億個數字進行哈希, 即 Map | |
| 263 | calc(); // 對 300 個文件逐個排序 | |
| 264 | reduce(); // 最后將 300 個有序文件合並成一個文件, 即 reduce | |
| 265 | if (check()) printf("\r排序正確!\t\t\t\n\n"); | |
| 266 | else printf("\r排序錯誤!\t\t\t\n\n"); | |
| 267 | ||
| 268 | system(cmd_del); // 刪除哈希文件 | |
| 269 | remove(path); // 刪除 2 億數字文件 | |
| 270 | remove(result); // 刪除排序后的文件 | |
| 271 | ||
| 272 | return 0; | |
| 273 | } |
