The mathematical foundations and theory of Bayes classifiers are not covered here; they are basic and there are plenty of blog posts about them online. This post only walks through the concrete implementation.
(The code can be copied and used directly; nothing is missing, though it contains a few test statements.)
Overall, the implementation breaks down into four parts.
Part 1: basic helper functions
loadDataSet() creates a few experimental samples that we wrote ourselves, used for a quick sanity check of the code. postingList holds the sample documents and classVec holds their labels (1 = abusive post, 0 = non-abusive post).
createVocaList() builds a list of all the unique words that appear across the documents, giving us a vocabulary for the data set.
setOfWords2Vec() takes the vocabulary list and a document as input and outputs a document vector whose elements are 1 or 0, indicating whether each word appears in the document. This is called the set-of-words model.
bagOfWords2Vec() is almost the same as setOfWords2Vec(), except that it counts how many times each word appears in the document, since a word can occur more than once. This is called the bag-of-words model.
The last few (commented-out) statements are test code to check that the functions work.
import numpy as np
import re
import feedparser as fp
import operator as op

def loadDataSet():
    postingList = [['my','dog','has','flea','problems','help','please','haha'],
                   ['maybe','not','take','him','to','dog','park','stupid'],
                   ['my','dalmation','is','so','cute','I','love','him','haha'],
                   ['stop','posting','stupid','worthless','garbage'],
                   ['mr','licks','ate','my','steak','how','to','stop','him','haha'],
                   ['quit','buying','worthless','dog','food','stupid']]
    classVec = [0,1,0,1,0,1]
    return postingList, classVec

# Build a list of the unique words across all documents, using Python's set type.
def createVocaList(dataSet):
    vocabSet = set([])
    for document in dataSet:
        vocabSet = vocabSet | set(document)
    return list(vocabSet)

# Set-of-words model: each word's presence or absence is one feature of the word vector.
def setOfWords2Vec(vocabList, inputSet):
    returnVec = [0]*len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] = 1
        else:
            print("the word: %s is not in my vocabList!" % word)
    return returnVec

# Bag-of-words model: a word may appear more than once, so count its occurrences.
def bagOfWords2Vec(vocabList, inputSet):
    returnVec = [0]*len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] += 1
        else:
            print("the word: %s is not in my vocabList!" % word)
    return returnVec

'''listOPosts, listClasses = loadDataSet()
vocaSet = createVocaList(listOPosts)
print(vocaSet)
#listOPosts[0].append('hhh')
print(setOfWords2Vec(vocaSet, listOPosts[0]))'''
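As a quick look at the difference between the two models, here is a minimal usage sketch (not part of the original test code; the toy document is made up for illustration):

# Minimal sketch: compare the set-of-words and bag-of-words vectors for a toy document.
listOPosts, listClasses = loadDataSet()
myVocabList = createVocaList(listOPosts)
toyDoc = ['stupid', 'stupid', 'dog']          # made-up document
print(setOfWords2Vec(myVocabList, toyDoc))    # the 'stupid' position becomes 1
print(bagOfWords2Vec(myVocabList, toyDoc))    # the 'stupid' position becomes 2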
Part 2: implementation of the Naive Bayes algorithm
trainNB0() is the Naive Bayes training function. From the samples it estimates the conditional probabilities: p0Vect is p(w(i) | 0), i.e. p(word | non-abusive); p1Vect is p(w(i) | 1), i.e. p(word | abusive); and pAbusive is the proportion of abusive samples in the training set. The raw probability estimates are log-transformed; the reason is noted briefly in the code.
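Written out as formulas (just a restatement of what trainNB0() and classifyNB() compute, added here for reference), the smoothed estimate kept for each word $w_i$ and class $c$ is

$$\hat p(w_i \mid c) = \frac{N_{i,c} + 1}{N_c + 2},$$

where $N_{i,c}$ is how often $w_i$ occurs in class-$c$ documents and $N_c$ is the total word count of those documents; what is actually stored is $\log \hat p(w_i \mid c)$. The classifier then predicts class 1 when

$$\log p(c{=}1) + \sum_i x_i \log \hat p(w_i \mid 1) > \log p(c{=}0) + \sum_i x_i \log \hat p(w_i \mid 0),$$

where $x_i$ is the $i$-th entry of the document vector.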
classifyNB() takes the probabilities produced by the function above plus the sample to be predicted, and returns its class.
testingNB() is a test helper; it simply wraps the steps above into one function.
# Naive Bayes training function.
def trainNB0(trainMatrix, trainCategory):
    numTrainDocs = len(trainMatrix)
    numWords = len(trainMatrix[0])  # every row has length len(vocabList), so the first row's length is fine
    pAbusive = sum(trainCategory)/float(numTrainDocs)
    #p0Num = np.zeros(numWords)
    #p1Num = np.zeros(numWords)
    #p0Denom = 0.0
    #p1Denom = 0.0
    # If any conditional probability were 0, the whole product would become 0, so the
    # word counts are initialized to 1 and the denominators to 2. This is a form of
    # add-one (Laplace) smoothing; the 2 is somewhat arbitrary (a full Laplace estimate
    # would add the vocabulary size instead), the point is to avoid zero probabilities.
    p0Num = np.ones(numWords)
    p1Num = np.ones(numWords)
    p0Denom = 2.0
    p1Denom = 2.0
    for i in range(numTrainDocs):
        if trainCategory[i] == 1:
            p1Num += trainMatrix[i]
            p1Denom += sum(trainMatrix[i])
        else:
            p0Num += trainMatrix[i]
            p0Denom += sum(trainMatrix[i])
    p1Vect = np.log(p1Num/p1Denom)
    p0Vect = np.log(p0Num/p0Denom)
    return p0Vect, p1Vect, pAbusive

'''listOPosts, listClasses = loadDataSet()
myVocabList = createVocaList(listOPosts)
#print(myVocabList)
trainMat = []
for postinDoc in listOPosts:
    trainMat.append(setOfWords2Vec(myVocabList, postinDoc))
p0V, p1V, pAb = trainNB0(trainMat, listClasses)
print("the p0V is :", p0V)
print("the p1V is :", p1V)
print("the pAb is :", pAb)'''

# p0V and p1V are log probabilities, and ln(a*b) = ln(a) + ln(b); this avoids the
# underflow caused by multiplying many very small numbers.
# p0V is p(w(i)|c0), p1V is p(w(i)|c1), pClass1 is p(c1) in the training data.
def classifyNB(vec2Classify, p0Vec, p1Vec, pClass1):
    # p(w) is the same for both classes, so only the numerators need to be compared.
    p1 = sum(vec2Classify * p1Vec) + np.log(pClass1)
    p0 = sum(vec2Classify * p0Vec) + np.log(1.0-pClass1)
    if p1 > p0:
        return 1
    else:
        return 0

def testingNB():
    listOPosts, listClasses = loadDataSet()
    myVocabList = createVocaList(listOPosts)
    trainMat = []
    for postinDoc in listOPosts:
        trainMat.append(setOfWords2Vec(myVocabList, postinDoc))
    p0V, p1V, pAb = trainNB0(trainMat, listClasses)
    testEntry = ['love','my','dalmation']
    thisDoc = np.array(setOfWords2Vec(myVocabList, testEntry))
    print(testEntry, 'classified as:', classifyNB(thisDoc, p0V, p1V, pAb))
    testEntry = ['stupid','garbage']
    thisDoc = np.array(setOfWords2Vec(myVocabList, testEntry))
    print(testEntry, 'classified as:', classifyNB(thisDoc, p0V, p1V, pAb))

#testingNB()
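To see why the log transform matters, here is a small illustrative snippet (not from the original post): multiplying many small probabilities underflows to 0.0 in floating point, while summing their logs stays usable for comparison.

# Illustration of the underflow problem that the log transform avoids.
import numpy as np
probs = np.full(1000, 0.01)      # 1000 hypothetical small word probabilities
print(np.prod(probs))            # 0.0 -- the product underflows
print(np.sum(np.log(probs)))     # roughly -4605.2 -- still comparable between classes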
Part 3: filtering spam email with Naive Bayes
textParse() parses each sample, one big string, into a list of token strings so it can be used by the rest of the code.
spamTest() is the test function: it reads in all the samples, randomly holds out part of them as a test set, and then runs the code from Parts 1 and 2 to evaluate the classifier.
# Example: spam email detection.
# Parse the mail content (one big string) into a list of token strings.
def textParse(bigString):
    # \W+ rather than \W* : a pattern that can match the empty string splits on
    # every character in newer Python versions.
    listofTokens = re.split(r'\W+', bigString)
    return [tok.lower() for tok in listofTokens if len(tok) > 2]

def spamTest():
    docList = []
    classList = []
    fullText = []
    # Read in the sample data.
    for i in range(1, 26):
        # spam = junk mail
        wordList = textParse(open('email/spam/%d.txt' % i).read())
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(1)
        wordList = textParse(open('email/ham/%d.txt' % i).read())
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(0)
    vocabList = createVocaList(docList)
    trainingSet = list(range(50))
    testSet = []
    # Hold out ten documents as the test set.
    for i in range(10):
        randIndex = int(np.random.uniform(0, len(trainingSet)))
        testSet.append(trainingSet[randIndex])
        del(trainingSet[randIndex])
    trainMat = []
    trainClasses = []
    for docIndex in trainingSet:
        trainMat.append(setOfWords2Vec(vocabList, docList[docIndex]))
        trainClasses.append(classList[docIndex])
    p0V, p1V, pSpam = trainNB0(np.array(trainMat), np.array(trainClasses))
    #print("diyige:", p0V, "dierge:", p1V, "disange", pSpam)
    errorCount = 0
    for docIndex in testSet:
        wordVector = setOfWords2Vec(vocabList, docList[docIndex])
        if classifyNB(wordVector, p0V, p1V, pSpam) != classList[docIndex]:
            errorCount += 1
    print('the error rate is :', float(errorCount)/len(testSet))
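The printed error rate depends on which ten emails happened to be held out, so it varies from run to run. One rough way to stabilise it (a sketch of my own, which assumes you change the last line of spamTest() to return float(errorCount)/len(testSet) instead of only printing it, and that the email/spam and email/ham folders are present):

# Sketch: average the error rate over several random train/test splits.
# Assumes spamTest() was modified to return its error rate.
numRuns = 10
avgError = sum(spamTest() for _ in range(numRuns)) / numRuns
print('average error rate over %d runs: %f' % (numRuns, avgError))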
Part 4: using the Bayes classifier to find regional tendencies in personal ads
The data for this part comes from the web, so we need to collect it by importing RSS feeds. It helps to know a little about how RSS feeds are used; the feedparser package can handle them.
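For reference, feedparser turns a feed into a dictionary-like object; the only fields the code below relies on are 'entries' and each entry's 'summary'. A minimal usage sketch (the URL is a placeholder, not a real feed):

import feedparser as fp
d = fp.parse('http://example.com/feed.rss')   # placeholder URL
print(len(d['entries']))                      # number of posts in the feed
if d['entries']:
    print(d['entries'][0]['summary'])         # the text that localWords() parses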
calcMostFreq() counts the most frequent words in the text so that they can be removed, because words such as "is", "as" and "the" occur very often in English but are useless for classification and can even hurt accuracy.
localWords() fetches the data and runs the test; functionally it is essentially the same as spamTest(), only the data is obtained differently (reading the code requires knowing a bit about how RSS feeds are structured).
# Example: finding regional tendencies in personal ads.
# Count how often each vocabulary word appears in the text, sort from most to
# least frequent, and return the top 30.
def calcMostFreq(vocabList, fullText):
    freqDict = {}
    for token in vocabList:
        freqDict[token] = fullText.count(token)
    sortedFreq = sorted(freqDict.items(), key=op.itemgetter(1), reverse=True)
    return sortedFreq[:30]

def localWords(feed1, feed0):
    docList = []
    classList = []
    fullText = []
    minLen = min(len(feed1['entries']), len(feed0['entries']))
    for i in range(minLen):
        wordList = textParse(feed1['entries'][i]['summary'])
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(1)
        wordList = textParse(feed0['entries'][i]['summary'])
        docList.append(wordList)
        fullText.extend(wordList)
        classList.append(0)
    vocabList = createVocaList(docList)
    top30Words = calcMostFreq(vocabList, fullText)
    # calcMostFreq returns (word, count) pairs, so the word itself is pairW[0].
    for pairW in top30Words:
        if pairW[0] in vocabList:
            vocabList.remove(pairW[0])
    trainingSet = list(range(2*minLen))
    testSet = []
    for i in range(20):
        randIndex = int(np.random.uniform(0, len(trainingSet)))
        testSet.append(trainingSet[randIndex])
        del(trainingSet[randIndex])
    trainMat = []
    trainClasses = []
    for docIndex in trainingSet:
        trainMat.append(bagOfWords2Vec(vocabList, docList[docIndex]))
        trainClasses.append(classList[docIndex])
    p0V, p1V, pSpam = trainNB0(np.array(trainMat), np.array(trainClasses))
    errorCount = 0
    for docIndex in testSet:
        wordVector = bagOfWords2Vec(vocabList, docList[docIndex])
        if classifyNB(np.array(wordVector), p0V, p1V, pSpam) != classList[docIndex]:
            errorCount += 1
    print('the error rate is :', float(errorCount)/len(testSet))
    return vocabList, p0V, p1V

ny = fp.parse('http://newyork.craigslist.org/stp/index.rss')
sf = fp.parse('http://sfbay.craigslist.org/stp/index.rss')
#print(ny['entries'])
vocabList, psf, pny = localWords(ny, sf)
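Note that the Craigslist RSS URLs above may no longer serve a feed, in which case 'entries' comes back empty and localWords() has nothing to train on. To actually look at the regional tendency rather than just the error rate, one option (a sketch of my own, not part of the original code) is to print the words with the highest conditional log-probability for each feed:

# Sketch: show the most characteristic words for each feed.
# localWords(ny, sf) returns (vocabList, p0V, p1V); p0V belongs to the sf feed
# (class 0) and p1V to the ny feed (class 1).
def getTopWords(ny, sf, topN=10):
    vocabList, pSF, pNY = localWords(ny, sf)
    topNY = sorted(zip(vocabList, pNY), key=lambda pair: pair[1], reverse=True)[:topN]
    topSF = sorted(zip(vocabList, pSF), key=lambda pair: pair[1], reverse=True)[:topN]
    print('NY words:', [word for word, _ in topNY])
    print('SF words:', [word for word, _ in topSF])

#getTopWords(ny, sf)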
Summary: using probabilities can sometimes be more effective than hard rules. Here we assume that the words (features) are independent of each other, which is usually not true in reality: a word's occurrence often depends on the words around it. This is only a simple implementation, and there is still a lot of room for improvement.