Spark --- Word Count (2)


Using Python to run a word count on Spark; the process is shared below:

1. Create the project: (here it is created inside an existing project; a standalone wordcount project works just as well)

① Create a txt file: wordcount.txt (contents identical to the file used in Word Count (1))

② Create a py file: word.py

from pyspark import SparkContext
from pyspark import SparkConf

# Run locally under the application name 'word'
conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)

# Read the input file (or directory) into an RDD of lines
wordcount = sc.textFile(r'E:\Hbase\api\wordcount')

# Split each line into words, map each word to (word, 1),
# then sum the counts per word and collect the result to the driver
counts = wordcount.flatMap(lambda x: x.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b) \
                  .collect()
print(counts)

Printed result:

[('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('advances', 1), ('collaborative', 1), ('model', 1), ('in', 1), ('the', 2), ('of', 1), ('has', 1), ('successful', 1), ('Software', 1), ("Foundation's", 1), ('most', 1), ('long', 1), ('that', 1), ('uded', 1), ('as', 1), ('Open', 1), ('The', 1), ('commitment', 1), ('software', 1), ('consistently', 1), ('a', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('served', 1), ('open', 1), ('https://s.apache.org/PIRA', 1)]
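If you also want the output ordered by frequency, the same pipeline can be extended with a sort step. This is a minimal sketch added here for illustration (the sortBy call and the slice of the top ten are my additions, not part of the original script); it reuses the wordcount RDD defined above:

# Sketch (my addition): sort the (word, count) pairs by count, descending
sorted_counts = wordcount.flatMap(lambda x: x.split(" ")) \
                         .map(lambda word: (word, 1)) \
                         .reduceByKey(lambda a, b: a + b) \
                         .sortBy(lambda pair: pair[1], ascending=False) \
                         .collect()
print(sorted_counts[:10])   # the ten most frequent words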

2. If the amount of data to count is small, you can do it like this:

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)

# Note: because of the raw-string prefix r"", the trailing backslashes, the
# newlines and the leading spaces of each line all stay inside the string as data
data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's\
       commitment to collaborative development has long served as a model for producing consistently\
       high quality software that advances the future of open development. https://s.apache.org/PIRA\
      "]

# Turn the local Python list into an RDD, then run the same word-count pipeline
datardd = sc.parallelize(data)

result = datardd.flatMap(lambda x: x.split(' ')) \
                .map(lambda x: (x, 1)) \
                .reduceByKey(lambda a, b: a + b) \
                .collect()
print(result)

Printed result:

[('', 18), ('development', 1), ('producing', 1), ('among', 1), ('Source,', 1), ('for', 1), ('quality', 1), ('to', 1), ('influencers', 1), ('served', 1), ('collaborative', 1), ('in', 1), ('the', 2), ('Open', 1), ('of', 1), ('has', 1), ('long', 1), ('https://s.apache.org/PIRA\\\n', 1), ('successful', 1), ('Software', 1), ('most', 1), ('consistently\\\n', 1), ('a', 1), ("Foundation's\\\n", 1), ('uded', 1), ('as', 1), ('advances', 1), ('The', 1), ('commitment', 1), ('software', 1), ('that', 1), ('development.', 1), ('high', 1), ('future', 1), ('Apache', 1), ('model', 1), ('open', 1)]
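The odd entries such as ('', 18), 'consistently\\\n' and "Foundation's\\\n" come from the raw-string input: the backslashes, newlines and leading spaces are part of the data, so splitting only on single spaces produces empty and dirty tokens. A minimal clean-up sketch (my addition, assuming whitespace-separated words and reusing the datardd RDD above):

import re

# Split on any whitespace, strip stray backslashes, drop empty tokens
result_clean = datardd.flatMap(lambda x: re.split(r'\s+', x)) \
                      .map(lambda w: w.strip('\\')) \
                      .filter(lambda w: w != '') \
                      .map(lambda w: (w, 1)) \
                      .reduceByKey(lambda a, b: a + b) \
                      .collect()
print(result_clean)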

Summary:

① Prerequisite for running the Spark word count from Python on Windows: the machine needs a working Spark installation and a working Java installation, with the environment variables configured much like they are for Python. The job will only run once both are installed correctly.
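A quick sanity check of that configuration can be done from Python before creating the SparkContext. This is only an illustrative sketch I added; SPARK_HOME, JAVA_HOME and PYTHONPATH are the conventional variable names, and the check itself is not part of the original post:

import os

# Print the environment variables Spark and Java normally rely on
for var in ('SPARK_HOME', 'JAVA_HOME', 'PYTHONPATH'):
    print(var, '=', os.environ.get(var, '<not set>'))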

② Pay attention to compatibility between the local Python and Spark versions. Here the machine had Python 3.6 with Spark 1.6, which are clearly incompatible, so Python 3.5 had to be installed instead. The same applies to Python and Spark on Linux.
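To see which versions are actually in use, they can be printed at runtime; a small sketch (my addition), assuming a SparkContext named sc already exists:

import sys

# Python interpreter version used by the driver
print('python :', sys.version.split()[0])

# Spark version reported by the running SparkContext
print('spark  :', sc.version)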

③ In real work, be careful with collect(): production jobs often process huge, even petabyte-scale, datasets, and calling collect() on them without thinking pulls everything back to the driver and crashes its memory.

For the situation in ③, a recommended approach is:

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName('word').setMaster('local')
sc = SparkContext(conf=conf)
data = [r"uded among the most successful influencers in Open Source, The Apache Software Foundation's\
       commitment to collaborative development has long served as a model for producing consistently\
       high quality software that advances the future of open development. https://s.apache.org/PIRA\
      "]
datardd = sc.parallelize(data)

# Instead of collect(), which pulls the whole result back to the driver:
# result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b).collect()
# print(result)

# keep the result as an RDD and let the executors print each record
result = datardd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

def f(x):
    print(x)

# foreach() runs f on every element inside the executors and returns None,
# so nothing is accumulated in the driver's memory
result.foreach(f)
Explanation: foreach() iterates over the result and prints each record one by one on the executor side (in local mode, to the console/backend log), avoiding the risk of blowing up the driver's memory.
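Besides foreach(), two other common ways to avoid collecting everything are to take only a small sample in the driver or to write the result straight to storage. A minimal sketch (my addition; the output path below is only a placeholder for illustration):

# Look at just a handful of records in the driver
print(result.take(10))

# Or write the full result to files without materialising it in the driver
result.saveAsTextFile(r'E:\Hbase\api\wordcount_result')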

 

