文本自動分割算法


1.需求

在數據清洗(ETL),日志文件分析,分隔符信息提取時,我們都會遇到如下常見的文本數據:

  • 中樓層/14層,東西,西直門南大街 3號院,1985年建,板樓
  • 中樓層/23層,南北,通惠南路6號,2003年建,板樓
  • 中樓層/12層,南北,通惠南路6號 1號院,2003年建,塔樓-

一個常見的處理思路,是按照分隔符,對文本進行切割。對於上面的文本,可以采用/,兩種符號來切割。變成如下的表格樣式,之后進行數據處理便非常容易了。

image

所以我開玩笑的說,一門語言中split函數可能是用的最多的。在文本處理中會遇到大量的這種需求,但數據格式多變,總會有大量精力耗費在這類重復的工作上。於是很自然的會期待是否有一種自動算法,能幫助我們自動分割字符串
假設已經為我們提供了一批文本,構成一個字符串數組。我們的任務分為兩步:

編譯(發現內部的分隔符和模式)

def Compile(self,datas):
發現其中的:

  • 分隔符
  • 相同的文本串(如上面的'樓層')
  • 不同的文本串(如'14','12')
分割(根據發現的模式分割)

def Split(self,text,splitgroup,isSameOut=True):
這個函數可以設定是否輸出相同項。

2.一種簡單的思路

從直覺上來說,逗號,空格,分號,冒號這類符號是最有可能出現的分隔符。一種朴素的想法是:
列出可能是分隔符的所有符號,絕大多數非字母的ascii碼,都可以列入
統計所有符號在每一行中出現的次數,構成一個數組字典,例如上面的例子

python 
',' : [3,3,3] 
'/' : [1,1,1] 
' ' : [1,0,1]

求取每個數組的方差,如果滿足小於特定的閾值,則可認為是一個分隔符

我們不能嚴格的認定,只有數組的元素全部一樣才是分隔符:因為總會出現特殊情況,應當允許特殊情況的發生。方差的閾值,應通過參數傳入。我一般將其定為0.1
將所有滿足方差小於閾值的分隔符提取出來。對上面的例子,分隔符應該是 ,和/

對樣例數據進行預分割,分割之后,我們會發現:

image

標注相同的,是指該列所有的數據都是同一內容,否則為不一樣內容。

最后,可以看出,該數據集可以按照斜杠和逗號進行分割。分割的第一項是相同項,可以選擇不輸出。
這種方法思路很簡單,但是非常適合由計算機生成的網絡數據,這些數據通常都有明確的格式,分隔符固定,因此速度較快,而且性能卓越。

全部代碼如下:

import re;

from asq.initiators import query


def GetVariance(data):
    sum1 = 0.0
    sum2 = 0.0
    l = len(data);
    for i in range(l):
        sum1 += data[i]
        sum2 += data[i] ** 2
    mean = sum1 / l
    var = sum2 / l - mean ** 2
    return var;


def GetMaxSameCount(datas):
    dic = {};
    for t in datas:
        if t in dic:
            dic[t] += 1;
        else:
            dic[t] = 1;
    if len(dic) == 0:
        return 0;
    maxkey, maxvalue = None, -1;
    for key in dic:
        if dic[key] > maxvalue:
            maxvalue = dic[key];
            maxkey = key;
    return (maxkey, maxvalue);


class SplitType:
    (ENTITY, SPLIT, SAMECONTENT, DIFFCONTENT) = range(4)


class SplitItem(object):
    def __init__(self):
        self.SplitType = None;
        self.Name = None;
        self.Value = None
        self.Index = 0;
        self.IsRepeat = False;


class SplitGroup(object):
    def __init__(self):
        self.SplitChars = {};  # dict,key:char, value:charmaxcount
        self.SplitItems = [];


class Spliter(object):
    def __init__(self):
        self.MatchRatio = 0.8
        self.ModeCheckRatio = 0.3;
        self.MaxVariance = 3;
        self.spliter2 = u' \r\n\t./_"\',;():|[]{}。:;'
        self.spliter3 = re.compile(r'[a-zA-Z0-9\u4e00-\u9fa5\u3040-\u309f\u30a0-\u30ff]')
        self.spliterdict = [self.spliter2, self.spliter3];

    def GetCharCount(self, string, char):
        count = 0;
        for c in string:
            if c == char:
                count += 1;
        return count;

    def Compile(self, datas):
        splititems = [];
        splitchars = [];
        maps = {};
        datalen = len(datas);
        for data in datas:
            if data == None or data == '':
                continue;
            for splitchar in self.spliter2:
                charcount = self.GetCharCount(data, splitchar)
                if charcount == 0:
                    continue;
                count = maps.get(splitchar, None);
                if count == None:
                    maps[splitchar] = [charcount];
                else:
                    maps[splitchar].append(charcount);
        # select real splitchars
        for text in maps:
            map = maps[text];
            if len(map) < datalen / 2:
                continue
            charcount = GetVariance(map);
            maxkey, maxvalue = GetMaxSameCount(map);
            if charcount < self.MaxVariance:
                splitchars.append(text)
        splitGroup = SplitGroup();
        results = [];
        modedict = [];
        for data in datas:
            splitResult = self.Split(data, splitchars);
            results.append(splitResult);

        qresults = query(results);
        maxlen = qresults.max(lambda x: len(x));
        samevalues = [];
        for i in range(0, maxlen):
            splititem = SplitItem();
            splititem.Index = i;
            values = [];
            for splitResult in results:
                if i < len(splitResult):
                    if splititem.SplitType == None and splitResult[i] in splitchars:
                        splititem.SplitType = SplitType.SPLIT;
                        splititem.Value = splitResult[i];
                    values.append(splitResult[i]);

            if splititem.SplitType == None:
                text, value = GetMaxSameCount(values)
                if value > len(values) * self.MatchRatio:
                    splititem.SplitType = SplitType.SAMECONTENT;
                    splititem.Value = text;
                    if text in samevalues:
                        splititem.IsRepeat = True;
                    else:
                        samevalues.append(text);
                else:
                    splititem.SplitType = SplitType.DIFFCONTENT;
            splititems.append(splititem)
        splitGroup.SplitChars = splitchars;
        splitGroup.SplitItems = splititems;
        # post process

        return splitGroup;

    def SplitWithGroup(self, text, splitgroup, isSameOut=True, issplitOut=False):
        results = self.Split(text, splitgroup.SplitChars);
        splitIndex = 0;
        for r in results:
            currp = splitgroup.SplitItems[splitIndex];
            if r in splitgroup.SplitChars:
                while splitgroup.SplitItems[splitIndex].Value != r:
                    splitIndex += 1;
                    if splitIndex == len(splitgroup.SplitItems):
                        return;
                if issplitOut == False:
                    splitIndex += 1;
                    continue;
            splitIndex += 1;
            if currp.SplitType == SplitType.SAMECONTENT:
                if isSameOut == False:
                    continue;
            yield r;

    def Split(self, data, splits):  # 連續的分隔符會被合並?
        if data is None:
            return None;
        if len(splits) == 0:
            return [data];
        last = -1;
        splititems = [];
        l = len(data);
        for i in range(0, l):
            r = data[i];
            if r not in splits:
                continue;
            else:
                if i > 0 and i > last + 1:
                    splititems.append(data[last + 1:i]);
                splititems.append(r);
                last = i
        if last + 1 < len(data):
            splititems.append(data[last + 1:]);
        return splititems;


if __name__ == '__main__':
    sp = Spliter();
    spgroups = sp.Compile(['中樓層/14層,東西,西直門南大街 3號院,1985年建,板樓'
                              , '中樓層/23層,南北,通惠南路6號,2003年建,板樓',
                           '中樓層/12層,南北,通惠南路6號 1號院,2003年建,塔樓'])
    for r in sp.SplitWithGroup(u"低樓層/14層,東西,太陽宮中路太陽宮大廈,2003年建,板樓", spgroups):
        print(r)

sp是分割器實例,對文本數組編譯后獲得了spgroups,這個數據結構存儲了分割所需的信息。之后使用SplitWithGroup方法,即可對文本進行分割,返回的是一個生成器。該函數的一系列參數可以指定是否輸出相同項:

3. 其他可能的方法

文本對齊算法

上文算法雖然簡單,但卻不能處理所有的情況。以下情況就很難處理:
- 樣本數量非常少
- 分隔符數量不一致
- 上文方法可能會帶來嚴重的錯誤。
采用對齊的思路進行分析。對齊的意思是很容易理解的,但如何讓計算機認為是對齊呢?這就依賴於字符串的編輯距離。

對齊算法有大量可供參考的源代碼和論文,由於不是作者親自實現的,因此就不多做介紹。這種算法使用了動態規划技術,相對簡單,但內存占用量是巨大的,如果字符串長度很長,那么將會開辟很大的空間進行計算。另外,它是兩兩計算的,如果給出的是一組數據,如何合並最后結果也是比較復雜的問題。

4.歧義性

分割不一定是好的,這要看所考慮的問題的環境。例如12:03是一個時間,如果直接按照冒號分割,之后的算法就再也無法判斷是不是時間了。分割造成了原有語境的丟失。
解決這個問題的思路有兩種:

軟分割

與其糾結於要不要分割,倒不如換一種角度考慮問題。樹結構給了很好地啟發,在一個層級上看,它們是一個字段,但看得更深一些,則是兩個字段。因此,12:03被划分為兩個子節點12和03,但從父節點上來看,卻依舊是12:03。
樹結構比列表帶來了更強大的表現能力,具體可參考《用樹結構描述和計算數據》一文。

自動確定層級關系

考慮下面的數據:
LGA,MBA,12:03,15:04,165
MBA,LGA,18:30,22:40,174
可以認定,分隔符有:和, 但哪個符號的優先級更高呢?我們當然知道是冒號,但計算機如何知道?
一種基本可行的方案,是確定分割后,不同子項的模式是否相似
如果優先按逗號分割,則有\w+,一個\d+,兩個dd:dd的模式
但如優先按冒號,則會變成 \w+,\w+,\d{2} 以及\d{2} \d{2},\d{3}
顯然,按照逗號分割,子項的模式相似度更高。優先使用逗號分隔。這樣,就能轉換為一顆樹。

與之而來的問題,是如何確定不同子項的模式相似度更高?
我對此的定義是,不同子項內部的分隔符的數量和排布相似性更高。使用歐拉距離即可計算。不過,這種基於相似性的策略,總有可能出錯的情況。

5. 總結

經過筆者近半年的試用,這種算法在處理網頁數據時非常方便,能夠輕松地對不同文本進行切分。當然算法比較簡單,用的也是最朴素的“方差”來評估分隔符的可能性。如果有更好的辦法,歡迎留言。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM