問題一:一個文件含有5億行,每行是一個隨機整數,需要對該文件所有整數排序。
分治(Divide&Conquer),參考大數據算法:對5億數據進行排序
對這個一個500000000行的 total.txt 進行排序,該文件大小 4.6G。
每讀10000行就排序並寫入到一個新的子文件里(這里使用的是快速排序)。
1.分割 & 排序
#!/usr/bin/python2.7
import time
def readline_by_yield(bfile):
with open(bfile, 'r') as rf:
for line in rf:
yield line
def quick_sort(lst):
if len(lst) < 2:
return lst
pivot = lst[0]
left = [ ele for ele in lst[1:] if ele < pivot ]
right = [ ele for ele in lst[1:] if ele >= pivot ]
return quick_sort(left) + [pivot,] + quick_sort(right)
def split_bfile(bfile):
count = 0
nums = []
for line in readline_by_yield(bfile):
num = int(line)
if num not in nums:
nums.append(num)
if 10000 == len(nums):
nums = quick_sort(nums)
with open('subfile/subfile{}.txt'.format(count+1),'w') as wf:
wf.write('\n'.join([ str(i) for i in nums ]))
nums[:] = []
count += 1
print count
now = time.time()
split_bfile('total.txt')
run_t = time.time()-now
print 'Runtime : {}'.format(run_t)
會生成 50000 個小文件,每個小文件大小約在 96K左右。

程序在執行過程中,內存占用一直處在 5424kB 左右

整個文件分割完耗時 94146 秒。
2.合並
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
import os
import time
testdir = '/ssd/subfile'
now = time.time()
# Step 1 : 獲取全部文件描述符
fds = []
for f in os.listdir(testdir):
ff = os.path.join(testdir,f)
fds.append(open(ff,'r'))
# Step 2 : 每個文件獲取第一行,即當前文件最小值
nums = []
tmp_nums = []
for fd in fds:
num = int(fd.readline())
tmp_nums.append(num)
# Step 3 : 獲取當前最小值放入暫存區,並讀取對應文件的下一行;循環遍歷。
count = 0
while 1:
val = min(tmp_nums)
nums.append(val)
idx = tmp_nums.index(val)
next = fds[idx].readline()
# 文件讀完了
if not next:
del fds[idx]
del tmp_nums[idx]
else:
tmp_nums[idx] = int(next)
# 暫存區保存1000個數,一次性寫入硬盤,然后清空繼續讀。
if 1000 == len(nums):
with open('final_sorted.txt','a') as wf:
wf.write('\n'.join([ str(i) for i in nums ]) + '\n')
nums[:] = []
if 499999999 == count:
break
count += 1
with open('runtime.txt','w') as wf:
wf.write('Runtime : {}'.format(time.time()-now))
程序在執行過程中,內存占用一直處在 240M左右


跑了38個小時左右,才合並完不到5千萬行數據...
雖然降低了內存使用,但時間復雜度太高了;可以通過減少文件數(每個小文件存儲行數增加)來進一步降低內存使用。
問題二:一個文件有一千億行數據,每行是一個IP地址,需要對IP地址進行排序。
IP地址轉換成數字
# 方法一:手動計算
In [62]: ip
Out[62]: '10.3.81.150'
In [63]: ip.split('.')[::-1]
Out[63]: ['150', '81', '3', '10']
In [64]: [ '{}-{}'.format(idx,num) for idx,num in enumerate(ip.split('.')[::-1]) ]
Out[64]: ['0-150', '1-81', '2-3', '3-10']
In [65]: [256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])]
Out[65]: [150, 20736, 196608, 167772160]
In [66]: sum([256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])])
Out[66]: 167989654
In [67]:
# 方法二:使用C擴展庫來計算
In [71]: import socket,struct
In [72]: socket.inet_aton(ip)
Out[72]: b'\n\x03Q\x96'
In [73]: struct.unpack("!I", socket.inet_aton(ip)) # !表示使用網絡字節順序解析, 后面的I表示unsigned int, 對應Python里的integer or long
Out[73]: (167989654,)
In [74]: struct.unpack("!I", socket.inet_aton(ip))[0]
Out[74]: 167989654
In [75]: socket.inet_ntoa(struct.pack("!I", 167989654))
Out[75]: '10.3.81.150'
In [76]:
問題三:有一個1.3GB的文件(共一億行),里面每一行都是一個字符串,請在文件中找出重復次數最多的字符串。
基本思想:迭代讀大文件,把大文件拆分成多個小文件;最后再歸並這些小文件。
拆分的規則:
迭代讀大文件,內存中維護一個字典,key是字符串,value是該字符串出現的次數;
當字典維護的字符串種類達到10000(可自定義)的時候,把該字典按照key從小到大排序,然后寫入小文件,每行是 key\tvalue;
然后清空字典,繼續往下讀,直到大文件讀完。
歸並的規則:
首先獲取全部小文件的文件描述符,然后各自讀出第一行(即每個小文件字符串ascii值最小的字符串),進行比較。
找出ascii值最小的字符串,如果有重復的,這把各自出現的次數累加起來,然后把當前字符串和總次數存儲到內存中的一個列表。
然后把最小字符串所在的文件的讀指針向下移,即從對應小文件里再讀出一行進行下一輪比較。
當內存中的列表個數達到10000時,則一次性把該列表內容寫到一個最終文件里存儲到硬盤上。同時清空列表,進行之后的比較。
一直到讀完全部的小文件,那么最后得到的最終文件就是一個按照字符串ascii值升序排序的大的文件,每一行的內容就是 字符串\t重復次數,
最后迭代去讀這個最終文件,找出重復次數最多的即可。
1. 分割
def readline_by_yield(bfile):
with open(bfile, 'r') as rf:
for line in rf:
yield line
def split_bfile(bfile):
count = 0
d = {}
for line in readline_by_yield(bfile):
line = line.strip()
if line not in d:
d[line] = 0
d[line] += 1
if 10000 == len(d):
text = ''
for string in sorted(d):
text += '{}\t{}\n'.format(string,d[string])
with open('subfile/subfile{}.txt'.format(count+1),'w') as wf:
wf.write(text.strip())
d.clear()
count += 1
text = ''
for string in sorted(d):
text += '{}\t{}\n'.format(string,d[string])
with open('subfile/subfile_end.txt','w') as wf:
wf.write(text.strip())
split_bfile('bigfile.txt')
2. 歸並
import os
import json
import time
import traceback
testdir = '/ssd/subfile'
now = time.time()
# Step 1 : 獲取全部文件描述符
fds = []
for f in os.listdir(testdir):
ff = os.path.join(testdir,f)
fds.append(open(ff,'r'))
# Step 2 : 每個文件獲取第一行
tmp_strings = []
tmp_count = []
for fd in fds:
line = fd.readline()
string,count = line.strip().split('\t')
tmp_strings.append(string)
tmp_count.append(int(count))
# Step 3 : 獲取當前最小值放入暫存區,並讀取對應文件的下一行;循環遍歷。
result = []
need2del = []
while True:
min_str = min(tmp_strings)
str_idx = [i for i,v in enumerate(tmp_strings) if v==min_str]
str_count = sum([ int(tmp_count[idx]) for idx in str_idx ])
result.append('{}\t{}\n'.format(min_str,str_count))
for idx in str_idx:
next = fds[idx].readline() # IndexError: list index out of range
# 文件讀完了
if not next:
need2del.append(idx)
else:
next_string,next_count = next.strip().split('\t')
tmp_strings[idx] = next_string
tmp_count[idx] = next_count
# 暫存區保存10000個記錄,一次性寫入硬盤,然后清空繼續讀。
if 10000 == len(result):
with open('merged.txt','a') as wf:
wf.write(''.join(result))
result[:] = []
# 注意: 文件讀完需要刪除文件描述符的時候, 需要逆序刪除
need2del.reverse()
for idx in need2del:
del fds[idx]
del tmp_strings[idx]
del tmp_count[idx]
need2del[:] = []
if 0 == len(fds):
break
with open('merged.txt','a') as wf:
wf.write(''.join(result))
result[:] = []
歸並結果分析:
| 分割時內存中維護的字典大小 | 分割的小文件個數 | 歸並時需維護的文件描述符個數 | 歸並時內存占用 | 歸並耗時 | |
| 第一次 | 10000 | 9000 | 9000 ~ 0 | 200M | 歸並速度慢,暫未統計完成時間 |
| 第二次 | 100000 | 900 | 900 ~ 0 | 27M | 歸並速度快,只需2572秒 |
3. 查找出現次數最多的字符串及其次數
import time
def read_line(filepath):
with open(filepath,'r') as rf:
for line in rf:
yield line
start_ts = time.time()
max_str = None
max_count = 0
for line in read_line('merged.txt'):
string,count = line.strip().split('\t')
if int(count) > max_count:
max_count = int(count)
max_str = string
print(max_str,max_count)
print('Runtime {}'.format(time.time()-start_ts))
歸並后的文件共9999788行,大小是256M;執行查找耗時27秒,內存占用6480KB。
