Python數據處理實戰


一、運行環境

1、python版本 2.7.13 博客代碼均是這個版本
2、系統環境:win7 64位系統

二、需求 對雜亂文本數據進行處理

部分數據截圖如下,第一個字段是原字段,后面3個是清洗出的字段,從數據庫中聚合字段觀察,乍一看數據比較規律,類似(幣種 金額 萬元)這樣,我想着用sql寫條件判斷,統一轉換為‘萬元人民幣’ 單位,用sql腳本進行字符串截取即可完成,但是后面發現數據並不規則,條件判斷太多清洗質量也不一定,有的前面不是左括號,有的字段里面沒有幣種,有的數字並不是整數,有的沒有萬字,這樣如果存儲成數字和‘萬元人民幣’單位兩個字段寫sql腳本復雜了,mysql我也沒找到能從文本中提取數字的函數,正則表達式常用於where條件中好像,如果誰知道mysql有類似從文本中過濾文本提取數字的函數,可以告訴我哈,這樣就不用費這么多功夫,用kettle一個工具即可,工具活學活用最好。

結合用python的經驗,python對字符串過濾有許多函數稍后代碼中就是用了這樣的辦法去過濾文本。


第一次部分清洗數據截圖

三、對數據處理的宏觀邏輯思考

拿到數據,先不要着急寫代碼,先思考清洗的邏輯,這點很關鍵,方向對了事半功倍,剩下的時間就是代碼實現邏輯和調試代碼的過程。

3.1思考過程 不寫代碼:

我想實現的最終的數據清洗是將資金字段換算成【金額+單位+各幣種】的組合形式或者【金額+單位+統一的人民幣幣種】(幣種進行匯率換算),分兩步或者三步都可以

3.1.1拆分出三個字段,數字,單位,幣種

(單元分為萬和不含萬,幣種分為人民幣和具體的外幣)

3.1.2將單位統一換為萬為單位

第一步中單位不是萬的 數字部分/10000,是萬的數字部分保持不變

3.1.3將幣種統一為人民幣

幣種是人民幣的前兩個字段都不變,不是的數字部分變為數字*各外幣兌換人民幣的匯率,單位不變依舊是第二步統一的‘萬’

3.2期望各步驟清洗效果 數據列舉:

從這個結果着手我們步步拆解,先梳理 清洗邏輯部分

3.2.1第一次清洗期望效果 拆分出三個字段 數字 單位 幣種:

①字段值=“2000元人民幣”,第一次清洗
2000 不含萬 人民幣
②字段值=“2000萬元人民幣”,第一次清洗
2000 萬 人民幣
③字段值=“2000萬元外幣”, 第一次清洗
2000 萬 外幣

3.2.2第二次清洗期望效果 將單位 統一歸為萬:

#二次處理條件 case when 單位=‘萬’ then 金額 else 金額/10000 end as 第二次金額

①字段值=“2000元人民幣”
0.2 萬 人民幣
②字段值=“2000萬元人民幣”
2000 萬 人民幣
③字段值=“2000萬元外幣”
2000 萬 外幣

注意:如果上面達到需求 則清洗完畢,如果想將單位換成人民幣就進行下面三次清洗

3.2.3第三次清洗期望效果:單位 幣種都統一為萬+人民幣

如果最后需求是換算成幣種統一人民幣,那么我們就在二次清洗后的基礎上再寫條件就好,

#三次處理條件 case when 幣種=‘人民幣’ then 金額 else 金額*幣種和人民幣的換算匯率 end as 第三次金額

①字段值=“2000元人民幣”
0.2 萬 人民幣
②字段值=“2000萬元人民幣”
2000 萬 人民幣
③字段值=“2000萬元外幣”
2000*外幣兌換人民幣匯率 萬 人民幣

四、對具體代碼的宏觀邏輯思考

幣種和單位這兩個就2種情況,很好寫

4.1、幣種部分

這個條件簡單,如果幣種的值在字符中出現就讓新字段等於這個幣種的值即可。

4.2、單位(萬為單位)

這個條件也簡單,萬字出現在字符中 單位這個變量=‘萬’ 沒出現就讓單位變量等於‘不含萬’,這樣寫是為了方便下一步對數字進行二次處理的時候寫條件判斷了。

4.3、數字部分 確保清洗后和原值邏輯上一樣 做些判斷

確保清洗后和原值邏輯上一樣意思是假如有這樣字段300.0100萬清洗后變成300.01 萬 人民幣 也是正確的。

filter(str.isdigit,字段的值)這個代碼我首先知道可以將文本中數字取出,同過對字段group by 聚合以后知道有小數點的字段,取出的值不再帶有小數點,如‘20.01萬’,filter(str.isdigit,‘20.01萬’)取出的數字就是2001,顯然這個數字是不正確,因此就需要考慮有無小數點的情況,有小數點的做到和原字段一樣

四、第一次清洗主要代碼,先不讀取數據庫數據

從數據庫中抽異常值10個左右做測試,info是regCapital字段的值

   #帶小數點的以小數點分割 取出小數點前后部分進行拼接 if '.' in info and int(filter(str.isdigit,info.split('.')[1]))>0: derive_regcapital=filter(str.isdigit,info.split('.')[0])+'.'+filter(str.isdigit,info.split('.')[1]) elif '.' in info and int(filter(str.isdigit,info.split('.')[1]))==0: derive_regcapital = filter(str.isdigit, info.split('.')[0]) elif filter(str.isdigit,info)=='': derive_regcapital='0' else: derive_regcapital=filter(str.isdigit,info) #單位 以萬和不含萬 為統一 if '萬' in info: derive_danwei='萬' else: derive_danwei='不含萬' #幣種 第一次清洗 外幣保留外幣字段 聚合大量數據 發現數據中含有外幣的情況大致有下面這些情況 如果有新外幣出現 進行數據的update操作即可 if '美元' in info: derive_currency='美元' elif '港幣' in info: derive_currency = '港幣' elif '阿富汗尼' in info: derive_currency = '阿富汗尼' elif '澳元' in info: derive_currency = '澳元' elif '英鎊' in info: derive_currency = '英鎊' elif '加拿大元' in info: derive_currency = '加拿大元' elif '日元' in info: derive_currency = '日元' elif '港幣' in info: derive_currency = '港幣' elif '法郎' in info: derive_currency = '法郎' elif '歐元' in info: derive_currency = '歐元' elif '新加坡' in info: derive_currency = '新加坡元' else: derive_currency = '人民幣'

五、全部代碼:讀取數據庫數據 進行全量清洗

第四步我是將部分數據做了測試,驗證代碼無誤,此時邏輯上應再從宏觀上再拓展,將info變量動態變為數據庫中所有的值,進行全量清洗

#coding:utf-8 from class_mysql import Mysql project=Mysql('s_58infor_data',[],0,conn_type='local') p2=Mysql('etl1_58infor_data',[],24,conn_type='local') field_list=p2.select_fields(db='local_db',table='etl1_58infor_data') print field_list project2=Mysql('etl1_58infor_data',field_list=field_list,field_num=26,conn_type='local') #以上部分 看不懂沒關系 由於我有兩套數據庫環境,測試和生產 #不同的數據庫連接和網段,因此要傳遞不同的參數進行切換數據庫和數據連接 如果一套環境 連接一次數據庫即可 數據處理需要經常做測試 方便自己調用 data_tuple=project.select(db='local_db',id=0) #data_tuple 是我實例化自己寫的操作數據庫的類對數據庫數據進行全字段進行讀取,返回值是一個不可變的對象元組tuple,清洗需要保留舊表全部字段,同時增加3個清洗后的數據字段 data_tuple=project.select(db='local_db',id=0) #遍歷元組 用字典去存儲每個字段的值 插入到增加3個清洗字段的表 etl1_58infor_data for data in data_tuple: item={} #old_data不取最后一個字段 是因為那個字段我想用當前處理的時間 #這樣可以計算數據總量運行的時間 來調整二次清洗的時間去和和kettle定時任務對接 #元組轉換為列表 轉換的原因是因為元組為不可變類型 如果有數據中有null值 遍歷轉換為字符串會報錯 old_data=list(data[:-1]) if data[-2]: if len(data[-2]) >0 : info=data[-2].encode('utf-8') else: info='' if '.' in info and int(filter(str.isdigit,info.split('.')[1]))>0: derive_regcapital=filter(str.isdigit,info.split('.')[0])+'.'+filter(str.isdigit,info.split('.')[1]) elif '.' in info and int(filter(str.isdigit,info.split('.')[1]))==0: derive_regcapital = filter(str.isdigit, info.split('.')[0]) elif filter(str.isdigit,info)=='': derive_regcapital='0' else: derive_regcapital=filter(str.isdigit,info) if '萬' in info: derive_danwei='萬' else: derive_danwei='不含萬' if '美元' in info: derive_currency='美元' elif '港幣' in info: derive_currency = '港幣' elif '阿富汗尼' in info: derive_currency = '阿富汗尼' elif '澳元' in info: derive_currency = '澳元' elif '英鎊' in info: derive_currency = '英鎊' elif '加拿大元' in info: derive_currency = '加拿大元' elif '日元' in info: derive_currency = '日元' elif '港幣' in info: derive_currency = '港幣' elif '法郎' in info: derive_currency = '法郎' elif '歐元' in info: derive_currency = '歐元' elif '新加坡' in info: derive_currency = '新加坡元' else: derive_currency = '人民幣' time_58infor_data = p2.create_time() old_data.append(time_58infor_data) old_data.append(derive_regcapital) old_data.append(derive_danwei) old_data.append(derive_currency) #print len(old_data) for i in range(len(old_data)): if not old_data[i] : old_data[i]='' else: pass data2=old_data[i].replace('"','') item[i+1]=data2 print item[1] #插入測試環境 的表 project2.insert(item=item,db='local_db')

六、代碼運行情況

6.1讀取數據庫原表數據和新表創建的字段


讀取數據庫原表數據和新表創建的字段

6.2 插入新表 並進行第一次數據清洗

紅框部分為清洗部分,其他數據做了脫敏處理


插入新表 並進行第一次數據清洗

6.3 數據表數據清洗結果


數據表數據清洗結果

七、增量數據處理

由於每天數據有增量進入,因此第一次執行完初始話之后,我們要根據表中的時間戳字段進行判斷,讀取昨日新的數據進行清洗插入,這部分留到下篇博客。
初步計划用下面函數 作為參數 判斷增量 create_time 是爬蟲腳本執行時候寫入的時間,yesterday是昨日時間,在where條件里加以限制,取出昨天進入數據庫的數據 進行執行 win7系統支持定時任務

import datetime from datetime import datetime as dt #%進行轉義使用%%來轉義 #主要構造sql中條件“where create_time like %s%%“ % yesterday #寫入腳本運行的當前時間 def create_time(self): create_time = dt.now().strftime('%Y-%m-%d %H:%M:%S') return create_time def yesterday(self): yestoday= datetime.date.today()-datetime.timedelta(days=1) return yestoday


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM