1.需求
在數據清洗(ETL),日志文件分析,分隔符信息提取時,我們都會遇到如下常見的文本數據:
- 中樓層/14層,東西,西直門南大街 3號院,1985年建,板樓
- 中樓層/23層,南北,通惠南路6號,2003年建,板樓
- 中樓層/12層,南北,通惠南路6號 1號院,2003年建,塔樓-
一個常見的處理思路,是按照分隔符,對文本進行切割。對於上面的文本,可以采用/,兩種符號來切割。變成如下的表格樣式,之后進行數據處理便非常容易了。
所以我開玩笑的說,一門語言中split函數可能是用的最多的。在文本處理中會遇到大量的這種需求,但數據格式多變,總會有大量精力耗費在這類重復的工作上。於是很自然的會期待是否有一種自動算法,能幫助我們自動分割字符串?
假設已經為我們提供了一批文本,構成一個字符串數組。我們的任務分為兩步:
編譯(發現內部的分隔符和模式)
def Compile(self,datas):
發現其中的:
- 分隔符
- 相同的文本串(如上面的'樓層')
- 不同的文本串(如'14','12')
分割(根據發現的模式分割)
def Split(self,text,splitgroup,isSameOut=True):
這個函數可以設定是否輸出相同項。
2.一種簡單的思路
從直覺上來說,逗號,空格,分號,冒號這類符號是最有可能出現的分隔符。一種朴素的想法是:
列出可能是分隔符的所有符號,絕大多數非字母的ascii碼,都可以列入
統計所有符號在每一行中出現的次數,構成一個數組字典,例如上面的例子
python ',' : [3,3,3] '/' : [1,1,1] ' ' : [1,0,1]
求取每個數組的方差,如果滿足小於特定的閾值,則可認為是一個分隔符
我們不能嚴格的認定,只有數組的元素全部一樣才是分隔符:因為總會出現特殊情況,應當允許特殊情況的發生。方差的閾值,應通過參數傳入。我一般將其定為0.1
將所有滿足方差小於閾值的分隔符提取出來。對上面的例子,分隔符應該是 ,和/
對樣例數據進行預分割,分割之后,我們會發現:
標注相同的,是指該列所有的數據都是同一內容,否則為不一樣內容。
最后,可以看出,該數據集可以按照斜杠和逗號進行分割。分割的第一項是相同項,可以選擇不輸出。
這種方法思路很簡單,但是非常適合由計算機生成的網絡數據,這些數據通常都有明確的格式,分隔符固定,因此速度較快,而且性能卓越。
全部代碼如下:
import re; from asq.initiators import query def GetVariance(data): sum1 = 0.0 sum2 = 0.0 l = len(data); for i in range(l): sum1 += data[i] sum2 += data[i] ** 2 mean = sum1 / l var = sum2 / l - mean ** 2 return var; def GetMaxSameCount(datas): dic = {}; for t in datas: if t in dic: dic[t] += 1; else: dic[t] = 1; if len(dic) == 0: return 0; maxkey, maxvalue = None, -1; for key in dic: if dic[key] > maxvalue: maxvalue = dic[key]; maxkey = key; return (maxkey, maxvalue); class SplitType: (ENTITY, SPLIT, SAMECONTENT, DIFFCONTENT) = range(4) class SplitItem(object): def __init__(self): self.SplitType = None; self.Name = None; self.Value = None self.Index = 0; self.IsRepeat = False; class SplitGroup(object): def __init__(self): self.SplitChars = {}; # dict,key:char, value:charmaxcount self.SplitItems = []; class Spliter(object): def __init__(self): self.MatchRatio = 0.8 self.ModeCheckRatio = 0.3; self.MaxVariance = 3; self.spliter2 = u' \r\n\t./_"\',;():|[]{}。:;' self.spliter3 = re.compile(r'[a-zA-Z0-9\u4e00-\u9fa5\u3040-\u309f\u30a0-\u30ff]') self.spliterdict = [self.spliter2, self.spliter3]; def GetCharCount(self, string, char): count = 0; for c in string: if c == char: count += 1; return count; def Compile(self, datas): splititems = []; splitchars = []; maps = {}; datalen = len(datas); for data in datas: if data == None or data == '': continue; for splitchar in self.spliter2: charcount = self.GetCharCount(data, splitchar) if charcount == 0: continue; count = maps.get(splitchar, None); if count == None: maps[splitchar] = [charcount]; else: maps[splitchar].append(charcount); # select real splitchars for text in maps: map = maps[text]; if len(map) < datalen / 2: continue charcount = GetVariance(map); maxkey, maxvalue = GetMaxSameCount(map); if charcount < self.MaxVariance: splitchars.append(text) splitGroup = SplitGroup(); results = []; modedict = []; for data in datas: splitResult = self.Split(data, splitchars); results.append(splitResult); qresults = query(results); maxlen = qresults.max(lambda x: len(x)); samevalues = []; for i in range(0, maxlen): splititem = SplitItem(); splititem.Index = i; values = []; for splitResult in results: if i < len(splitResult): if splititem.SplitType == None and splitResult[i] in splitchars: splititem.SplitType = SplitType.SPLIT; splititem.Value = splitResult[i]; values.append(splitResult[i]); if splititem.SplitType == None: text, value = GetMaxSameCount(values) if value > len(values) * self.MatchRatio: splititem.SplitType = SplitType.SAMECONTENT; splititem.Value = text; if text in samevalues: splititem.IsRepeat = True; else: samevalues.append(text); else: splititem.SplitType = SplitType.DIFFCONTENT; splititems.append(splititem) splitGroup.SplitChars = splitchars; splitGroup.SplitItems = splititems; # post process return splitGroup; def SplitWithGroup(self, text, splitgroup, isSameOut=True, issplitOut=False): results = self.Split(text, splitgroup.SplitChars); splitIndex = 0; for r in results: currp = splitgroup.SplitItems[splitIndex]; if r in splitgroup.SplitChars: while splitgroup.SplitItems[splitIndex].Value != r: splitIndex += 1; if splitIndex == len(splitgroup.SplitItems): return; if issplitOut == False: splitIndex += 1; continue; splitIndex += 1; if currp.SplitType == SplitType.SAMECONTENT: if isSameOut == False: continue; yield r; def Split(self, data, splits): # 連續的分隔符會被合並? if data is None: return None; if len(splits) == 0: return [data]; last = -1; splititems = []; l = len(data); for i in range(0, l): r = data[i]; if r not in splits: continue; else: if i > 0 and i > last + 1: splititems.append(data[last + 1:i]); splititems.append(r); last = i if last + 1 < len(data): splititems.append(data[last + 1:]); return splititems; if __name__ == '__main__': sp = Spliter(); spgroups = sp.Compile(['中樓層/14層,東西,西直門南大街 3號院,1985年建,板樓' , '中樓層/23層,南北,通惠南路6號,2003年建,板樓', '中樓層/12層,南北,通惠南路6號 1號院,2003年建,塔樓']) for r in sp.SplitWithGroup(u"低樓層/14層,東西,太陽宮中路太陽宮大廈,2003年建,板樓", spgroups): print(r)
sp是分割器實例,對文本數組編譯后獲得了spgroups,這個數據結構存儲了分割所需的信息。之后使用SplitWithGroup方法,即可對文本進行分割,返回的是一個生成器。該函數的一系列參數可以指定是否輸出相同項:
3. 其他可能的方法
文本對齊算法
上文算法雖然簡單,但卻不能處理所有的情況。以下情況就很難處理:
- 樣本數量非常少
- 分隔符數量不一致
- 上文方法可能會帶來嚴重的錯誤。
采用對齊的思路進行分析。對齊的意思是很容易理解的,但如何讓計算機認為是對齊呢?這就依賴於字符串的編輯距離。
對齊算法有大量可供參考的源代碼和論文,由於不是作者親自實現的,因此就不多做介紹。這種算法使用了動態規划技術,相對簡單,但內存占用量是巨大的,如果字符串長度很長,那么將會開辟很大的空間進行計算。另外,它是兩兩計算的,如果給出的是一組數據,如何合並最后結果也是比較復雜的問題。
4.歧義性
分割不一定是好的,這要看所考慮的問題的環境。例如12:03是一個時間,如果直接按照冒號分割,之后的算法就再也無法判斷是不是時間了。分割造成了原有語境的丟失。
解決這個問題的思路有兩種:
軟分割
與其糾結於要不要分割,倒不如換一種角度考慮問題。樹結構給了很好地啟發,在一個層級上看,它們是一個字段,但看得更深一些,則是兩個字段。因此,12:03被划分為兩個子節點12和03,但從父節點上來看,卻依舊是12:03。
樹結構比列表帶來了更強大的表現能力,具體可參考《用樹結構描述和計算數據》一文。
自動確定層級關系
考慮下面的數據:
LGA,MBA,12:03,15:04,165
MBA,LGA,18:30,22:40,174
可以認定,分隔符有:和, 但哪個符號的優先級更高呢?我們當然知道是冒號,但計算機如何知道?
一種基本可行的方案,是確定分割后,不同子項的模式是否相似。
如果優先按逗號分割,則有\w+,一個\d+,兩個dd:dd的模式
但如優先按冒號,則會變成 \w+,\w+,\d{2} 以及\d{2} \d{2},\d{3}
顯然,按照逗號分割,子項的模式相似度更高。優先使用逗號分隔。這樣,就能轉換為一顆樹。
與之而來的問題,是如何確定不同子項的模式相似度更高?
我對此的定義是,不同子項內部的分隔符的數量和排布相似性更高。使用歐拉距離即可計算。不過,這種基於相似性的策略,總有可能出錯的情況。
5. 總結
經過筆者近半年的試用,這種算法在處理網頁數據時非常方便,能夠輕松地對不同文本進行切分。當然算法比較簡單,用的也是最朴素的“方差”來評估分隔符的可能性。如果有更好的辦法,歡迎留言。