python大數據工作流程


本文作者:hhh5460

 

大數據分析,內存不夠用怎么辦?

當然,你可以升級你的電腦為超級電腦。

另外,你也可以采用硬盤操作。

本文示范了硬盤操作的一種可能的方式。

 

本文基於:win10(64) + py3.5

 

本人電腦配置:4G內存

 

說明:

數據大小:5.6G

數據描述:自2010年以來,紐約的311投訴

數據來源:紐約開放數據官網(NYC's open data portal)

數據下載:https://data.cityofnewyork.us/api/views/erm2-nwe9/rows.csv?accessType=DOWNLOAD

import pandas as pd
import time

'''python大數據分析工作流程'''
# 5G大數據文件,csv格式
reader = pd.read_csv('311_Service_Requests_from_2010_to_Present.csv', iterator=True, encoding='utf-8')

# HDF5格式文件支持硬盤操作,不需要全部讀入內存
store = pd.HDFStore('311_Service_Requests_from_2010_to_Present.h5')

# 然后用迭代的方式轉換.csv格式為.h5格式
chunkSize = 100000
i = 0
while True:
    try:
        start = time.clock()
        
        # 從csv文件迭代讀取
        df = reader.get_chunk(chunkSize)
        
        # 去除列名中的空格
        df = df.rename(columns={c: c.replace(' ', '') for c in df.columns})
        
        # 轉換為日期時間格式
        df['CreatedDate'] = pd.to_datetime(df['CreatedDate'])
        df['ClosedDate'] = pd.to_datetime(df['ClosedDate'])

        # 感興趣的列
        columns = ['Agency', 'CreatedDate', 'ClosedDate', 'ComplaintType', 
                   'Descriptor', 'TimeToCompletion', 'City']
        # 不感興趣的列
        columns_for_drop = list(set(df.columns) - set(columns))
        df.drop(columns_for_drop, inplace=True, axis=1, errors='ignore')
        
        # 轉到h5文件
        # 通過指定data_columns,建立額外的索引器,可提升查詢速度
        store.append('df', df, data_columns = ['ComplaintType', 'Descriptor', 'Agency'])
        
        # 計時
        i += 1
        end = time.clock()
        print('{} 秒: completed {} rows'.format(end - start, i * chunksize))
    except StopIteration:
        print("Iteration is stopped.")
        break

        

# 轉換完成之后,就可以選出想要進行數據分析的行,將其從硬盤導入到內存,如:
# 導入前三行
#store.select('df', "index<3")

# 導入 ComplaintType, Descriptor, Agency這三列的前十行
#store.select('df', "index<10 & columns=['ComplaintType', 'Descriptor', 'Agency']")

# 導入 ComplaintType, Descriptor, Agency這三列中滿足Agency=='NYPD'的前十行
#store.select('df', "columns=['ComplaintType', 'Descriptor', 'Agency'] & Agency=='NYPD'").head(10)

# 導入 ComplaintType, Descriptor, Agency這三列中滿足Agency IN ('NYPD', 'DOB')的前十行
#store.select('df', "columns=['ComplaintType', 'Descriptor', 'Agency'] & Agency IN ('NYPD', 'DOB')")[:10]


# ======================================
# 下面示范一個groupby操作
# 說明:由於數據太大,遠超內存。因此無法全部導入內存。
# ======================================
# 硬盤操作:導入所有的 City 名稱
cities = store.select_column('df','City').unique()
print("\ngroups:%s" % cities)

# 循環讀取 city
groups = []
for city in cities:
    # 硬盤操作:按City名稱選取
    group = store.select('df', 'City=%s' % city)

    # 這里進行你想要的數據處理
    groups.append(group[['ComplaintType', 'Descriptor', 'Agency']].sum())


print("\nresult:\n%s" % pd.concat(groups, keys = cities))

# 最后,記得關閉
store.close()

 

 

附:

運行過程中出現了一個錯誤

 

把上面的:

# 轉到h5文件
# 通過指定data_columns,建立額外的索引器
store.append('df', df, data_columns = ['ComplaintType', 'Descriptor', 'Agency'])

 

改為:

# 轉到h5文件
# 通過指定data_columns,建立額外的索引器
# 通過指定min_itemsize,設定存儲混合類型長度
store.append('df', df, data_columns = ['ComplaintType', 'Descriptor', 'Agency'], min_itemsize = {'values': 50})

 關於min_itemsize詳情,見:http://pandas.pydata.org/pandas-docs/stable/io.html#storing-types

 

參考:

https://plot.ly/python/big-data-analytics-with-pandas-and-sqlite/

http://stackoverflow.com/questions/14262433/large-data-work-flows-using-pandas

http://python.jobbole.com/84118/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM