Windows setup
1. Install Anaconda (or plain Python) and set the Spark environment variables.
2. Configure SPARK_HOME: D:\Develop\spark-1.6.0-bin-hadoop2.6\spark-1.6.0-bin-hadoop2.6
3. C:\Users\Administrator>pip install py4j
   py4j ("Python for Java") is how CPython talks to the JVM; PySpark communicates with Java through it. (pip uninstall py4j removes it again.)
4. Install pyspark (pip install is not recommended). To keep versions matched, copy the packages instead:
   from D:\Develop\spark-1.6.0-bin-hadoop2.6\python\lib copy py4j-0.9-src and pyspark
   into D:\Develop\Python\Anaconda3\Lib\site-packages
   Verify:
   C:\Users\Administrator>python
   >>> import py4j
   >>> import pyspark   ## no error means the install succeeded
For IntelliJ IDEA, download the Python plugin matching your IDEA version.
Install the PyDev plugin in the Eclipse Scala IDE.
Python/Spark environment: Python 2.7.9, Spark spark-1.6.0-bin-hadoop2.6
Install pyspark (pip install is not recommended). To keep versions matched, copy the packages instead; note that the extracted folder may be nested one level deep, so strip the outer pyspark directory.
From D:\Develop\spark-1.6.0-bin-hadoop2.6\python\lib
copy py4j-0.9-src and pyspark
into D:\Develop\Python\Anaconda3\Lib\site-packages
Installing PyDev: the PyCharm configuration worked, but code completion did not. The bundled Scala IDE version was too old; downloading the latest from the official site and installing PyDev (old or new versions) through the Eclipse Marketplace both failed. In the end, following a Bing search for "how to install pydev on eclipse scala ide" (http://www.planetofbits.com/python/how-to-install-python-pydev-plugin-in-eclipse/), I re-downloaded Eclipse, downloaded PyDev 5.2.0 and copied it into the Eclipse dropins folder, then installed Scala from the Eclipse Marketplace. That worked.
The Eclipse Python console shows garbled output (it only supports GBK encoding).
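A workaround sketch (assumes Python 2.7; the right fix depends on which encoding the console actually uses): either switch the Eclipse run-configuration console encoding to UTF-8, or handle the encoding in code. The reload(sys) trick below is the same one that appears, commented out, at the top of pvuv.py further down.

# coding:utf-8
import sys

print(sys.getdefaultencoding())    # 'ascii' by default on Python 2
reload(sys)                        # Python 2 only: re-expose setdefaultencoding
sys.setdefaultencoding("utf-8")    # avoid UnicodeEncodeError on implicit str/unicode conversions

text = u"hello 中文"
print(text.encode("gbk"))          # encode explicitly for a GBK-only console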
wordcount.py
# coding:utf-8
'''
Created on 2019-10-03
@author: Administrator
PySpark wordcount; also demonstrates both Python print styles
'''
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

print "hello"       # Python 2 print statement
print("world")      # print() also works in Python 2.7

def showResult(one):
    print(one)

if __name__ == '__main__':
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc = SparkContext(conf=conf)
    lines = sc.textFile("./words")
    words = lines.flatMap(lambda line: line.split(" "))
    pairWords = words.map(lambda word: (word, 1))
    reduceResult = pairWords.reduceByKey(lambda v1, v2: v1 + v2)
    reduceResult.foreach(lambda one: showResult(one))
hello spark hello hdfs hello python hello scala hello hbase hello storm hello python hello scala hello hbase hello storm
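For reference, running the wordcount above against this words file should print counts like the following (the order may differ, since the foreach output comes from the workers):

('hello', 10)
('python', 2)
('scala', 2)
('hbase', 2)
('storm', 2)
('spark', 1)
('hdfs', 1)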
## Demo2.py
# coding:utf-8
'''
Created on 2019-10-03
@author: Administrator
'''
import sys
import random

if __name__ == '__main__':
    file = sys.argv[0]              ## path of this script
    outputPath = sys.argv[1]        ## the first real command-line argument
    print("%s,%s" % (file, outputPath))
    print(random.randint(0, 255))   ## randint is inclusive: both 0 and 255 are possible

pvuvdata (fields are tab-separated in the real file)
2019-10-01 192.168.112.101 uid123214 beijing www.taobao.com buy
2019-10-02 192.168.112.111 uid123223 beijing www.jingdong.com buy
2019-10-03 192.168.112.101 uid123214 beijing www.tencent.com login
2019-10-04 192.168.112.101 uid123214 shanghai www.taobao.com buy
2019-10-01 192.168.112.101 uid123214 guangdong www.taobao.com logout
2019-10-01 192.168.112.101 uid123214 shanghai www.taobao.com view
2019-10-02 192.168.112.111 uid123223 beijing www.jingdong.com comment
2019-10-03 192.168.112.101 uid123214 shanghai www.tencent.com login
2019-10-04 192.168.112.101 uid123214 beijing www.xiaomi.com buy
2019-10-01 192.168.112.101 uid123214 shanghai www.huawei.com buy
2019-10-03 192.168.112.101 uid123214 beijing www.tencent.com login
2019-10-04 192.168.112.101 uid123214 shanghai www.taobao.com buy
2019-10-01 192.168.112.101 uid123214 guangdong www.taobao.com logout
2019-10-01 192.168.112.101 uid123214 beijing www.taobao.com view
2019-10-02 192.168.112.111 uid123223 guangdong www.jingdong.com comment
2019-10-03 192.168.112.101 uid123214 beijing www.tencent.com login
2019-10-04 192.168.112.101 uid123214 guangdong www.xiaomi.com buy
2019-10-01 192.168.112.101 uid123214 beijing www.huawei.com buy
pvuv.py
# coding:utf-8
# import sys
# print(sys.getdefaultencoding())    ## 'ascii'
# reload(sys)
# sys.setdefaultencoding("utf-8")    ## Python 2.x only
# print(sys.getdefaultencoding())
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from com.sxt.spark.wordcount import showResult

'''
Created on 2019-10-03
@author: Administrator
'''

'''
6. PySpark PV/UV statistics (partial code)
1) Compute PV and UV
2) Compute UV excluding one region
3) Top-2 most active regions per site
4) Most popular action per site
5) Top-3 most active users per site
'''

## helper functions
def pv(lines):
    pairSite = lines.map(lambda line: (line.split("\t")[4], 1))
    reduceResult = pairSite.reduceByKey(lambda v1, v2: v1 + v2)
    result = reduceResult.sortBy(lambda tp: tp[1], ascending=False)
    result.foreach(lambda one: showResult(one))

def uv(lines):
    distinct = lines.map(lambda line: line.split("\t")[1] + '_' + line.split("\t")[4]).distinct()
    reduceResult = distinct.map(lambda distinct: (distinct.split("_")[1], 1)).reduceByKey(lambda v1, v2: v1 + v2)
    result = reduceResult.sortBy(lambda tp: tp[1], ascending=False)
    result.foreach(lambda one: showResult(one))

def uvExceptBJ(lines):
    # '!=' instead of the Python-2-only '<>' operator
    distinct = lines.filter(lambda line: line.split('\t')[3] != 'beijing') \
        .map(lambda line: line.split("\t")[1] + '_' + line.split("\t")[4]).distinct()
    reduceResult = distinct.map(lambda distinct: (distinct.split("_")[1], 1)).reduceByKey(lambda v1, v2: v1 + v2)
    result = reduceResult.sortBy(lambda tp: tp[1], ascending=False)
    result.foreach(lambda one: showResult(one))

def getCurrentSiteTop2Location(one):
    site = one[0]
    locations = one[1]
    locationDict = {}
    for location in locations:
        if location in locationDict:
            locationDict[location] += 1
        else:
            locationDict[location] = 1
    sortedList = sorted(locationDict.items(), key=lambda kv: kv[1], reverse=True)
    resultList = []
    if len(sortedList) < 2:
        resultList = sortedList
    else:
        for i in range(2):
            resultList.append(sortedList[i])
    return site, resultList

def getTop2Location(lines):
    site_locations = lines.map(lambda line: (line.split("\t")[4], line.split("\t")[3])).groupByKey()
    result = site_locations.map(lambda one: getCurrentSiteTop2Location(one)).collect()
    for elem in result:
        print(elem)

def getSiteInfo(one):
    userid = one[0]
    sites = one[1]
    dic = {}
    for site in sites:
        if site in dic:
            dic[site] += 1
        else:
            dic[site] = 1
    resultList = []
    for site, count in dic.items():
        resultList.append((site, (userid, count)))
    return resultList

'''
I suspect the following function has a bug -- my version
'''
def getCurrectSiteTop3User(one):
    site = one[0]
    uid_c_tuples = one[1]
    top3List = ["", "", ""]
    for uid_count in uid_c_tuples:
        for i in range(len(top3List)):
            if top3List[i] == "":
                top3List[i] = uid_count
                break
            else:
                if uid_count[1] > top3List[i][1]:   ## compare the counts inside the tuples
                    for j in range(2, i, -1):
                        top3List[j] = top3List[j - 1]
                    top3List[i] = uid_count
                    break
    return site, top3List

'''
I suspect the following function has a bug -- instructor's version
'''
def getCurSiteTop3User2(one):
    site = one[0]
    userid_count_Iterable = one[1]
    top3List = ["", "", ""]
    for userid_count in userid_count_Iterable:
        for i in range(0, len(top3List)):
            if top3List[i] == "":
                top3List[i] = userid_count
                break
            else:
                if userid_count[1] > top3List[i][1]:
                    for j in range(2, i, -1):
                        top3List[j] = top3List[j - 1]
                    top3List[i] = userid_count
                    break
    return site, top3List

def getTop3User(lines):
    site_uid_count = lines.map(lambda line: (line.split('\t')[2], line.split("\t")[4])).groupByKey() \
        .flatMap(lambda one: getSiteInfo(one))
    result = site_uid_count.groupByKey().map(lambda one: getCurrectSiteTop3User(one)).collect()
    for ele in result:
        print(ele)

if __name__ == '__main__':
    # conf = SparkConf().setMaster("local").setAppName("test")
    # sc = SparkContext()
    # lines = sc.textFile("./pvuvdata")
    # # pv(lines)
    # # uv(lines)
    # # uvExceptBJ(lines)
    # # getTop2Location(lines)
    # # getTop3User(lines)
    res = getCurrectSiteTop3User(("baidu", [('A', 12), ('B', 5), ('C', 12), ('D', 1), ('E', 21), ('F', 20)]))
    print(res)
    res2 = getCurSiteTop3User2(("baidu", [('A', 12), ('B', 5), ('C', 12), ('D', 1), ('E', 21), ('F', 20)]))
    print(res2)
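Because the insertion logic above is hard to verify by eye (the note in the source suspects both versions are wrong), a simpler equivalent is to sort the (uid, count) tuples by count and slice, the same way getCurrentSiteTop2Location already handles locations. A minimal sketch, assuming ties may be broken arbitrarily; the function name getTop3UsersSimple is mine:

def getTop3UsersSimple(one):
    # one is (site, iterable of (userid, count)), the same input as the functions above
    site, uid_counts = one
    top3 = sorted(uid_counts, key=lambda uc: uc[1], reverse=True)[:3]
    return site, top3

if __name__ == '__main__':
    print(getTop3UsersSimple(("baidu", [('A', 12), ('B', 5), ('C', 12), ('D', 1), ('E', 21), ('F', 20)])))
    # expected: ('baidu', [('E', 21), ('F', 20), ('A', 12)])  -- or ('C', 12) last, since A and C tie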
Switch the Python version used by PyCharm/Anaconda to 3.5.
Linear regression: y = w0 + w1*x1 + w2*x2 + ...; the xi are called features and the wi are called weights. Transposing a matrix swaps its rows and columns (a flip across the main diagonal, not a 90-degree rotation).
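A minimal NumPy sketch of that prediction formula (the numbers are made up for illustration); note how the feature matrix, the weight vector, and the transpose fit together:

import numpy as np

# 3 samples, 2 features each (rows = samples, columns = features x1, x2)
X = np.array([[1.0, 2.0],
              [2.0, 0.5],
              [3.0, 1.5]])
w0 = 0.5                        # intercept
w = np.array([1.2, -0.7])       # weights w1, w2

y_hat = w0 + X.dot(w)           # y = w0 + w1*x1 + w2*x2 for every sample
print(y_hat)

print(X.T)                      # transpose: rows become columns, shape (2, 3)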
Supervised learning: the training data comes with y values (labels); unsupervised learning has no y values.
線性回歸代碼 lpsa.data -0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 -0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 -0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541 -0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 0.8544153,-0.557312518810673 -0.208756571683607 -0.787896192088153 0.990146852537193 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.2669476,-0.929360463147704 -0.0578991819441687 0.152317365781542 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.2669476,-2.28833047634983 -0.0706369432557794 -0.116315079324086 0.80409888772376 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.2669476,0.223498042876113 -1.41471935455355 -0.116315079324086 -1.02470580167082 -0.522940888712441 -0.29928234305568 0.342627053981254 0.199211097885341 1.3480731,0.107785900236813 -1.47221551299731 0.420949810887169 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.687186906466865 1.446919,0.162180092313795 -1.32557369901905 0.286633588334355 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.4701758,-1.49795329918548 -0.263601072284232 0.823898478545609 0.788388310173035 -0.522940888712441 -0.29928234305568 0.342627053981254 0.199211097885341 1.4929041,0.796247055396743 0.0476559407005752 0.286633588334355 -1.02470580167082 -0.522940888712441 0.394013435896129 -1.04215728919298 -0.864466507337306 1.5581446,-1.62233848461465 -0.843294091975396 -3.07127197548598 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.5993876,-0.990720665490831 0.458513517212311 0.823898478545609 1.07379746308195 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.6389967,-0.171901281967138 -0.489197399065355 -0.65357996953534 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.6956156,-1.60758252338831 -0.590700340358265 -0.65357996953534 -0.619561070667254 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 1.7137979,0.366273918511144 -0.414014962912583 -0.116315079324086 0.232904453212813 -0.522940888712441 0.971228997418125 0.342627053981254 1.26288870310799 1.8000583,-0.710307384579833 0.211731938156277 0.152317365781542 -1.02470580167082 -0.522940888712441 -0.442797990776478 0.342627053981254 1.61744790484887 1.8484548,-0.262791728113881 -1.16708345615721 0.420949810887169 0.0846342590816532 -0.522940888712441 0.163172393491611 0.342627053981254 1.97200710658975 1.8946169,0.899043117369237 -0.590700340358265 0.152317365781542 -1.02470580167082 -0.522940888712441 1.28643254437683 -1.04215728919298 -0.864466507337306 1.9242487,-0.903451690500615 1.07659722048274 0.152317365781542 
1.28380453408541 -0.522940888712441 -0.442797990776478 -1.04215728919298 -0.864466507337306 2.008214,-0.0633337899773081 -1.38088970920094 0.958214701098423 0.80409888772376 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 2.0476928,-1.15393789990757 -0.961853075398404 -0.116315079324086 -1.02470580167082 -0.522940888712441 -0.442797990776478 -1.04215728919298 -0.864466507337306 2.1575593,0.0620203721138446 0.0657973885499142 1.22684714620405 -0.468824786336838 -0.522940888712441 1.31421001659859 1.72741139715549 -0.332627704725983 2.1916535,-0.75731027755674 -2.92717970468456 0.018001143228728 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983 2.2137539,1.11226993252773 1.06484916245061 0.555266033439982 0.877691038550889 1.89254797819741 1.43890404648442 0.342627053981254 0.376490698755783 2.2772673,-0.468768642850639 -1.43754788774533 -1.05652863719378 0.576050411655607 -0.522940888712441 0.0120483832567209 0.342627053981254 -0.687186906466865 2.2975726,-0.618884859896728 -1.1366360750781 -0.519263746982526 -1.02470580167082 -0.522940888712441 -0.863171185425945 3.11219574032972 1.97200710658975 2.3272777,-0.651431999123483 0.55329161145762 -0.250631301876899 1.11210019001038 -0.522940888712441 -0.179808625688859 -1.04215728919298 -0.864466507337306 2.5217206,0.115499102435224 -0.512233676577595 0.286633588334355 1.13650173283446 -0.522940888712441 -0.179808625688859 0.342627053981254 -0.155348103855541 2.5533438,0.266341329949937 -0.551137885443386 -0.384947524429713 0.354857790686005 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983 2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152 2.6567569,-0.218972367124187 0.851192298581141 0.555266033439982 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 0.908329501367106 2.677591,0.263121415733908 1.4142681068416 0.018001143228728 1.35980653053822 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 2.7180005,-0.0704736333296423 1.52000996595417 0.286633588334355 1.39364261119802 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.332627704725983 2.7942279,-0.751957286017338 0.316843561689933 -1.99674219506348 0.911736065044475 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 2.8063861,-0.685277652430997 1.28214038482516 0.823898478545609 0.232904453212813 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541 2.8124102,-0.244991501432929 0.51882005949686 -0.384947524429713 0.823246560137838 -0.522940888712441 -0.863171185425945 0.342627053981254 0.553770299626224 2.8419982,-0.75731027755674 2.09041984898851 1.22684714620405 1.53428167116843 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 2.8535925,1.20962937075363 -0.242882661178889 1.09253092365124 -1.02470580167082 -0.522940888712441 1.24263233939889 3.11219574032972 2.50384590920108 2.9204698,0.570886990493502 0.58243883987948 0.555266033439982 1.16006887775962 -0.522940888712441 1.07357183940747 0.342627053981254 1.61744790484887 2.9626924,0.719758684343624 0.984970304132004 1.09253092365124 1.52137230773457 -0.522940888712441 -0.179808625688859 0.342627053981254 -0.509907305596424 2.9626924,-1.52406140158064 1.81975700990333 0.689582255992796 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 2.9729753,-0.132431544081234 
2.68769877553723 1.09253092365124 1.53428167116843 -0.522940888712441 -0.442797990776478 0.342627053981254 -0.687186906466865 3.0130809,0.436161292804989 -0.0834447307428255 -0.519263746982526 -1.02470580167082 1.89254797819741 1.07357183940747 0.342627053981254 1.26288870310799 3.0373539,-0.161195191984091 -0.671900359186746 1.7641120364153 1.13650173283446 -0.522940888712441 -0.863171185425945 0.342627053981254 0.0219314970149 3.2752562,1.39927182372944 0.513852869452676 0.689582255992796 -1.02470580167082 1.89254797819741 1.49394503405693 0.342627053981254 -0.155348103855541 3.3375474,1.51967002306341 -0.852203755696565 0.555266033439982 -0.104527297798983 1.89254797819741 1.85927724828569 0.342627053981254 0.908329501367106 3.3928291,0.560725834706224 1.87867703391426 1.09253092365124 1.39364261119802 -0.522940888712441 0.486423065822545 0.342627053981254 1.26288870310799 3.4355988,1.00765532502814 1.69426310090641 1.89842825896812 1.53428167116843 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.509907305596424 3.4578927,1.10152996153577 -0.10927271844907 0.689582255992796 -1.02470580167082 1.89254797819741 1.97630171771485 0.342627053981254 1.61744790484887 3.5160131,0.100001934217311 -1.30380956369388 0.286633588334355 0.316555063757567 -0.522940888712441 0.28786643052924 0.342627053981254 0.553770299626224 3.5307626,0.987291634724086 -0.36279314978779 -0.922212414640967 0.232904453212813 -0.522940888712441 1.79270085261407 0.342627053981254 1.26288870310799 3.5652984,1.07158528137575 0.606453149641961 1.7641120364153 -0.432854616994416 1.89254797819741 0.528504607720369 0.342627053981254 0.199211097885341 3.5876769,0.180156323255198 0.188987436375017 -0.519263746982526 1.09956763075594 -0.522940888712441 0.708239632330506 0.342627053981254 0.199211097885341 3.6309855,1.65687973755377 -0.256675483533719 0.018001143228728 -1.02470580167082 1.89254797819741 1.79270085261407 0.342627053981254 1.26288870310799 3.6800909,0.5720085322365 0.239854450210939 -0.787896192088153 1.0605418233138 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 3.7123518,0.323806133438225 -0.606717660886078 -0.250631301876899 -1.02470580167082 1.89254797819741 0.342907418101747 0.342627053981254 0.199211097885341 3.9843437,1.23668206715898 2.54220539083611 0.152317365781542 -1.02470580167082 1.89254797819741 1.89037692416194 0.342627053981254 1.26288870310799 3.993603,0.180156323255198 0.154448192444669 1.62979581386249 0.576050411655607 1.89254797819741 0.708239632330506 0.342627053981254 1.79472750571931 4.029806,1.60906277046565 1.10378605019827 0.555266033439982 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306 4.1295508,1.0036214996026 0.113496885050331 -0.384947524429713 0.860016436332751 1.89254797819741 -0.863171185425945 0.342627053981254 -0.332627704725983 4.3851468,1.25591974271076 0.577607033774471 0.555266033439982 -1.02470580167082 1.89254797819741 1.07357183940747 0.342627053981254 1.26288870310799 4.6844434,2.09650591351268 0.625488598331018 -2.66832330782754 -1.02470580167082 1.89254797819741 1.67954222367555 0.342627053981254 0.553770299626224 5.477509,1.30028987435881 0.338383613253713 0.555266033439982 1.00481276295349 1.89254797819741 1.24263233939889 0.342627053981254 1.97200710658975
LinearRegression.scala
package com.bjsxt.lr

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LinearRegressionModel

object LinearRegression {
  def main(args: Array[String]) {
    // build the Spark context
    val conf = new SparkConf().setAppName("LinearRegressionWithSGD").setMaster("local")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)
    // sc.setLogLevel("WARN")

    // read the sample data
    val data_path1 = "lpsa.data"
    val data = sc.textFile(data_path1)
    val examples = data.map { line =>
      val parts = line.split(',')
      val y = parts(0)
      val xs = parts(1)
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }
    val train2TestData = examples.randomSplit(Array(0.8, 0.2), 1)

    /*
     * Iterations: training a multiple linear regression model stops (converges) when either
     * 1. the error drops below a user-specified threshold, or
     * 2. a fixed number of iterations has been reached.
     */
    val numIterations = 100
    // step size used by gradient descent on each iteration, e.g. 0.1, 0.2, 0.3, 0.4
    val stepSize = 1
    val miniBatchFraction = 1
    val lrs = new LinearRegressionWithSGD()
    // make the trained model include the w0 parameter, i.e. the intercept
    lrs.setIntercept(true)
    // set the step size
    lrs.optimizer.setStepSize(stepSize)
    // set the number of iterations
    lrs.optimizer.setNumIterations(numIterations)
    // fraction of samples whose error is evaluated after each step; 1.0 (the default) means all samples
    lrs.optimizer.setMiniBatchFraction(miniBatchFraction)

    val model = lrs.run(train2TestData(0))
    println(model.weights)
    println(model.intercept)

    // evaluate on the held-out samples
    val prediction = model.predict(train2TestData(1).map(_.features))
    val predictionAndLabel = prediction.zip(train2TestData(1).map(_.label))
    val print_predict = predictionAndLabel.take(20)
    println("prediction" + "\t" + "label")
    for (i <- 0 to print_predict.length - 1) {
      println(print_predict(i)._1 + "\t" + print_predict(i)._2)
    }

    // average error on the test set (note: this is actually the mean absolute error, even though the label below says RMSE)
    val loss = predictionAndLabel.map {
      case (p, v) =>
        val err = p - v
        Math.abs(err)
    }.reduce(_ + _)
    val error = loss / train2TestData(1).count
    println("Test RMSE = " + error)

    // saving/loading the model
    // val ModelPath = "model"
    // model.save(sc, ModelPath)
    // val sameModel = LinearRegressionModel.load(sc, ModelPath)
    sc.stop()
  }
}
// case (p, v) pattern-matches each element of the pair RDD into the prediction p and the label v
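To make the stepSize / numIterations knobs above concrete, here is a minimal NumPy sketch of batch gradient descent for linear regression (not Spark's implementation, just the idea; the data is synthetic):

import numpy as np

np.random.seed(1)
X = np.random.rand(100, 2)                  # 100 samples, 2 features
true_w, true_w0 = np.array([3.0, -2.0]), 0.5
y = X.dot(true_w) + true_w0 + 0.01 * np.random.randn(100)

w, w0 = np.zeros(2), 0.0
stepSize, numIterations = 0.5, 1000         # illustrative values
n = len(y)
for _ in range(numIterations):
    err = X.dot(w) + w0 - y                 # prediction error for every sample
    w  -= stepSize * X.T.dot(err) / n       # gradient step for the weights
    w0 -= stepSize * err.mean()             # gradient step for the intercept

print(w, w0)                                # should approach [3.0, -2.0] and 0.5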
Logistic regression (logit regression) is a generalized linear model; despite the word "regression", it is an algorithm used for classification.
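A minimal sketch of the idea in plain NumPy (not the MLlib implementation): a linear score is squashed through the sigmoid into a probability, and the class is decided by a threshold (the threshold topic comes back in LogisticRegression4 below). The weights and sample values here are made up:

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

w0 = -1.0
w = np.array([0.8, 1.5])          # illustrative weights
x = np.array([1.2, 0.4])          # one sample with two features

score = w0 + w.dot(x)             # linear part, same form as linear regression
p = sigmoid(score)                # probability of the positive class
label = 1 if p > 0.5 else 0       # default decision threshold 0.5
print(score, p, label)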
Fourier transform:
Time-domain analysis: for a signal, the way its strength varies over time is its time-domain behaviour; a time-domain waveform, for example, shows how the signal changes with time.
Frequency-domain analysis: here we look at the frequency-related content of the signal rather than its time-related content, i.e. which single-frequency components the signal is composed of. An important rule in the frequency domain is that the sine wave is the only waveform that exists there: sine waves are the vocabulary of the frequency domain, because any time-domain waveform can be synthesized from sine waves.
In general the time-domain view is more intuitive, while the frequency-domain view is more concise. The Fourier transform is one bridge between the two: it converts a time-domain signal that is hard to work with into a frequency-domain signal that is easy to analyze.
Fourier's principle: any continuously measured time-series signal can be expressed as an infinite superposition of sine waves of different frequencies.
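A minimal NumPy sketch of that idea on synthetic data: a signal built from two sine waves, passed through the FFT, shows peaks at exactly those frequencies. This is the same abs(fft(X)) trick used to build features in music.py below (which uses scipy's fft; NumPy's behaves the same here):

import numpy as np

sample_rate = 1000                               # samples per second
t = np.arange(0, 1.0, 1.0 / sample_rate)         # one second of samples
signal = np.sin(2 * np.pi * 50 * t) + 0.5 * np.sin(2 * np.pi * 120 * t)

amplitudes = np.abs(np.fft.fft(signal))          # amplitude of each frequency bin
half = sample_rate // 2                          # only the first half is meaningful for a real signal

# the two strongest components should sit at 50 Hz and 120 Hz
top2 = np.argsort(amplitudes[:half])[-2:]        # with a 1-second window, bin index == frequency in Hz
print(sorted(top2))                              # expected: [50, 120]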
Music classification steps:
1. Use the Fourier transform to convert all the raw .wav music files in the 7 genres into features, keep the first 1000 FFT components, and save them to files for later training.
2. Read those 7 genres of feature vectors back in as the training set.
3. Fit a classification model with the fit method of sklearn's LogisticRegression.
4. Read the Black Panther (黑豹) song "無地自容", apply the same Fourier transform, and again keep the first 1000 components as its feature vector.
5. Call the model's predict method to classify the song; the result is rock.
Training set
File to classify
To run this under Python 2.7 you need to:
1. install VCForPython27.msi
2. pip install wheel
3. pip install D:/PythonInstallPackage/numpy-1.9.2+mkl-cp27-none-win_amd64.whl
4. pip install D:/PythonInstallPackage/scipy-0.16.0-cp27-none-win_amd64.whl
5. pip install D:/PythonInstallPackage/scikit_learn-0.16.1-cp27-none-win_amd64.whl
6. pip install D:/PythonInstallPackage/python_dateutil-2.4.2-py2.py3-none-any.whl
7. pip install D:/PythonInstallPackage/six-1.9.0-py2.py3-none-any.whl
8. pip install D:/PythonInstallPackage/pyparsing-2.0.3-py2-none-any.whl
9. pip install D:/PythonInstallPackage/pytz-2015.4-py2.py3-none-any.whl
10. pip install D:/PythonInstallPackage/matplotlib-1.4.3-cp27-none-win_amd64.whl
music.py # coding:utf-8 from scipy import fft from scipy.io import wavfile from matplotlib.pyplot import specgram import matplotlib.pyplot as plt # 可以先把一個wav文件讀入python,然后繪制它的頻譜圖(spectrogram)來看看是什么樣的 #畫框設置 #figsize=(10, 4)寬度和高度的英寸 # dpi=80 分辨率 # plt.figure(figsize=(10, 4),dpi=80) # # (sample_rate, X) = wavfile.read("D:/usr/genres/metal/converted/metal.00065.au.wav") # print(sample_rate, X.shape) # specgram(X, Fs=sample_rate, xextent=(0,30)) # plt.xlabel("time") # plt.ylabel("frequency") # #線的形狀和顏色 # plt.grid(True, linestyle='-', color='0.75') # #tight緊湊一點 # plt.savefig("D:/usr/metal.00065.au.wav5.png", bbox_inches="tight") # 當然,我們也可以把每一種的音樂都抽一些出來打印頻譜圖以便比較,如下圖: # def plotSpec(g,n): # sample_rate, X = wavfile.read("E:/genres/"+g+"/converted/"+g+"."+n+".au.wav") # specgram(X, Fs=sample_rate, xextent=(0,30)) # plt.title(g+"_"+n[-1]) # # plt.figure(num=None, figsize=(18, 9), dpi=80, facecolor='w', edgecolor='k') # plt.subplot(6,3,1);plotSpec("classical","00001");plt.subplot(6,3,2);plotSpec("classical","00002") # plt.subplot(6,3,3);plotSpec("classical","00003");plt.subplot(6,3,4);plotSpec("jazz","00001") # plt.subplot(6,3,5);plotSpec("jazz","00002");plt.subplot(6,3,6);plotSpec("jazz","00003") # plt.subplot(6,3,7);plotSpec("country","00001");plt.subplot(6,3,8);plotSpec("country","00002") # plt.subplot(6,3,9);plotSpec("country","00003");plt.subplot(6,3,10);plotSpec("pop","00001") # plt.subplot(6,3,11);plotSpec("pop","00002");plt.subplot(6,3,12);plotSpec("pop","00003") # plt.subplot(6,3,13);plotSpec("rock","00001");plt.subplot(6,3,14);plotSpec("rock","00002") # plt.subplot(6,3,15);plotSpec("rock","00003");plt.subplot(6,3,16);plotSpec("metal","00001") # plt.subplot(6,3,17);plotSpec("metal","00002");plt.subplot(6,3,18);plotSpec("metal","00003") # plt.tight_layout(pad=0.4, w_pad=0, h_pad=1.0) # plt.savefig("D:/compare.au.wav.png", bbox_inches="tight") # 對單首音樂進行傅里葉變換 #畫框設置figsize=(9, 6)寬度和高度的英寸,dpi=80是分辨率 plt.figure(figsize=(9, 6), dpi=80) #sample_rate代表每秒樣本的采樣率,X代表讀取文件的所有信息 音軌信息,這里全是單音軌數據 是個數組【雙音軌是個二維數組,左聲道和右聲道】 #采樣率:每秒從連續信號中提取並組成離散信號的采樣個數,它用赫茲(Hz)來表示 sample_rate, X = wavfile.read("D:/usr/genres/jazz/converted/jazz.00002.au.wav") print(sample_rate,X,type(X),len(X)) # 大圖含有2行1列共2個子圖,正在繪制的是第一個 plt.subplot(211) #畫wav文件時頻分析的函數 specgram(X, Fs=sample_rate) plt.xlabel("time") plt.ylabel("frequency") plt.subplot(212) #fft 快速傅里葉變換 fft(X)得到振幅 即當前采樣下頻率的振幅 fft_X = abs(fft(X)) print("fft_x",fft_X,len(fft_X)) #畫頻域分析圖 specgram(fft_X) # specgram(fft_X,Fs=1) plt.xlabel("frequency") plt.ylabel("amplitude") plt.savefig("D:/usr/genres/jazz.00000.au.wav.fft.png") plt.show() logistic.py # coding:utf-8 from scipy import fft from scipy.io import wavfile from scipy.stats import norm from sklearn import linear_model, datasets from sklearn.linear_model import LogisticRegression import matplotlib.pyplot as plt import numpy as np """ 使用logistic regression處理音樂數據,音樂數據訓練樣本的獲得和使用快速傅里葉變換(FFT)預處理的方法需要事先准備好 1. 把訓練集擴大到每類100個首歌,類別仍然是六類:jazz,classical,country, pop, rock, metal 2. 同時使用logistic回歸訓練模型 3. 
引入一些評價的標准來比較Logistic測試集上的表現 """ # 准備音樂數據 def create_fft(g,n): rad="D:/usr/genres/"+g+"/converted/"+g+"."+str(n).zfill(5)+".au.wav" #sample_rate 音頻的采樣率,X代表讀取文件的所有信息 (sample_rate, X) = wavfile.read(rad) #取1000個頻率特征 也就是振幅 fft_features = abs(fft(X)[:1000]) #zfill(5) 字符串不足5位,前面補0 sad="D:/usr/trainset/"+g+"."+str(n).zfill(5)+ ".fft" np.save(sad, fft_features) #-------create fft 構建訓練集-------------- genre_list = ["classical", "jazz", "country", "pop", "rock", "metal","hiphop"] for g in genre_list: for n in range(100): create_fft(g,n) print('running...') print('finished') #========================================================================================= # 加載訓練集數據,分割訓練集以及測試集,進行分類器的訓練 # 構造訓練集!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! #-------read fft-------------- genre_list = ["classical", "jazz", "country", "pop", "rock", "metal","hiphop"] X=[] Y=[] for g in genre_list: for n in range(100): rad="D:/usr/trainset/"+g+"."+str(n).zfill(5)+ ".fft"+".npy" #加載文件 fft_features = np.load(rad) X.append(fft_features) #genre_list.index(g) 返回匹配上類別的索引號 Y.append(genre_list.index(g)) #構建的訓練集 X=np.array(X) #構建的訓練集對應的類別 Y=np.array(Y) # 接下來,我們使用sklearn,來構造和訓練我們的兩種分類器 #------train logistic classifier-------------- model = LogisticRegression() #需要numpy.array類型參數 model.fit(X, Y) print('Starting read wavfile...') #prepare test data------------------- # sample_rate, test = wavfile.read("i:/classical.00007.au.wav") sample_rate, test = wavfile.read("D:/usr/projects/heibao-wudizirong-remix.wav") print(sample_rate,test) testdata_fft_features = abs(fft(test))[:1000] #model.predict(testdata_fft_features) 預測為一個數組,array([類別]) print(testdata_fft_features) # testdata_fft_features = np.array(testdata_fft_features).reshape(1, -1) type_index = model.predict(testdata_fft_features)[0] print(type_index) print(genre_list[type_index])
Classification result: 4, i.e. rock (index 4 in genre_list).
With logistic regression, the model can only do what its training set taught it: whatever characteristics the training data has determine what the trained model is able to do.
Approach
Record the average vehicle speed at each monitoring checkpoint over a time window and use that average speed as the checkpoint's congestion class.
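A minimal sketch of turning an average speed into a congestion class the way the training code below does (label = average speed / 10, capped at 10, so class 0 means nearly stopped and class 10 means free-flowing); the helper name is mine:

def congestion_class(speed_total, car_count):
    # same bucketing as TrainLRwithLBFGS below: average speed / 10, capped at class 10
    avg_speed = float(speed_total) / car_count
    bucket = int(avg_speed) // 10
    return bucket if bucket < 10 else 10

print(congestion_class(178, 5))   # 178/5 = 35.6 km/h -> class 3
print(congestion_class(38, 1))    # 38 km/h -> class 3
print(congestion_class(520, 4))   # 130 km/h -> capped at class 10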
啟動 zookeeper 啟動 node 2,3,4 的zookeeper /opt/sxt/zookeeper-3.4.6/bin/zkServer.sh start 啟動kafka ,創建topic node2,3,4 /root/shells/start-kafka.sh cat start-kafka.sh cd /opt/sxt/kafka_2.10-0.8.2.2 nohup bin/kafka-server-start.sh config/server.properties >kafka.log 2>&1 & ./bin/kafka-topics.sh -zookeeper node2:2181,node3,node4 --create --topic car_events --partitions 3 --replication-factor 3 ./bin/kafka-topics.sh -zookeeper node2:2181,node3,node4 --list ./bin/kafka-console-consumer.sh --zookeeper node2,node3:2181,node4 --topic car_events 啟動 redis redis-server (自行百度確認) 進入redis-cli [root@node1 ~]# redis-cli 127.0.0.1:6379> select 1 OK 127.0.0.1:6379[1]> keys * (empty list or set) 啟動spark集群 node2啟動spark 集群 /opt/sxt/spark-1.6.0/sbin/start-all.sh node3啟動master集群 /opt/sxt/spark-1.6.0/sbin/start-master.sh 原始data data/2014082013_all_column_test.txt '310999003001', '3109990030010220140820141230292','00000000','','2014-08-20 14:09:35','0',255,'SN', 0.00,'4','','310999','310999003001','02','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999003102', '3109990031020220140820141230266','粵BT96V3','','2014-08-20 14:09:35','0',21,'NS', 0.00,'2','','310999','310999003102','02','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999000106', '3109990001060120140820141230316','滬F35253','','2014-08-20 14:09:35','0',57,'OR', 0.00,'2','','310999','310999000106','01','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999000205', '3109990002050220140820141230954','滬FN0708','','2014-08-20 14:09:35','0',33,'IR', 0.00,'2','','310999','310999000205','02','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999000205', '3109990002050120140820141230975','皖N94028','','2014-08-20 14:09:35','0',40,'IR', 0.00,'2','','310999','310999000205','01','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999015305', '3109990153050220140820141230253','滬A09L05','','2014-08-20 14:09:35','0',24,'IR', 0.00,'2','','310999','310999015305','02','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999015305', '3109990153050120140820141230658','蘇FRM638','','2014-08-20 14:09:35','0',16,'IR', 0.00,'2','','310999','310999015305','01','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999003201', '3109990032010420140820141230966','滬FW3438','','2014-08-20 14:09:35','0',24,'SN', 0.00,'2','','310999','310999003201','04','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999003201', '3109990032010220140820141230302','冀F1755Z','','2014-08-20 14:09:35','0',20,'SN', 0.00,'2','','310999','310999003201','02','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' '310999003702', '3109990037020320140820141230645','滬M05016','','2014-08-20 14:09:35','0',10,'NS', 0.00,'2','','310999','310999003702','03','','','2','','','2014-08-20 14:12:30','2014-08-20 14:16:13',0,0,'2014-08-21 18:50:05','','',' ' 生產數據: 運行 package com.ic.traffic.streaming import java.sql.Timestamp import java.util.Properties import kafka.javaapi.producer.Producer import kafka.producer.{KeyedMessage, ProducerConfig} import org.apache.spark.{SparkContext, SparkConf} import org.codehaus.jettison.json.JSONObject import scala.util.Random //向kafka car_events中生產數據 object 
KafkaEventProducer { def main(args: Array[String]): Unit = { val topic = "car_events" val brokers = "node2:9092,node3:9092,node4:9092" val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val kafkaConfig = new ProducerConfig(props) val producer = new Producer[String, String](kafkaConfig) val sparkConf = new SparkConf().setAppName("traffic data").setMaster("local[4]") val sc = new SparkContext(sparkConf) val filePath = "./data/2014082013_all_column_test.txt" val records = sc.textFile(filePath) .filter(!_.startsWith(";")) .map(_.split(",")).collect() for (i <- 1 to 100) { for (record <- records) { // prepare event data val event = new JSONObject() event.put("camera_id", record(0)) .put("car_id", record(2)) .put("event_time", record(4)) .put("speed", record(6)) .put("road_id", record(13)) // produce event message producer.send(new KeyedMessage[String, String](topic,event.toString)) println("Message sent: " + event) Thread.sleep(200) } } sc.stop } } jedis 代碼 RedisClient.scala package com.ic.traffic.streaming import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool object RedisClient extends Serializable { val redisHost = "node1" val redisPort = 6379 val redisTimeout = 30000 /** * JedisPool是一個連接池,既可以保證線程安全,又可以保證了較高的效率。 */ lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) // lazy val hook = new Thread { // override def run = { // println("Execute hook thread: " + this) // pool.destroy() // } // } // sys.addShutdownHook(hook.run) } sparkStreaming代碼 CarEventCountAnalytics.scala package com.ic.traffic.streaming import java.text.SimpleDateFormat import java.util.Calendar import kafka.serializer.StringDecoder import net.sf.json.JSONObject import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.dstream.InputDStream /** * 將每個卡扣的總速度_車輛數 存入redis中 * 【yyyyMMdd_Monitor_id,HHmm,SpeedTotal_CarCount】 */ object CarEventCountAnalytics { def main(args: Array[String]): Unit = { // Create a StreamingContext with the given master URL val conf = new SparkConf().setAppName("CarEventCountAnalytics") if (args.length == 0) { conf.setMaster("local[*]") } val ssc = new StreamingContext(conf, Seconds(5)) // ssc.checkpoint(".") // Kafka configurations val topics = Set("car_events") val brokers = "node2:9092,node3:9092,node4:9092" val kafkaParams = Map[String, String]( "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder") val dbIndex = 1 // Create a direct stream val kafkaStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val events: DStream[JSONObject] = kafkaStream.map(line => { //JSONObject.fromObject 將string 轉換成jsonObject val data = JSONObject.fromObject(line._2) println(data) data }) /** * carSpeed K:monitor_id * V:(speedCount,carCount) */ val carSpeed = events.map(jb => (jb.getString("camera_id"),jb.getInt("speed"))) .mapValues((speed:Int)=>(speed,1)) //(camera_id, (speed, 1) ) => (camera_id , (total_speed , total_count)) .reduceByKeyAndWindow((a:Tuple2[Int,Int], b:Tuple2[Int,Int]) => {(a._1 + b._1, a._2 + b._2)},Seconds(60),Seconds(10)) // .reduceByKeyAndWindow((a:Tuple2[Int,Int], b:Tuple2[Int,Int]) => {(a._1 + b._1, a._2 + b._2)},(a:Tuple2[Int,Int], b:Tuple2[Int,Int]) => {(a._1 - b._1, 
a._2 - b._2)},Seconds(20),Seconds(10)) carSpeed.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val jedis = RedisClient.pool.getResource partitionOfRecords.foreach(pair => { val camera_id = pair._1 val speedTotal = pair._2._1 val CarCount = pair._2._2 val now = Calendar.getInstance().getTime() // create the date/time formatters val minuteFormat = new SimpleDateFormat("HHmm") val dayFormat = new SimpleDateFormat("yyyyMMdd") val time = minuteFormat.format(now) val day = dayFormat.format(now) if(CarCount!=0){ jedis.select(dbIndex) jedis.hset(day + "_" + camera_id, time , speedTotal + "_" + CarCount) } }) RedisClient.pool.returnResource(jedis) }) }) println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") ssc.start() ssc.awaitTermination() } }
接上: 啟動 hdfs yarn TrainLRwithLBFGS.scala 開始訓練 package com.ic.traffic.streaming import java.text.SimpleDateFormat import java.util import java.util.{Date} import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import scala.collection.mutable.ArrayBuffer import scala.Array import scala.collection.mutable.ArrayBuffer import org.apache.spark.mllib.classification.LogisticRegressionModel /** * 訓練模型 */ object TrainLRwithLBFGS { val sparkConf = new SparkConf().setAppName("train traffic model").setMaster("local[*]") val sc = new SparkContext(sparkConf) // create the date/time formatters val dayFormat = new SimpleDateFormat("yyyyMMdd") val minuteFormat = new SimpleDateFormat("HHmm") def main(args: Array[String]) { // fetch data from redis val jedis = RedisClient.pool.getResource jedis.select(1) // find relative road monitors for specified road // val camera_ids = List("310999003001","310999003102","310999000106","310999000205","310999007204") val camera_ids = List("310999003001","310999003102") val camera_relations:Map[String,Array[String]] = Map[String,Array[String]]( "310999003001" -> Array("310999003001","310999003102","310999000106","310999000205","310999007204"), "310999003102" -> Array("310999003001","310999003102","310999000106","310999000205","310999007204") ) val temp = camera_ids.map({ camera_id => val hours = 5 val nowtimelong = System.currentTimeMillis(); val now = new Date(nowtimelong) val day = dayFormat.format(now)//yyyyMMdd val array = camera_relations.get(camera_id).get /** * relations中存儲了每一個卡扣在day這一天每一分鍾的平均速度 */ val relations = array.map({ camera_id => // println(camera_id) // fetch records of one camera for three hours ago val minute_speed_car_map = jedis.hgetAll(day + "_'" + camera_id+"'") (camera_id, minute_speed_car_map) }) // relations.foreach(println) // organize above records per minute to train data set format (MLUtils.loadLibSVMFile) val dataSet = ArrayBuffer[LabeledPoint]() // start begin at index 3 //Range 從300到1 遞減 不包含0 for(i <- Range(60*hours,0,-1)){ val features = ArrayBuffer[Double]() val labels = ArrayBuffer[Double]() // get current minute and recent two minutes for(index <- 0 to 2){ //當前時刻過去的時間那一分鍾 val tempOne = nowtimelong - 60 * 1000 * (i-index) val d = new Date(tempOne) val tempMinute = minuteFormat.format(d)//HHmm //下一分鍾 val tempNext = tempOne - 60 * 1000 * (-1) val dNext = new Date(tempNext) val tempMinuteNext = minuteFormat.format(dNext)//HHmm for((k,v) <- relations){ val map = v //map -- k:HHmm v:Speed if(index == 2 && k == camera_id){ if (map.containsKey(tempMinuteNext)) { val info = map.get(tempMinuteNext).split("_") val f = info(0).toFloat / info(1).toFloat labels += f } } if (map.containsKey(tempMinute)){ val info = map.get(tempMinute).split("_") val f = info(0).toFloat / info(1).toFloat features += f } else{ features += -1.0 } } } if(labels.toArray.length == 1 ){ //array.head 返回數組第一個元素 val label = (labels.toArray).head val record = LabeledPoint(if ((label.toInt/10)<10) (label.toInt/10) else 10.0, Vectors.dense(features.toArray)) dataSet += record } } // dataSet.foreach(println) // println(dataSet.length) val data = sc.parallelize(dataSet) // Split data into training (80%) and test (20%). 
//將data這個RDD隨機分成 8:2兩個RDD val splits = data.randomSplit(Array(0.8, 0.2)) //構建訓練集 val training = splits(0) /** * 測試集的重要性: * 測試模型的准確度,防止模型出現過擬合的問題 */ val test = splits(1) if(!data.isEmpty()){ // 訓練邏輯回歸模型 val model = new LogisticRegressionWithLBFGS() .setNumClasses(11) .setIntercept(true) .run(training) // 測試集測試模型 val predictionAndLabels = test.map { case LabeledPoint(label, features) => val prediction = model.predict(features) (prediction, label) } predictionAndLabels.foreach(x=> println("預測類別:"+x._1+",真實類別:"+x._2)) // Get evaluation metrics. 得到評價指標 val metrics: MulticlassMetrics = new MulticlassMetrics(predictionAndLabels) val precision = metrics.precision// 准確率 println("Precision = " + precision) if(precision > 0.8){ val path = "hdfs://node1:8020/model/model_"+camera_id+"_"+nowtimelong // val path = "hdfs://node1:9000/model/model_"+camera_id+"_"+nowtimelong model.save(sc, path) println("saved model to "+ path) jedis.hset("model", camera_id , path) } } }) RedisClient.pool.returnResource(jedis) } } 預測 PredictLRwithLBFGS.scala 修改當前時間符合運行的時間段,在到redis 中查看效果如何。 package com.ic.traffic.streaming import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.mllib.classification.{ LogisticRegressionModel, LogisticRegressionWithLBFGS } import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.{ SparkConf, SparkContext } import scala.collection.mutable.ArrayBuffer object PredictLRwithLBFGS { val sparkConf = new SparkConf().setAppName("predict traffic").setMaster("local[4]") val sc = new SparkContext(sparkConf) // create the date/time formatters val dayFormat = new SimpleDateFormat("yyyyMMdd") val minuteFormat = new SimpleDateFormat("HHmm") val sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss") def main(args: Array[String]) { val input = "2019-10-05_01:35:00" val date = sdf.parse(input) val inputTimeLong = date.getTime() // val inputTime = new Date(inputTimeLong) val day = dayFormat.format(date)//yyyyMMdd // fetch data from redis val jedis = RedisClient.pool.getResource jedis.select(1) // find relative road monitors for specified road // val camera_ids = List("310999003001","310999003102","310999000106","310999000205","310999007204") val camera_ids = List("310999003001", "310999003102") val camera_relations: Map[String, Array[String]] = Map[String, Array[String]]( "310999003001" -> Array("310999003001", "310999003102", "310999000106", "310999000205", "310999007204"), "310999003102" -> Array("310999003001", "310999003102", "310999000106", "310999000205", "310999007204")) val temp = camera_ids.map({ camera_id => val list = camera_relations.get(camera_id).get val relations = list.map({ camera_id => // fetch records of one camera for three hours ago (camera_id, jedis.hgetAll(day + "_'" + camera_id + "'")) }) // relations.foreach(println) // organize above records per minute to train data set format (MLUtils.loadLibSVMFile) val aaa = ArrayBuffer[Double]() // get current minute and recent two minutes for (index <- 3 to (1,-1)) { //拿到過去 一分鍾,兩分鍾,過去三分鍾的時間戳 val tempOne = inputTimeLong - 60 * 1000 * index val currentOneTime = new Date(tempOne) //獲取輸入時間的 "HHmm" val tempMinute = minuteFormat.format(currentOneTime) println("inputtime ====="+currentOneTime) for ((k, v) <- relations) { // k->camera_id ; v->speed val map = v if (map.containsKey(tempMinute)) { val info = map.get(tempMinute).split("_") val f = info(0).toFloat / info(1).toFloat aaa += f } else { aaa += -1.0 } } } // 
Run training algorithm to build the model val path = jedis.hget("model", camera_id) if(path!=null){ val model = LogisticRegressionModel.load(sc, path) // Compute raw scores on the test set. val prediction = model.predict(Vectors.dense(aaa.toArray)) println(input + "\t" + camera_id + "\t" + prediction + "\t") // jedis.hset(input, camera_id, prediction.toString) } }) RedisClient.pool.returnResource(jedis) } }
127.0.0.1:6379[1]> hgetall "20191005_'310999019905'" 1) "0103" 2) "38_1" 3) "0104" 4) "179_5" 5) "0105" 6) "39_1" 7) "0107" 8) "178_5" 9) "0108" 10) "39_1"
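Each hash field above is a minute (HHmm) and each value is "totalSpeed_carCount". A minimal sketch of turning one of those entries back into an average speed and a congestion class (same bucketing as the sketch earlier; the function name is mine):

def parse_minute_value(value):
    # value looks like "178_5": total speed 178 over 5 cars in that minute
    speed_total, car_count = value.split("_")
    avg_speed = float(speed_total) / int(car_count)
    bucket = int(avg_speed) // 10
    return avg_speed, (bucket if bucket < 10 else 10)

print(parse_minute_value("178_5"))   # (35.6, 3)
print(parse_minute_value("39_1"))    # (39.0, 3)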
Logistic regression: a deeper look and tuning
分別對應如上的 1,2,3 4 5 健康狀況訓練集.txt 1 1:57 2:0 3:0 4:5 5:3 6:5 1 1:56 2:1 3:0 4:3 5:4 6:3 1 1:27 2:0 3:0 4:4 5:3 6:4 1 1:46 2:0 3:0 4:3 5:2 6:4 1 1:75 2:1 3:0 4:3 5:3 6:2 1 1:19 2:1 3:0 4:4 5:4 6:4 1 1:49 2:0 3:0 4:4 5:3 6:3 1 1:25 2:1 3:0 4:3 5:5 6:4 1 1:47 2:1 3:0 4:3 5:4 6:3 1 1:59 2:0 3:1 4:0 5:1 6:2 1 1:18 2:0 3:0 4:4 5:3 6:3 1 1:79 2:0 3:0 4:5 5:4 6:3 LogisticRegression1.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} /** * 邏輯回歸 健康狀況訓練集 */ object LogisticRegression { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) //加載 LIBSVM 格式的數據 這種格式特征前綴要從1開始 val inputData = MLUtils.loadLibSVMFile(sc, "健康狀況訓練集.txt") val splits = inputData.randomSplit(Array(0.7, 0.3), seed = 1L) val (trainingData, testData) = (splits(0), splits(1)) val lr = new LogisticRegressionWithLBFGS() // lr.setIntercept(true) val model = lr.run(trainingData) val result = testData .map{point=>Math.abs(point.label-model.predict(point.features)) } println("正確率="+(1.0-result.mean())) /** *邏輯回歸算法訓練出來的模型,模型中的參數個數(w0....w6)=訓練集中特征數(6)+1 */ println(model.weights.toArray.mkString(" ")) println(model.intercept) sc.stop() } } w0測試數據.txt 0 1:1.0140641394573489 2:1.0053491794300906 1 1:2.012709390641638 2:2.001907117215239 0 1:1.0052568352996578 2:1.0162894218780352 1 1:2.0140249849545118 2:2.0042119386532122 0 1:1.0159829400919032 2:1.0194470820311243 1 1:2.007369501382139 2:2.0071524676923533 0 1:1.0013307693392184 2:1.0158450335581597 1 1:2.01517182545874 2:2.0052873772719177 0 1:1.0130231961501968 2:1.019126883631059 1 1:2.014080456651037 2:2.004348828637212 0 1:1.0094645373208078 2:1.0092571241891017 LogisticRegression2.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 有無截距 */ object LogisticRegression2 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) val inputData: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "w0測試數據.txt") /** * randomSplit(Array(0.7, 0.3))方法就是將一個RDD拆分成N個RDD,N = Array.length * 第一個RDD中的數據量和數組中的第一個元素值相關 */ val splits = inputData.randomSplit(Array(0.7, 0.3),11L) val (trainingData, testData) = (splits(0), splits(1)) val lr = new LogisticRegressionWithSGD // 設置要有W0,也就是有截距 lr.setIntercept(true) val model=lr.run(trainingData) val result=testData.map{labeledpoint=>Math.abs(labeledpoint.label-model.predict(labeledpoint.features)) } println("正確率="+(1.0-result.mean())) println(model.weights.toArray.mkString(" ")) println(model.intercept) } } 線性不可分數據集.txt 0 1:1.0021476396439248 2:1.0005277544365077 0 1:0.004780438916016197 2:0.004464089083318912 1 1:1.005957371386034 2:0.009488506452877079 1 1:0.0032888762213735202 2:1.0096142970365218 0 1:1.004487425006835 2:1.0108859204789946 0 1:0.016129088455466407 2:0.013415124039032063 1 1:1.0183108247074553 2:0.014888578069677983 1 1:0.005267064113457103 2:1.0149789230465331 0 1:1.0079616977465946 2:1.0135833360338558 0 1:0.011391932589615935 2:0.015552261205467644 LogisticRegression3.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD} import 
org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} /** * 線性不可分 ----升高維度 */ object LogisticRegression3 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) // 解決線性不可分我們來升維,升維有代價,計算復雜度變大了 val inputData = MLUtils.loadLibSVMFile(sc, "線性不可分數據集.txt") .map { labelpoint => val label = labelpoint.label val feature = labelpoint.features //新維度的值,必須基於已有的維度值的基礎上,經過一系列的數學變換得來 val array = Array(feature(0), feature(1), feature(0) * feature(1)) val convertFeature = Vectors.dense(array) new LabeledPoint(label, convertFeature) } val splits = inputData.randomSplit(Array(0.7, 0.3),11L) val (trainingData, testData) = (splits(0), splits(1)) val lr = new LogisticRegressionWithLBFGS() lr.setIntercept(true) val model = lr.run(trainingData) val result = testData .map { point => Math.abs(point.label - model.predict(point.features)) } println("正確率=" + (1.0 - result.mean())) println(model.weights.toArray.mkString(" ")) println(model.intercept) } } 健康狀況訓練集.txt 1 1:57 2:0 3:0 4:5 5:3 6:5 1 1:56 2:1 3:0 4:3 5:4 6:3 1 1:27 2:0 3:0 4:4 5:3 6:4 1 1:46 2:0 3:0 4:3 5:2 6:4 1 1:75 2:1 3:0 4:3 5:3 6:2 1 1:19 2:1 3:0 4:4 5:4 6:4 1 1:49 2:0 3:0 4:4 5:3 6:3 1 1:25 2:1 3:0 4:3 5:5 6:4 1 1:47 2:1 3:0 4:3 5:4 6:3 1 1:59 2:0 3:1 4:0 5:1 6:2 LogisticRegression4.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} /** * 設置分類閾值 */ object LogisticRegression4 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) /** * LabeledPoint = Vector+Y */ val inputData = MLUtils.loadLibSVMFile(sc, "健康狀況訓練集.txt") val splits = inputData.randomSplit(Array(0.7, 0.3),11L) val (trainingData, testData) = (splits(0), splits(1)) val lr = new LogisticRegressionWithLBFGS() lr.setIntercept(true) // val model = lr.run(trainingData) // val result = testData // .map{point=>Math.abs(point.label-model.predict(point.features)) } // println("正確率="+(1.0-result.mean())) // println(model.weights.toArray.mkString(" ")) // println(model.intercept) /** * 如果在訓練模型的時候沒有調用clearThreshold這個方法,那么這個模型預測出來的結果都是分類號 * 如果在訓練模型的時候調用clearThreshold這個方法,那么這個模型預測出來的結果是一個概率 */ val model = lr.run(trainingData).clearThreshold() val errorRate = testData.map{p=> //score就是一個概率值 val score = model.predict(p.features) // 癌症病人寧願判斷出得癌症也別錯過一個得癌症的病人 val result = score>0.3 match {case true => 1 ; case false => 0} Math.abs(result-p.label) }.mean() println(1-errorRate) } } LogisticRegression5.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD} import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} /** * 魯棒性調優 * 提高模型抗干擾能力 */ object LogisticRegression5 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) val inputData = MLUtils.loadLibSVMFile(sc, "健康狀況訓練集.txt") val splits = inputData.randomSplit(Array(0.7, 0.3),100) val (trainingData, testData) = (splits(0), splits(1)) /** * LogisticRegressionWithSGD 既有L1 又有L2正則化(默認) */ val lr = new LogisticRegressionWithSGD() lr.setIntercept(true) // 
lr.optimizer.setUpdater(new L1Updater()) lr.optimizer.setUpdater(new SquaredL2Updater) /** * LogisticRegressionWithLBFGS 既有L1 又有L2正則化(默認) */ // val lr = new LogisticRegressionWithLBFGS() // lr.setIntercept(true) // lr.optimizer.setUpdater(new L1Updater) // lr.optimizer.setUpdater(new SquaredL2Updater) /** * 這塊設置的是我們的lambda,越大越看重這個模型的推廣能力,一般不會超過1,0.4是個比較好的值 */ lr.optimizer.setRegParam(0.4) val model = lr.run(trainingData) val result=testData .map{point=>Math.abs(point.label-model.predict(point.features)) } println("正確率="+(1.0-result.mean())) println(model.weights.toArray.mkString(" ")) println(model.intercept) } }
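Going back to the threshold idea in LogisticRegression4 above (clearThreshold() makes predict return a probability, and the caller then picks its own cut-off, e.g. 0.3 so that fewer cancer cases are missed): a minimal plain-Python sketch of how lowering the threshold trades missed positives for false alarms. The scores are made up for illustration:

# (probability predicted by the model, true label) -- made-up scores
scored = [(0.92, 1), (0.61, 1), (0.35, 1), (0.28, 1), (0.45, 0), (0.18, 0), (0.08, 0)]

def evaluate(threshold):
    missed      = sum(1 for p, y in scored if y == 1 and p <= threshold)   # positives we missed
    false_alarm = sum(1 for p, y in scored if y == 0 and p > threshold)    # negatives we flagged
    return missed, false_alarm

print(evaluate(0.5))   # (2, 0): the stricter threshold misses more positive cases
print(evaluate(0.3))   # (1, 1): the lower threshold catches more positives at the cost of a false alarm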
環境分類數據.txt 0 1:49 2:52320 1 1:17 2:17868 0 1:36 2:54418 1 1:13 2:19701 0 1:30 2:97516 1 1:15 2:17075 0 1:37 2:77589 1 1:10 2:14078 0 1:53 2:65912 1 1:17 2:16562 0 1:50 2:76091 LogisticRegression6.scala package com.bjsxt.lr import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionWithSGD} import org.apache.spark.mllib.feature.StandardScaler import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.sql.SQLContext /** * 方差歸一化 */ object LogisticRegression6 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark").setMaster("local[3]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * scalerModel 這個對象中已經有每一列的均值和方差 * withStd:代表的是方差歸一化 * withMean:代表的是均值歸一化 * scalerModel:存放每一列的方差值 * * withMean默認為false, withStd默認為true * 當withMean=true,withStd=false時,向量中的各元素均減去它相應的均值。 * 當withMean=true,withStd=true時,各元素在減去相應的均值之后,還要除以它們相應的標准差。 * */ val inputData = MLUtils.loadLibSVMFile(sc, "環境分類數據.txt") val vectors = inputData.map(_.features) val scalerModel = new StandardScaler(withMean=true, withStd=true).fit(vectors) val normalizeInputData = inputData.map{point => val label = point.label //對每一條數據進行了歸一化 val features = scalerModel.transform(point.features.toDense) println(features) new LabeledPoint(label,features) } val splits = normalizeInputData.randomSplit(Array(0.7, 0.3),100) val (trainingData, testData) = (splits(0), splits(1)) val lr=new LogisticRegressionWithLBFGS() // val lr = new LogisticRegressionWithSGD() lr.setIntercept(true) val model = lr.run(trainingData) val result=testData.map{point=>Math.abs(point.label-model.predict(point.features)) } println("正確率="+(1.0-result.mean())) println(model.weights.toArray.mkString(" ")) println(model.intercept) } } LogisticRegression7.scala package com.bjsxt.lr import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.sql.SQLContext import org.apache.spark.mllib.linalg.DenseVector import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS /** * 最大最小值歸一化 */ object LogisticRegression7 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("spark").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * 加載生成的DataFrame自動有兩列:label features */ val df = sqlContext.read.format("libsvm").load("環境分類數據.txt") // df.show() /** * MinMaxScaler fit需要DataFrame類型數據 * setInputCol:設置輸入的特征名 * setOutputCol:設置歸一化后輸出的特征名 * */ val minMaxScalerModel = new MinMaxScaler() .setInputCol("features") .setOutputCol("scaledFeatures") .fit(df) /** * 將所有數據歸一化 */ val features = minMaxScalerModel.transform(df) features.show() val normalizeInputData = features.rdd.map(row=>{ val label = row.getAs("label").toString().toDouble val dense = (row.getAs("scaledFeatures")).asInstanceOf[DenseVector] new LabeledPoint(label,dense) }) val splits = normalizeInputData.randomSplit(Array(0.7, 0.3),11L) val (trainingData, testData) = (splits(0), splits(1)) val lr=new LogisticRegressionWithLBFGS() lr.setIntercept(true) val model = lr.run(trainingData) val result=testData.map{point=>Math.abs(point.label-model.predict(point.features)) } println("正確率="+(1.0-result.mean())) println(model.weights.toArray.mkString(" ")) 
println(model.intercept) } }
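LogisticRegression6 and LogisticRegression7 above use variance (z-score) normalization and min-max normalization because the two features in 環境分類數據.txt live on very different scales (a small age-like value versus a value in the tens of thousands). A minimal NumPy sketch of both normalizations on a few rows taken from that file:

import numpy as np

# two features per row, from 環境分類數據.txt
X = np.array([[49.0, 52320.0],
              [17.0, 17868.0],
              [36.0, 54418.0],
              [13.0, 19701.0]])

# z-score normalization: the idea behind StandardScaler(withMean=true, withStd=true)
z = (X - X.mean(axis=0)) / X.std(axis=0)

# min-max normalization to [0, 1]: the idea behind MinMaxScaler
mm = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))

print(z)
print(mm)   # after either transform, the two columns are on comparable scales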
案例:郵件分類預測:是否垃圾郵件 sms_spam.txt type,text ham,00 008704050406 008704050406 008704050406 008704050406 00 00 00 00 00 00 Hope Hope Hope you are having a good week. Just checking in ham,K..give back my thanks. ham,Am also doing in cbe only. But have to pay. spam,"complimentary 4 STAR Ibiza Holiday or £10,000 cash needs your URGENT collection. 09066364349 NOW from Landline not to lose out! Box434SK38WP150PPM18+" spam,okmail: Dear Dave this is your final notice to collect your 4* Tenerife Holiday or #5000 CASH award! Call 09061743806 from landline. TCs SAE Box326 CW25WX 150ppm ham,Aiya we discuss later lar... Pick u up at 4 is it? ham,Are you this much buzy ham,Please ask mummy to call father spam,Marvel Mobile Play the official Ultimate Spider-man game (£4.50) on ur mobile right now. Text SPIDER to 83338 for the game & we ll send u a FREE 8Ball wallpaper ham,"fyi I'm at usf now, swing by the room whenever" ham,"Sure thing big man. i have hockey elections at 6, shouldn€˜t go on longer than an hour though" ham,I anything lor... ham,"By march ending, i should be ready. But will call you for sure. The problem is that my capital never complete. How far with you. How's work and the ladies" ham,"Hmm well, night night " ham,K I'll be sure to get up before noon and see what's what ham,Ha ha cool cool chikku chikku:-):-DB-) bayes.py # coding:utf-8 import os import sys #codecs 編碼轉換模塊 import codecs # 講訓練樣本中的中文文章分詞並存入文本文件中 # if __name__ == '__main__': # corpus = [] # f = codecs.open("D:/workspaceR/news_spam.csv", "r", "utf-8") # f1 = codecs.open("D:/workspaceR/news_spam_jieba.csv", "w", "utf-8") # count = 0 # while True: # line = f.readline() # if line: # count = count + 1 # line = line.split(",") # s = line[1] # words=pseg.cut(s) # temp = [] # for key in words: # temp.append(key.word) # sentence = " ".join(temp) # print line[0],',',sentence # corpus.append(sentence) # f1.write(line[0]) # f1.write(',') # f1.write(sentence) # f1.write('\n') # else: # break # f.close() # f1.close() ###################################################### #Multinomial Naive Bayes Classifier print '*************************\nNaive Bayes\n*************************' from sklearn.naive_bayes import MultinomialNB from sklearn.feature_extraction.text import CountVectorizer if __name__ == '__main__': # 讀取文本構建語料庫 corpus = [] labels = [] corpus_test = [] labels_test = [] f = codecs.open("./sms_spam.txt", "rb") count = 0 while True: #readline() 方法用於從文件讀取整行,包括 "\n" 字符。 line = f.readline() #讀取第一行,第一行數據是列頭,不統計 if count == 0: count = count + 1 continue if line: count = count + 1 line = line.split(",") label = line[0] sentence = line[1] corpus.append(sentence) if "ham"==label: labels.append(0) elif "spam"==label: labels.append(1) if count > 5550: corpus_test.append(sentence) if "ham"==label: labels_test.append(0) elif "spam"==label: labels_test.append(1) else: break # 文本特征提取: # 將文本數據轉化成特征向量的過程 # 比較常用的文本特征表示法為詞袋法 # # 詞袋法: # 不考慮詞語出現的順序,每個出現過的詞匯單獨作為一列特征 # 這些不重復的特征詞匯集合為詞表 # 每一個文本都可以在很長的詞表上統計出一個很多列的特征向量 #CountVectorizer是將文本向量轉換成稀疏表示數值向量(字符頻率向量) vectorizer 將文檔詞塊化,只考慮詞匯在文本中出現的頻率 #詞袋 vectorizer=CountVectorizer() #每行的詞向量,fea_train是一個矩陣 fea_train = vectorizer.fit_transform(corpus) print "vectorizer.get_feature_names is ",vectorizer.get_feature_names() print "fea_train is ",fea_train.toarray() #vocabulary=vectorizer.vocabulary_ 只計算上面vectorizer中單詞的tf(term frequency 詞頻) vectorizer2=CountVectorizer(vocabulary=vectorizer.vocabulary_) fea_test = vectorizer2.fit_transform(corpus_test) # print vectorizer2.get_feature_names() # print fea_test.toarray() #create 
the Multinomial Naive Bayesian Classifier #alpha = 1 拉普拉斯估計給每個單詞個數加1 clf = MultinomialNB(alpha = 1) clf.fit(fea_train,labels) pred = clf.predict(fea_test); for p in pred: if p == 0: print "正常郵件" else: print "垃圾郵件" scala 代碼 sample_naive_bayes_data.txt ## 相當於上邊python代碼的sentence 編碼化。 label word0出現次 wor1 0次, word2 0次 0,1 0 0 0,2 0 0 0,3 0 0 0,4 0 0 1,0 1 0 1,0 2 0 1,0 3 0 1,0 4 0 2,0 0 1 2,0 0 2 2,0 0 3 2,0 0 4 Naive_bayes.scala package com.bjsxt.bayes import org.apache.spark.{ SparkConf, SparkContext } import org.apache.spark.mllib.classification.{ NaiveBayes, NaiveBayesModel } import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint object Naive_bayes { def main(args: Array[String]) { //1 構建Spark對象 val conf = new SparkConf().setAppName("Naive_bayes").setMaster("local") val sc = new SparkContext(conf) //讀取樣本數據1 val data = sc.textFile("./sample_naive_bayes_data.txt") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } //樣本數據划分訓練樣本與測試樣本 val splits = parsedData.randomSplit(Array(0.5, 0.5), seed = 11L) val training = splits(0) val test = splits(1) //新建貝葉斯分類模型模型,並訓練 ,lambda 拉普拉斯估計 val model = NaiveBayes.train(training, lambda = 1.0) //對測試樣本進行測試 val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val print_predict = predictionAndLabel.take(100) println("prediction" + "\t" + "label") for (i <- 0 to print_predict.length - 1) { println(print_predict(i)._1 + "\t" + print_predict(i)._2) } val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count() println(accuracy) val result = model.predict(Vectors.dense(Array[Double](80,0,0))) println("result = "+result) //保存模型 // val ModelPath = "./naive_bayes_model" // model.save(sc, ModelPath) // val sameModel = NaiveBayesModel.load(sc, ModelPath) } }
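Both the sklearn classifier (alpha = 1) and the MLlib one (lambda = 1.0) above apply Laplace smoothing: add 1 to every word count so that a word never seen in a class does not force the whole product of probabilities to zero. A tiny sketch of the smoothed estimate, with made-up counts:

def smoothed_prob(word_count_in_class, total_words_in_class, vocabulary_size, alpha=1.0):
    # P(word | class) with Laplace (add-alpha) smoothing
    return (word_count_in_class + alpha) / (total_words_in_class + alpha * vocabulary_size)

# a word that never appears in the "ham" class still gets a small non-zero probability
print(smoothed_prob(0, 500, 1000))    # 1 / 1500, about 0.000667
print(smoothed_prob(12, 500, 1000))   # 13 / 1500, about 0.008667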
Example: datingTestSet2.txt
40920	8.326976	0.953952	3
14488	7.153469	1.673904	2
26052	1.441871	0.805124	1
75136	13.147394	0.428964	1
38344	1.669788	0.134296	1
72993	10.141740	1.032955	1
35948	6.830792	1.213192	3
KNNDateOnHand.py
#coding:utf-8
import numpy as np
import operator
# matplotlib plotting module
import matplotlib.pyplot as plt
# from array import array
# from matplotlib.font_manager import FontProperties

# normData: one row of the test set; dataSet: training set; labels: training labels; k: the value of k
def classify(normData, dataSet, labels, k):
    # number of rows
    dataSetSize = dataSet.shape[0]
    # print ('dataSetSize = %d' % dataSetSize)
    # coordinate differences from the current point to every training point;
    # np.tile(x, (y, 1)) repeats x y times along rows and once along columns
    diffMat = np.tile(normData, (dataSetSize, 1)) - dataSet
    # square each coordinate difference
    sqDiffMat = diffMat ** 2
    # for a 2-D array, sqDiffMat.sum(axis=0) sums each column, sqDiffMat.sum(axis=1) sums each row
    # and returns an array with one entry per row, e.g.:
    #   narr = array([[1., 4., 6.],
    #                 [2., 5., 3.]])
    #   narr.sum(axis=1) = array([11., 10.])
    #   narr.sum(axis=0) = array([ 3.,  9.,  9.])
    sqDistances = sqDiffMat.sum(axis=1)
    # Euclidean distance: take the square root at the end
    distance = sqDistances ** 0.5
    # x.argsort() sorts the elements of x ascending and returns the corresponding indices,
    # e.g. tsum = array([11., 10.]) ---- tsum.argsort() = array([1, 0])
    sortedDistIndicies = distance.argsort()
    # classCount  K: label  V: how many of the k nearest neighbours carry that label
    classCount = {}
    for i in range(k):
        # label of the i-th nearest neighbour
        voteLabel = labels[sortedDistIndicies[i]]
        # count occurrences of each label
        classCount[voteLabel] = classCount.get(voteLabel, 0) + 1
    # sorted returns a new list
    # sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
    sortedClassCount = sorted(classCount.items(), key=lambda x: x[1], reverse=True)
    return sortedClassCount[0][0]

def file2matrix(filename):
    fr = open(filename)
    # readlines loads the whole file into memory at once (as a list)
    arrayOflines = fr.readlines()
    numOfLines = len(arrayOflines)
    # print "numOfLines = ", numOfLines
    # numpy.zeros creates an array of the given shape: numOfLines rows, 3 columns
    returnMat = np.zeros((numOfLines, 3))
    # list holding the labels
    classLabelVector = []
    index = 0
    for line in arrayOflines:
        # strip leading/trailing whitespace
        line = line.strip()
        listFromline = line.split('\t')
        returnMat[index, :] = listFromline[0:3]
        classLabelVector.append(int(listFromline[-1]))
        index += 1
    return returnMat, classLabelVector

'''
Normalize the training data.
Why normalize: the values in the "flight miles per year" column are far larger than those of the
other two features, so that one dimension would dominate the distance between two points and drown
out the other dimensions. In this example the three features are considered equally important,
so min-max normalization is applied below.
'''
# min-max normalization
def autoNorm(dataSet):
    # dataSet.min(0) returns the minimum of each column as a 1*3 array, e.g.:
    #   numpyarray = array([[1, 4, 6],
    #                       [2, 5, 3]])
    #   numpyarray.min(0) = array([1, 4, 3])   numpyarray.min(1) = array([1, 2])
    #   numpyarray.max(0) = array([2, 5, 6])   numpyarray.max(1) = array([6, 5])
    minVals = dataSet.min(0)
    maxVals = dataSet.max(0)
    ranges = maxVals - minVals
    # dataSet.shape[0] is the row count, shape[1] the column count
    m = dataSet.shape[0]
    # print 'rows = %d' % (m)
    # print maxVals
    # normDataSet holds the normalized data
    # normDataSet = np.zeros(np.shape(dataSet))
    # np.tile(minVals, (m, 1)) repeats minVals m times along rows and once along columns
    normDataSet = dataSet - np.tile(minVals, (m, 1))
    normDataSet = normDataSet / np.tile(ranges, (m, 1))
    return normDataSet, ranges, minVals

def datingClassTest():
    hoRatio = 0.1
    datingDataMat, datingLabels = file2matrix('./datingTestSet2.txt')
    # normalize the data
    normMat, ranges, minVals = autoNorm(datingDataMat)
    # m is the number of rows in normMat (1000)
    m = normMat.shape[0]
    # print 'm = %d rows' % m
    # hold out the first 100 rows for testing
    numTestVecs = int(m * hoRatio)
    errorCount = 0.0
    for i in range(numTestVecs):
        # normMat[i, :] is the i-th row; normMat[numTestVecs:m, :] (rows 100 to 999) is the training
        # set; datingLabels[numTestVecs:m] are the corresponding labels; 4 is k
        classifierResult = classify(normMat[i, :], normMat[numTestVecs:m, :], datingLabels[numTestVecs:m], 4)
        print('predicted: %d , actual: %d' % (classifierResult, datingLabels[i]))
        if (classifierResult != datingLabels[i]):
            errorCount += 1.0
    errorRate = errorCount / float(numTestVecs)
    print 'accuracy : %f' % (1 - errorRate)
    return 1 - errorRate

'''
Scatter plot of two features per sample: flight miles per year and the percentage
of time spent playing video games.
'''
def createScatterDiagram():
    datingDataMat, datingLabels = file2matrix('datingTestSet2.txt')
    type1_x = []
    type1_y = []
    type2_x = []
    type2_y = []
    type3_x = []
    type3_y = []
    # create a new figure
    fig = plt.figure()
    # in matplotlib one Figure can contain several subplots (Axes), created quickly with subplot()
    # subplot(numRows, numCols, plotNum): the plotting area is split into numRows rows and numCols
    # columns, numbered left to right and top to bottom; the upper-left cell is number 1
    # plt.subplot(111) is equivalent to plt.subplot(1, 1, 1)
    axes = plt.subplot(111)
    # use the SimHei font (only needed when labels contain Chinese characters)
    plt.rcParams['font.sans-serif'] = ['SimHei']
    for i in range(len(datingLabels)):
        if datingLabels[i] == 1:
            # dislike
            type1_x.append(datingDataMat[i][0])
            type1_y.append(datingDataMat[i][1])
        if datingLabels[i] == 2:
            # somewhat attractive
            type2_x.append(datingDataMat[i][0])
            type2_y.append(datingDataMat[i][1])
        if datingLabels[i] == 3:
            # very attractive
            type3_x.append(datingDataMat[i][0])
            type3_y.append(datingDataMat[i][1])
    # scatter plots: the first two arguments are equal-length sequences, s is the point size, c the colour
    type1 = axes.scatter(type1_x, type1_y, s=20, c='red')
    type2 = axes.scatter(type2_x, type2_y, s=40, c='green')
    type3 = axes.scatter(type3_x, type3_y, s=50, c='blue')
    plt.title(u'title')
    plt.xlabel(u'flight miles per year')
    plt.ylabel(u'percentage of time playing video games')
    # loc sets the legend position; 2 means upper left
    axes.legend((type1, type2, type3), (u'dislike', u'somewhat attractive', u'very attractive'), loc=2)
    # plt.scatter(datingDataMat[:, 0], datingDataMat[:, 1], c=datingLabels)
    plt.show()

def classifyperson():
    resultList = ['no interest', 'looks OK', 'very attractive']
    input_man = [30000, 3, 0.1]
    # input_man = [13963, 0.000000, 1.437030]
    datingDataMat, datingLabels = file2matrix('datingTestSet2.txt')
    normMat, ranges, minVals = autoNorm(datingDataMat)
    result = classify((input_man - minVals) / ranges, normMat, datingLabels, 3)
    print ('the person you are about to date is: %s' % resultList[result - 1])

if __name__ == '__main__':
    # createScatterDiagram() visualizes how the data is distributed
    # createScatterDiagram()
    acc = datingClassTest()
    if (acc > 0.9):
        classifyperson()
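As a quick sanity check of classify() and autoNorm() above, here is a minimal usage sketch on a tiny made-up dataset (the four points and their labels are invented for illustration); it shows the majority vote among the k nearest neighbours.

# coding:utf-8
# Toy check of classify()/autoNorm() from KNNDateOnHand.py; the data below is invented.
import numpy as np
from KNNDateOnHand import classify, autoNorm

if __name__ == '__main__':
    dataSet = np.array([[1.0, 1.1],
                        [1.0, 1.0],
                        [0.0, 0.0],
                        [0.1, 0.2]])
    labels = [1, 1, 2, 2]
    normData, ranges, minVals = autoNorm(dataSet)
    # normalize the query point with the same ranges before classifying
    query = (np.array([0.2, 0.1]) - minVals) / ranges
    print(classify(query, normData, labels, 3))   # expected: 2 (two of its three neighbours are class 2)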
## Instead of the hand-written implementation above, call the library.
KNNDateByScikit-learn.py
#coding:utf-8
from sklearn.neighbors import NearestNeighbors
import numpy as np
import operator
from KNNDateOnHand import *

if __name__ == '__main__':
    datingDataMat, datingLabels = file2matrix('datingTestSet2.txt')
    normMat, ranges, minVals = autoNorm(datingDataMat)
    # n_neighbors=3 is the number of neighbours to look up (the default is 5)
    # fit: fit the model with normMat as the training set
    # NearestNeighbors uses the Euclidean metric by default
    nbrs = NearestNeighbors(n_neighbors=3).fit(normMat)
    input_man = [9289, 9.666576, 1.370330]
    # normalize the query point the same way as the training data
    S = (input_man - minVals) / ranges
    # find the k nearest points to the query, i.e. the 3 closest training points
    # indices: the row indices of the nearest points in the training set
    # distances: the distances to those nearest points
    # kneighbors expects a 2-D array: one row per query point
    distances, indices = nbrs.kneighbors([S])
    print distances
    print indices
    # classCount  K: label  V: number of neighbours with that label
    classCount = {}
    for i in range(3):
        # label of the neighbour at that index
        voteLabel = datingLabels[indices[0][i]]
        classCount[voteLabel] = classCount.get(voteLabel, 0) + 1
    sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
    resultList = ['no interest', 'looks OK', 'very attractive']
    print resultList[sortedClassCount[0][0] - 1]
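The script above uses NearestNeighbors and then counts the votes by hand; scikit-learn also provides KNeighborsClassifier, which wraps both steps. A minimal sketch, reusing file2matrix/autoNorm from KNNDateOnHand.py (so the same assumptions about datingTestSet2.txt apply):

# coding:utf-8
# Same prediction as above, but letting scikit-learn do the neighbour voting.
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from KNNDateOnHand import file2matrix, autoNorm

if __name__ == '__main__':
    datingDataMat, datingLabels = file2matrix('datingTestSet2.txt')
    normMat, ranges, minVals = autoNorm(datingDataMat)
    clf = KNeighborsClassifier(n_neighbors=3)      # Euclidean metric by default
    clf.fit(normMat, datingLabels)
    input_man = np.array([9289, 9.666576, 1.370330])
    S = (input_man - minVals) / ranges             # normalize the query the same way
    label = clf.predict([S])[0]                    # predict expects a 2-D array
    resultList = ['no interest', 'looks OK', 'very attractive']
    print(resultList[label - 1])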
IdentifImg.py
#coding:utf-8
import os
import numpy as np
from KNNDateOnHand import classify

# convert the 32*32 character matrix in each file into a single 1*1024 row
def img2vector(filename):
    # create a 1 x 1024 matrix
    returnVect = np.zeros((1, 1024))
    # open the file
    fr = open(filename)
    # each file has 32 lines of 32 characters; copy them into the 1024 columns
    for i in range(32):
        lineStr = fr.readline()
        for j in range(32):
            returnVect[0, 32 * i + j] = int(lineStr[j])
    return returnVect

def IdentifImgClassTest():
    hwLabels = []
    # read every file under the TrainData directory (the training set)
    trainingFileList = os.listdir('TrainData')
    m = len(trainingFileList)
    # zeros((m, 1024)) returns an m-row, 1024-column matrix of floats
    trainingMat = np.zeros((m, 1024))
    for i in range(m):
        # file name
        fileNameStr = trainingFileList[i]
        # file name without the extension
        fileStr = fileNameStr.split('.')[0]
        # the digit label encoded in the file name
        classNumStr = int(fileStr.split('_')[0])
        hwLabels.append(classNumStr)
        # build the training set; img2vector returns one row of 1024 columns per file
        trainingMat[i, :] = img2vector('TrainData/%s' % fileNameStr)
    # read the test set
    testFileList = os.listdir('TestData')
    errorCount = 0.0
    mTest = len(testFileList)
    for i in range(mTest):
        fileNameStr = testFileList[i]
        fileStr = fileNameStr.split('.')[0]
        classNumStr = int(fileStr.split('_')[0])
        vectorUnderTest = img2vector('TestData/%s' % fileNameStr)
        classifierResult = classify(vectorUnderTest, trainingMat, hwLabels, 3)
        print "predicted digit: %d, actual digit: %d" % (classifierResult, classNumStr)
        if (classifierResult != classNumStr):
            errorCount += 1.0
    print "\nnumber of misclassifications: %d" % errorCount
    errorRate = errorCount / float(mTest)
    print "\naccuracy: %f" % (1 - errorRate)

if __name__ == '__main__':
    IdentifImgClassTest()
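As a side note, the character-by-character loop in img2vector can be replaced by NumPy's fixed-width reader. A minimal sketch, assuming each file really is 32 lines of 32 digit characters; 'TrainData/0_0.txt' is a hypothetical file name used only for the example.

# coding:utf-8
# Alternative img2vector using numpy's fixed-width parsing (delimiter=1 means 1-character fields).
import numpy as np

def img2vector_np(filename):
    grid = np.genfromtxt(filename, delimiter=1, dtype=int)   # 32 x 32 array of 0/1 digits
    return grid.reshape(1, 1024).astype(float)

if __name__ == '__main__':
    # hypothetical training file; any of the 32x32 digit files should work
    print(img2vector_np('TrainData/0_0.txt').shape)   # (1, 1024)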
Continuous data must be discretized (binned) into categories before classification: the features a decision tree splits on are treated as categorical data (see the binning sketch below).
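A minimal sketch of that discretization step, using equal-width bins via np.digitize. The ages and bin edges below are made up; MLlib bins continuous features internally according to maxBins, so this is only to illustrate the idea.

# coding:utf-8
# Discretize a continuous column into categorical bin indices.
import numpy as np

ages = np.array([18, 25, 31, 40, 52, 67, 80])   # made-up continuous values
edges = np.array([30, 50, 70])                  # made-up bin boundaries
bins = np.digitize(ages, edges)                 # 0: <30, 1: 30-49, 2: 50-69, 3: >=70
print(bins)                                     # [0 0 1 1 2 2 3]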
Example: 汽車數據樣本.txt
1 1:2 2:1 3:1 4:1 5:80
1 1:3 2:2 3:1 4:1 5:77
1 1:3 2:2 3:1 4:1 5:77
1 1:2 2:1 3:1 4:1 5:77
1 1:2 2:1 3:1 4:1 5:72
1 1:3 2:2 3:1 4:1 5:40
1 1:2 2:2 3:1 4:1 5:61
1 1:2 2:1 3:1 4:1 5:69
1 1:2 2:1 3:1 4:1 5:71
1 1:3 2:2 3:1 4:1 5:76
1 1:2 2:1 3:1 4:1 5:74
1 1:2 2:1 3:1 4:1 5:80
1 1:3 2:1 3:1 4:1 5:61
1 1:2 2:1 3:1 4:1 5:68
1 1:2 2:2 3:1 4:1 5:79
1 1:3 2:1 3:1 4:1 5:73
ClassificationDecisionTree.scala
package com.bjsxt.rf

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Decision tree
 */
object ClassificationDecisionTree {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("analysItem")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    val data = MLUtils.loadLibSVMFile(sc, "汽車數據樣本.txt")
    // Split the data into training and test sets (30% held out for testing)
    val splits = data.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))
    // number of classes
    val numClasses = 2
    // declare which features are categorical; any feature not listed is treated as continuous
    // a column whose categories are 1,2,3 must be declared with 4 values: even though 0 never
    // appears, category indices start at 0, so a 3-category feature is declared as 4 here.
    // This is an easy trap; the same reasoning applies to the other features.
    val categoricalFeaturesInfo = Map[Int, Int](0 -> 4, 1 -> 4, 2 -> 3, 3 -> 3)
    // split criterion: "gini" or "entropy"
    val impurity = "entropy"
    // maximum tree depth; a deeper tree costs more and risks overfitting, so this acts as pruning
    val maxDepth = 3
    // maximum number of bins used to discretize continuous features (default is 32);
    // bins are chosen so each holds roughly the same number of samples; this also limits tree growth
    val maxBins = 32
    // train the model
    val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
    // evaluate on the test set
    val labelAndPreds = testData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
    val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
    println("Test Error = " + testErr)
    println("Learned classification tree model:\n" + model.toDebugString)
  }
}
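The impurity parameter above selects between entropy and Gini. A minimal sketch of both measures for a made-up class split, just to show what the tree is minimizing when it evaluates a candidate split:

# coding:utf-8
# Entropy and Gini impurity of one node, given its class proportions.
import math

def entropy(props):
    return -sum(p * math.log(p, 2) for p in props if p > 0)

def gini(props):
    return 1.0 - sum(p * p for p in props)

if __name__ == '__main__':
    print("%.3f %.3f" % (entropy([0.5, 0.5]), gini([0.5, 0.5])))   # most impure 2-class node: 1.000 0.500
    print("%.3f %.3f" % (entropy([0.9, 0.1]), gini([0.9, 0.1])))   # nearly pure node: 0.469 0.180
    print("%.3f %.3f" % (entropy([1.0, 0.0]), gini([1.0, 0.0])))   # pure node: 0.000 0.000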
ClassificationRandomForest.scala
package com.bjsxt.rf

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.tree.RandomForest

/**
 * Random forest
 */
object ClassificationRandomForest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("analysItem")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)
    // read the data
    val data = MLUtils.loadLibSVMFile(sc, "汽車數據樣本.txt")
    // split the samples 7:3 into training and test sets
    val splits = data.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))
    // number of classes
    val numClasses = 2
    // categoricalFeaturesInfo declares the categorical features; an empty map would mean
    // that every feature is treated as continuous
    val categoricalFeaturesInfo = Map[Int, Int](0 -> 4, 1 -> 4, 2 -> 3, 3 -> 3)
    // number of trees
    val numTrees = 3
    // feature subset sampling strategy; "auto" lets the algorithm choose
    // "auto" picks among 4 options based on the number of features:
    // 1: all -- every feature; 2: sqrt -- a random subset of sqrt(#features);
    // 3: log2 -- log2(#features); 4: onethird -- one third of the features
    val featureSubsetStrategy = "auto"
    // impurity measure: "gini" or "entropy"
    val impurity = "entropy"
    // maximum tree depth
    val maxDepth = 3
    // maximum number of bins, i.e. how finely continuous features are discretized
    val maxBins = 32
    // train the random forest classifier; trainClassifier returns a RandomForestModel
    val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees,
      featureSubsetStrategy, impurity, maxDepth, maxBins)
    // print the model
    println(model.toDebugString)
    // save the model
    // model.save(sc, "汽車保險")
    // evaluate on the test set
    val count = testData.map { point =>
      val prediction = model.predict(point.features)
      // Math.abs(prediction - point.label)
      (prediction, point.label)
    }.filter(r => r._1 != r._2).count()
    println("Test Error = " + count.toDouble / testData.count().toDouble)
    println("model " + model.toDebugString)
  }
}
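To make the featureSubsetStrategy options concrete, the sketch below computes roughly how many features each strategy would sample per split for a hypothetical feature count (the exact rounding MLlib applies may differ slightly, so treat this as an approximation):

# coding:utf-8
# Approximate number of features sampled per split, per strategy, for a made-up feature count.
import math

num_features = 100   # hypothetical

strategies = {
    "all": num_features,
    "sqrt": int(math.sqrt(num_features)),
    "log2": int(math.log(num_features, 2)),
    "onethird": num_features // 3,
}
for name, k in strategies.items():
    print("%-9s -> %d features per split" % (name, k))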
mean — v. to signify, to intend; adj. stingy, unkind; n. the middle, the average, the arithmetic mean. The last sense is the one used in k-means: each cluster centre is the mean of the points assigned to it.
testSet.txt 1.658985 4.285136 -3.453687 3.424321 4.838138 -1.151539 -5.379713 -3.362104 0.972564 2.924086 -3.567919 1.531611 0.450614 -3.302219 -3.487105 -1.724432 2.668759 1.594842 -3.156485 3.191137 KMeansOnHand.py # encoding:utf-8 import numpy as np #將每行數據放入一個數組內列表,返回一個二維列表 def loadDataSet(fileName): #創建空列表 dataMat = [] fr = open(fileName) for line in fr.readlines(): #按照制表符切割每行,返回一個列表list curLine = line.strip().split('\t') #將切分后的每個列表中的元素,以float形式返回,map()內置函數,返回一個list fltLine = map(float,curLine) dataMat.append(fltLine) return dataMat #兩點歐式距離 def distEclud(vecA, vecB): #np.power(x1,x2) 對x1中的每個元素求x2次方,不會改變x1。 return np.sqrt(np.sum(np.power(vecA - vecB, 2))) #隨機找到3個中心點的位置坐標,返回一個3*2的矩陣 def randCent(dataSet, k): #返回dataSet列數,2列 n = np.shape(dataSet)[1] ''' centroids是一個3*2的矩陣,用於存儲三個中心點的坐標 ''' centroids = np.mat(np.zeros((k,n))) for j in range(n): #統計每一列的最小值 minJ = min(dataSet[:,j]) #每列最大值與最小值的差值 rangeJ = float(max(dataSet[:,j]) - minJ) #np.random.rand(k,1) 產生k行1列的數組,里面的數據是0~1的浮點型 隨機數。 array2 = minJ + rangeJ * np.random.rand(k,1) #轉換成k*1矩陣 賦值給centroids centroids[:,j] = np.mat(array2) return centroids def kMeans(dataSet, k, distMeas=distEclud, createCent=randCent): #計算矩陣所有 行數 m=80 m = np.shape(dataSet)[0] #zeros((m,2)) 創建一個80行,2列的二維數組 #numpy.mat 將二維數組轉換成矩陣 clusterAssment = np.mat(np.zeros((m,2))) #createCent找到K個隨機中心點坐標 centroids = createCent(dataSet, k) # print centroids clusterChanged = True while clusterChanged: clusterChanged = False #遍歷80個數據到每個中心點的距離 for i in range(m): #np.inf float的最大值,無窮大 minDist = np.inf #當前點屬於的類別號 minIndex = -1 #每個樣本點到三個中心點的距離 for j in range(k): # x = centroids[j,:] # print x #返回兩點距離的值 distJI = distMeas(centroids[j,:],dataSet[i,:]) if distJI < minDist: #當前最小距離的值 minDist = distJI #當前最小值屬於哪個聚類 minIndex = j #有與上次迭代計算的當前點的類別不相同的點 if clusterAssment[i,0] != minIndex: clusterChanged = True #將當前點的類別號和最小距離 賦值給clusterAssment的一行 clusterAssment[i,:] = minIndex,minDist for cent in range(k): # array = clusterAssment[:,0].A==cent # result = np.nonzero(clusterAssment[:,0].A==cent)[0] #clusterAssment[:,0].A 將0列 也就是類別號轉換成數組 #clusterAssment[:,0].A==cent 返回的是一列,列中各個元素是 True或者False,True代表的是當前遍歷的cent類別 #np.nonzero(clusterAssment[:,0].A==cent) 返回數組中值不為False的元素對應的行號下標數組 和列號下標數組 #ptsInClust 取出的是對應是當前遍歷cent類別的 所有行數據組成的一個矩陣 ptsInClust = dataSet[np.nonzero(clusterAssment[:,0].A==cent)[0]] #numpy.mean 計算矩陣的均值,axis=0計算每列的均值,axis=1計算每行的均值。 #這里是每經過一次while計算都會重新找到各個類別中中心點坐標的位置 ,axis = 0 是各個列求均值 centroids[cent,:] = np.mean(ptsInClust, axis=0) #返回 【 當前三個中心點的坐標】 【每個點的類別號,和到當前中心點的最小距離】 return centroids, clusterAssment if __name__ == '__main__': #numpy.mat 將數據轉換成80*2的矩陣 dataMat = np.mat(loadDataSet('./testSet.txt')) k=3 #centroids 三個中心點的坐標。clusterAssment 每個點的類別號|到當前中心點的最小距離 centroids, clusterAssment = kMeans(dataMat, k, distMeas=distEclud, createCent=randCent) print centroids print clusterAssment
KMeansByScikitlearn.py ## 帶圖 #coding:utf-8 import numpy as np import matplotlib.pyplot as plt from sklearn.cluster import KMeans from sklearn.datasets import make_blobs #建立12*12英寸 新的圖像 plt.figure(figsize=(12, 12)) n_samples = 1500 random_state = 170 ''' make_blobs函數是為聚類產生數據集 , 產生一個數據集和相應的標簽 n_samples:表示數據樣本點個數,默認值100 n_features:表示數據的維度,特征,默認值是2 centers:產生數據的中心點,默認值3個 shuffle :洗亂,默認值是True random_state:官網解釋是隨機生成器的種子 ''' #x返回的是向量化的數據點,y返回的是對應數據的類別號 x,y = make_blobs(n_samples=n_samples, random_state=random_state) print 'x=',x,type(x),'y=',y #使用KMeans去聚類,返回聚好的類別集合, n_clusters聚合成幾類 y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(x) print "y_pred : ",y_pred #subplot 繪制多個子圖,221 等價於2,2,1 表示兩行兩列的子圖中的第一個 plt.subplot(221) #scatter 繪制散點圖 ,c 指定顏色 plt.scatter(x[:, 0], x[:, 1], c=y_pred) plt.title("kmeans01") transformation = [[ 0.60834549, -0.63667341], [-0.40887718, 0.85253229]] #numpy.dot 矩陣相乘 #a1= [[1,2] # [3,4] # [5,6]] #a2= [[10,20] # [30,40]] #a1*a2 = [[1*10+2*30,1*20+2*40] # [3*10+4*30,3*20+4*40] # [5*10+5*30,6*20+6*40] # ] X_aniso = np.dot(x, transformation) y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(X_aniso) plt.subplot(222) plt.scatter(X_aniso[:, 0], X_aniso[:, 1], c=y_pred) plt.title("kmeans02") #vstack 是合並矩陣,將y=0類別的取出500行,y=1類別的取出100行,y=2類別的取出10行 X_filtered = np.vstack((x[y == 0][:500], x[y == 1][:100], x[y == 2][:10])) y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(X_filtered) plt.subplot(223) plt.scatter(X_filtered[:, 0], X_filtered[:, 1], c=y_pred) plt.title("kmeans03") dataMat = [] fr = open("testSet.txt","r") for line in fr.readlines(): if line.strip() <> "": curLine = line.strip().split('\t') fltLine = map(float,curLine) dataMat.append(fltLine) dataMat = np.array(dataMat) #調用Scikitlearn中的KMeans #KMeans 中參數 init='k-means++' 默認就是k-means++ 如果設置為'random'是隨機找中心點 y_pred = KMeans(n_clusters=4).fit_predict(dataMat) plt.subplot(224) plt.scatter(dataMat[:,0], dataMat[:, 1], c=y_pred) plt.title("kmeans04") plt.savefig("./kmeans.png") plt.show()
kmeans_data.txt 0.0 0.0 0.0 0.1 0.1 0.1 0.2 0.2 0.2 9.0 9.0 9.0 9.1 9.1 9.1 9.2 9.2 9.2 KMeansScala.scala package com.bjsxt.kmeans import scala.tools.scalap.Main import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.mllib.clustering.KMeansModel import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.linalg.Vectors /** * 通過數據集使用kmeans訓練模型 */ object KMeansScala { def main(args: Array[String]): Unit = { //1 構建Spark對象 val conf = new SparkConf().setAppName("KMeans").setMaster("local") val sc = new SparkContext(conf) // 讀取樣本數據1,格式為LIBSVM format val data = sc.textFile("kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() val numClusters = 4 // // 根據原始數據建議分兩類, val numIterations = 100 val model = new KMeans(). //設置聚類的類數 setK(numClusters). //設置找中心點最大的迭代次數 setMaxIterations(numIterations). run(parsedData) //四個中心點的坐標 val centers = model.clusterCenters val k = model.k centers.foreach(println) println(k) //保存模型 model.save(sc, "./Kmeans_model") //加載模型 val sameModel = KMeansModel.load(sc, "./Kmeans_model") println(sameModel.predict(Vectors.dense(1,1,1))) //SparkSQL讀取顯示4個中心點坐標 val sqlContext = new SQLContext(sc) sqlContext.read.parquet("./Kmeans_model/data").show() } } KMeans2.scala package com.bjsxt.kmeans import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.KMeansModel import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vector /** * 給kmeans指定中心點的位置 */ object KMeans2 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("KMeans2").setMaster("local") val sc = new SparkContext(conf) val rdd = sc.parallelize(List( Vectors.dense(Array(-0.1, 0.0, 0.0)), Vectors.dense(Array(9.0, 9.0, 9.0)), Vectors.dense(Array(3.0, 2.0, 1.0)))) //指定文件 kmeans_data.txt 中的六個點為中心點坐標。 val centroids: Array[Vector] = sc.textFile("kmeans_data.txt") .map(_.split(" ").map(_.toDouble)) .map(Vectors.dense(_)) .collect() val model = new KMeansModel(clusterCenters=centroids) println("聚類個數 = "+model.k) //模型中心點 model.clusterCenters.foreach { println } //預測指定的三條數據 val result = model.predict(rdd) result.collect().foreach(println(_)) } }
Weibo (microblog) clustering case study
IK分詞器配置,以及配置外部詞典 ext.dic stopword.dic IKAnalyzer.cfg.xml <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer extra configuration</comment> <!-- configure your own dic here --> <entry key="ext_dict">ext.dic;</entry> <!-- configure your own stop dic here --> <entry key="ext_stopwords">stopword.dic</entry> </properties> testdata.txt 3794020835114249 九陽必須是其中之一的其中之一日出 3794020835114250 我要天天和當家看日出 3794020835114251 我要天天和當家看日出 test.scala package com.bjsxt.kmeans import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ListBuffer import org.apache.lucene.analysis.TokenStream import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.clustering.KMeansModel import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD import org.wltea.analyzer.lucene.IKAnalyzer import org.apache.spark.mllib.feature.IDFModel object test { def main(args: Array[String]) { val conf = new SparkConf().setAppName("KMeans1").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val rdd = sc.textFile("./testdata.txt") /** * wordRDD 是一個KV格式的RDD * K:微博ID * V:微博內容分詞后的結果 ArrayBuffer */ var wordRDD = rdd.mapPartitions(iterator => { val list = new ListBuffer[(String, ArrayBuffer[String])] while (iterator.hasNext) { //創建分詞對象 IKAnalyzer支持兩種分詞模式:最細粒度和智能分詞模式,如果構造函數參數為false,那么使用最細粒度分詞。 val analyzer = new IKAnalyzer(true) val line = iterator.next() val textArr = line.split("\t") val id = textArr(0) val text = textArr(1) //分詞 第一個參數只是標識性,沒有實際作用,第二個讀取的數據 val ts : TokenStream = analyzer.tokenStream("", text) //得到相應詞匯的內容 val term : CharTermAttribute = ts.getAttribute(classOf[CharTermAttribute]) //重置分詞器,使得tokenstream可以重新返回各個分詞 ts.reset() val arr = new ArrayBuffer[String] //遍歷分詞數據 while (ts.incrementToken()) { arr.+=(term.toString()) } list.append((id, arr)) analyzer.close() } list.iterator }) wordRDD = wordRDD.cache() //九陽, 必須, 是, 其中之一, 的 wordRDD.foreach(println) /** * HashingTF 使用hash表來存儲分詞 * HashingTF 是一個Transformer 轉換器,在文本處理中,接收詞條的集合然后把這些集合轉化成固定長度的特征向量,這個算法在哈希的同時會統計各個詞條的詞頻 * 1000:只是計算每篇微博中1000個單詞的詞頻 最大似然估計思想 */ val hashingTF: HashingTF = new HashingTF(100) /** * tfRDD * K:微博ID * V:Vector(tf,tf,tf.....) 
* * hashingTF.transform(x._2) 計算分詞頻數(TF) */ val tfRDD = wordRDD.map(x => { (x._1, hashingTF.transform(x._2)) }) tfRDD.foreach(println) /** * 得到IDFModel,要計算每個單詞在整個語料庫中的IDF * IDF是一個 Estimator 評價器,在一個數據集上應用它的fit()方法,產生一個IDFModel。 該IDFModel 接收特征向量(由HashingTF產生) * new IDF().fit(tfRDD.map(_._2)) 就是在組織訓練這個評價器,讓評價器知道語料庫中有那些個詞塊,方便計算IDF */ val idf: IDFModel = new IDF().fit(tfRDD.map(_._2)) /** * K:微博 ID * V:每一個單詞的TF-IDF值 * tfIdfs這個RDD中的Vector就是訓練模型的訓練集 */ val tfIdfs: RDD[(String, Vector)] = tfRDD.mapValues(idf.transform(_)) tfIdfs.foreach(x=>{ println("tfIdfs = "+x) }) //設置聚類個數 val kcluster = 20 val kmeans = new KMeans() kmeans.setK(kcluster) //使用的是kemans++算法來訓練模型 "random"|"k-means||" kmeans.setInitializationMode("k-means||") //設置最大迭代次數 kmeans.setMaxIterations(100) //訓練模型 val kmeansModel: KMeansModel= kmeans.run(tfIdfs.map(_._2)) // kmeansModel.save(sc, "d:/model001") //打印模型的20個中心點 val centers = kmeansModel.clusterCenters centers.foreach(println) // println(kmeansModel.clusterCenters) /** * 模型預測 */ val modelBroadcast = sc.broadcast(kmeansModel) /** * predicetionRDD KV格式的RDD * K:微博ID * V:分類號 */ val predicetionRDD = tfIdfs.mapValues(sample => { val model = modelBroadcast.value model.predict(sample) }) // predicetionRDD.saveAsTextFile("d:/resultttt") /** * 總結預測結果 * tfIdfs2wordsRDD:kv格式的RDD * K:微博ID * V:二元組(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....)) */ val tfIdfs2wordsRDD = tfIdfs.join(wordRDD) /** * result:KV * K:微博ID * V:(類別號,(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....))) */ val result = predicetionRDD.join(tfIdfs2wordsRDD) /** * 查看0號類別中tf-idf比較高的單詞,能代表這類的主題 */ result .filter(x => x._2._1 == 0) .flatMap(line => { val tfIdfV: Vector = line._2._2._1 val words: ArrayBuffer[String] = line._2._2._2 val tfIdfA: Array[Double] = tfIdfV.toArray val wordL = new ListBuffer[String]() val tfIdfL = new ListBuffer[Double]() var index = 0 for(i <- 0 until tfIdfA.length ;if tfIdfV(i) != 0){ wordL.+=(words(index)) tfIdfL.+=(tfIdfA(index)) index += 1 } println(wordL.length + "===" + tfIdfL.length) val list = new ListBuffer[(Double, String)] for (i <- 0 until wordL.length) { list.append((tfIdfV(i), words(i))) } list }).map(_.swap).reduceByKey(_+_).map(_.swap) .sortBy(x => x._1, false) .map(_._2) .take(30).foreach(println) sc.stop() } } original.txt 3793992720744105 #九陽有禮 無需多濾#陷入被窩溫柔鄉,起床靠毅力?九陽免濾豆漿機C668SG耀世首發!智能預約免過濾,貼心配置強到飛起,讓你再續溫柔一小時!真的很需要這款九陽豆漿機,這樣就可以和小寶貝多待會!@高海澄知 @歷銜楓 @郭河九元 3793993084926422 #謝謝你陪我走過2014#好吧,這一年馬上就要過去了,同樣這一年有歡笑,有淚水,但更多的還是幸福。雖然我知道我很任性[糾結],但寶寶姐姐老婆還是對我超級好好好好好好[群體圍觀],希望我明年能乖點,聽點話 @九陽 @璦o詠a國際范 3793993291060111 跨年啦。小伙伴們,新年快樂~[笑哈哈][笑哈哈][笑哈哈]@美的電飯煲官方微博 @美的生活電器 @九陽 @SKG互聯網家電 @中國電信湖北客服 3793993588106975 我的膽有0.9斤,我想要3.1斤重的鐵釜,有份量才夠膽量!九陽Alva0716 3793995102741635 《太上青玄慈悲太乙救苦天尊寶懺》 - 起讚 元始運元 神運元神 化太一尊 九陽天上布恩綸 手內楊枝 遍灑甘露春 大眾悉朝真 群荷深仁 朵朵擁祥雲 大... 
(來自 @頭條博客) - 頂禮太上青玄慈悲太乙救苦天尊 http://t.cn/zYwwlSY 3793995370610238 #九陽有禮 無需多濾#新年交好運!有了九陽,讓生活免濾無憂!@誰能許諾給我一世柔情 @索心進 @錯愛990 3793995484592300 #謝謝你陪我走過2014#2014年將至,希望能中一個好東西來送給我的家人。@九陽 @楓葉紅了112 37939954845923011 #謝謝你陪我走過2014#2014年將至,希望能中一個好東西來送給我的家人。@九陽 @楓葉紅了112 3793995781905340 免過濾,更順滑,#九陽有禮 無需多濾# 更多營養更安心!@princess佳妮昂 @木凝眉 @單純會讓人受傷航 3793996277455995 #謝謝你陪我走過2014#2014年將至,希望能中一個好東西來送給我的家人。@九陽 @楓葉紅了112 3793996323668014 #謝謝你陪我走過2014#2014年將至,希望能中一個好東西來送給我的家人。@九陽 @楓葉紅了112 KMeans11.scala package com.bjsxt.kmeans import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ListBuffer import org.apache.lucene.analysis.TokenStream import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF import org.apache.spark.mllib.feature.IDFModel import org.apache.spark.rdd.RDD import org.wltea.analyzer.lucene.IKAnalyzer import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.clustering.KMeansModel object KMeans11 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("KMeans1").setMaster("local[*]") val sc = new SparkContext(conf) val rdd = sc.textFile("./original.txt") /** * wordRDD 是一個KV格式的RDD * K:微博ID * V:微博內容分詞后的結果 ArrayBuffer */ var wordRDD = rdd.mapPartitions(iterator => { val list = new ListBuffer[(String, ArrayBuffer[String])] while (iterator.hasNext) { //創建分詞對象 IKAnalyzer支持兩種分詞模式:最細粒度和智能分詞模式,如果構造函數參數為false,那么使用最細粒度分詞。 val analyzer = new IKAnalyzer(true) val line = iterator.next() val textArr = line.split("\t") val id = textArr(0) val text = textArr(1) //分詞 第一個參數只是標識性,沒有實際作用,第二個讀取的數據 val ts : TokenStream = analyzer.tokenStream("", text) //得到相應詞匯的內容 val term : CharTermAttribute = ts.getAttribute(classOf[CharTermAttribute]) //重置分詞器,使得tokenstream可以重新返回各個分詞 ts.reset() val arr = new ArrayBuffer[String] //遍歷分詞數據 while (ts.incrementToken()) { arr.+=(term.toString()) } list.append((id, arr)) analyzer.close() } list.iterator }) wordRDD = wordRDD.cache() /** * HashingTF 使用hash表來存儲分詞 * HashingTF 是一個Transformer 轉換器,在文本處理中,接收詞條的集合然后把這些集合轉化成固定長度的特征向量,這個算法在哈希的同時會統計各個詞條的詞頻 * 1000:只是計算每篇微博中1000個單詞的詞頻 最大似然估計思想 */ val hashingTF: HashingTF = new HashingTF(1000) /** * tfRDD * K:微博ID * V:Vector(tf,tf,tf.....) 
* * hashingTF.transform(x._2) * 按照hashingTF規則 計算分詞頻數(TF) */ val tfRDD = wordRDD.map(x => { (x._1, hashingTF.transform(x._2)) }) // tfRDD.foreach(println) /** * 得到IDFModel,要計算每個單詞在整個語料庫中的IDF * IDF是一個 Estimator 評價器,在一個數據集上應用它的fit()方法,產生一個IDFModel。 該IDFModel 接收特征向量(由HashingTF產生) * new IDF().fit(tfRDD.map(_._2)) 就是在組織訓練這個評價器,讓評價器知道語料庫中有那些個詞塊,方便計算IDF */ val idf: IDFModel = new IDF().fit(tfRDD.map(_._2)) /** * K:微博 ID * V:每一個單詞的TF-IDF值 * tfIdfs這個RDD中的Vector就是訓練模型的訓練集 * 計算TFIDF值 */ val tfIdfs: RDD[(String, Vector)] = tfRDD.mapValues(idf.transform(_)) // tfIdfs.foreach(println) //設置聚類個數 val kcluster = 20 val kmeans = new KMeans() kmeans.setK(kcluster) //使用的是kemans++算法來訓練模型 "random"|"k-means||" kmeans.setInitializationMode("k-means||") //設置最大迭代次數 kmeans.setMaxIterations(100) //訓練模型 val kmeansModel: KMeansModel= kmeans.run(tfIdfs.map(_._2)) // kmeansModel.save(sc, "d:/model001") //打印模型的20個中心點 println(kmeansModel.clusterCenters) /** * 模型預測 */ val modelBroadcast = sc.broadcast(kmeansModel) /** * predicetionRDD KV格式的RDD * K:微博ID * V:分類號 */ val predicetionRDD = tfIdfs.mapValues(vetor => { val model = modelBroadcast.value model.predict(vetor) }) // predicetionRDD.saveAsTextFile("d:/resultttt") /** * 總結預測結果 * tfIdfs2wordsRDD:kv格式的RDD * K:微博ID * V:二元組(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....)) */ val tfIdfs2wordsRDD = tfIdfs.join(wordRDD) /** * result:KV * K:微博ID * V:(類別號,(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....))) */ val result = predicetionRDD.join(tfIdfs2wordsRDD) /** * 查看0號類別中tf-idf比較高的單詞,能代表這類的主題 */ result .filter(x => x._2._1 == 0) .flatMap(line => { val tfIdfV: Vector = line._2._2._1 val words: ArrayBuffer[String] = line._2._2._2 val tfIdfA: Array[Double] = tfIdfV.toArray println("tfIdfA == "+tfIdfA.length) val wordL = new ListBuffer[String]() val tfIdfL = new ListBuffer[Double]() var index = 0 for(i <- 0 until tfIdfA.length ;if tfIdfV(i) != 0){ wordL.+=(words(index)) tfIdfL.+=(tfIdfA(index)) index += 1 } println(wordL.length + "===" + tfIdfL.length) val list = new ListBuffer[(Double, String)] for (i <- 0 until wordL.length) { list.append((tfIdfV(i), words(i))) } list }).map(_.swap).reduceByKey(_+_).map(_.swap) .sortBy(x => x._1, false) .map(_._2).filter(_.length()>=2) .take(30).foreach(println) /* val str1 = new StringBuilder val str2 = new StringBuilder val str3 = new StringBuilder val str4 = new StringBuilder val str5 = new StringBuilder result .filter(x=> x._2._1 == 0) .flatMap(x=>x._2._2._1.toArray) .sortBy(x=>x,false) .distinct .take(20) .foreach { x => { str1.append("," + tfIdf2Words.get(x).get) } } result .filter(x=> x._2._1 == 1) .flatMap(x=>x._2._2._1.toArray) .sortBy(x=>x,false) .distinct .take(20) .foreach { x => { str2.append("," + tfIdf2Words.get(x).get) } } result .filter(x=> x._2._1 == 2) .flatMap(x=>x._2._2._1.toArray) .sortBy(x=>x,false) .distinct .take(20) .foreach { x => { str3.append("," + tfIdf2Words.get(x).get) } } result .filter(x=> x._2._1 == 3) .flatMap(x=>x._2._2._1.toArray) .sortBy(x=>x,false) .distinct .take(20) .foreach { x => { str4.append("," + tfIdf2Words.get(x).get) } } result .filter(x=> x._2._1 == 4) .flatMap(x=>x._2._2._1.toArray) .sortBy(x=>x,false) .distinct .take(20) .foreach { x => { str5.append("," + tfIdf2Words.get(x).get) } } println(str1) println(str2) println(str3) println(str4) println(str5)*/ sc.stop() } }
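The pipeline above relies on HashingTF and IDF. To make those numbers less opaque, here is a minimal pure-Python sketch of hashed term frequencies combined with the smoothed IDF formula documented for MLlib's IDF, idf(t) = ln((N + 1) / (df(t) + 1)) with N documents. The toy corpus, the bucket count, and the use of Python's built-in hash() (MLlib's hashing differs) are simplifications for illustration only.

# coding:utf-8
# Toy TF-IDF computation mirroring HashingTF + IDF; corpus and bucket count are made up,
# and Python's built-in hash() stands in for MLlib's hashing, so bucket indices will differ.
import math
from collections import defaultdict

def hashing_tf(words, num_features=20):
    vec = defaultdict(float)
    for w in words:
        vec[hash(w) % num_features] += 1.0      # term count per hash bucket
    return dict(vec)

docs = [["spark", "kmeans", "spark"],           # made-up token lists standing in for segmented weibo text
        ["spark", "tfidf"],
        ["kmeans", "clustering"]]

tf_vectors = [hashing_tf(d) for d in docs]

# document frequency per bucket, then smoothed IDF: ln((N + 1) / (df + 1))
N = len(tf_vectors)
df = defaultdict(int)
for vec in tf_vectors:
    for bucket in vec:
        df[bucket] += 1
idf = {b: math.log((N + 1.0) / (df[b] + 1.0)) for b in df}

# TF-IDF: term frequency in the document times the corpus-level IDF of that bucket
tfidf = [{b: tf * idf[b] for b, tf in vec.items()} for vec in tf_vectors]
for row in tfidf:
    print(row)

Buckets holding words that appear in many documents get an IDF near zero, which is why the per-cluster "top words" extraction at the end of KMeans11.scala surfaces terms that are frequent within a cluster but rare across the whole corpus.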