Preface:
In the previous article I learned to do some simple data analysis with pandas, but... pandas cannot process or analyze TB-scale big data, so next let's take a look at Hadoop.
Also attached: my reflections on Story One, "a discontented heart is like a snake trying to swallow an elephant":
Hadoop background
The data I have worked with falls into three categories:
1. Structured data
Data in relational databases, constrained by a field schema; (has rules)
2. Semi-structured data
HTML/XML/JSON... this kind of data has structure, but the constraints are not very strict; (still has some rules)
3. Unstructured data
Plain text files/logs... this kind of data has no head, body, or key tags, let alone field constraints; (no rules at all)
4. How do we store massive amounts of unstructured data?
So here is the question: how do we store huge volumes of unstructured/semi-structured data, and analyze and search it efficiently?
Google proposed the solutions in its published papers; (it did not say how to implement them!)
1. How do we store massive data safely?
Store it in a distributed way across different server cluster nodes; (distributed: however large the data grows, you can scale out by adding servers to share the load.)
2. How do we analyze and search massive data efficiently?
MapReduce: a programming model -- "MapReduce: Simplified Data Processing on Large Clusters"
Split a complex computing task --------> into small task units ----------> run them in parallel on the individual nodes;
collect the results from each node ---------> merge them --------> map again ------> reduce again ........> until the final result is produced. (A small Python sketch of this flow follows below.)
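To make the flow concrete, here is a tiny pure-Python sketch (my own illustration, not Hadoop code) of map -> shuffle -> reduce for counting words; the three input lines stand in for data spread across different nodes, and the same word-count example is worked through step by step in section IV below:

from collections import defaultdict

lines = ["a b c", "a c", "a"]            # pretend each line lives on a different node

# map: each "node" independently turns its line into (word, 1) pairs
mapped = [(word, 1) for line in lines for word in line.split()]

# shuffle: gather the values that share a key into one list
groups = defaultdict(list)
for word, one in mapped:
    groups[word].append(one)             # {'a': [1, 1, 1], 'b': [1], 'c': [1, 1]}

# reduce: merge each group into the final result
result = {word: sum(ones) for word, ones in groups.items()}
print(result)                            # {'a': 3, 'b': 1, 'c': 2}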
5. What is Hadoop?
A master-level programmer, Doug Cutting, inspired by the Google papers above, implemented Hadoop in Java.
6. How does Python call Hadoop?
Hadoop's MapReduce is this powerful -- as a Python beginner, how do I call it? The API Hadoop exposes for this is also called MapReduce.
I. Hadoop v2 architecture diagram
II. Hadoop's runtime model
HDFS cluster: DataNode (data storage node), NameNode (name node), SecondaryNameNode (secondary name node)
YARN: cluster resource management
III. Installing Hadoop 2.6.2 on CentOS 7
1. Environment preparation
CentOS 7 usually ships with a JDK (OpenJDK) already; check with java -version, and install it if it is missing:
[root@localhost zhanggen]# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)
[root@localhost profile.d]# yum -y install java-1.8.0-openjdk*
Disable the firewall on CentOS 7
Check status: systemctl status firewalld
Stop now: systemctl stop firewalld
Disable at boot: systemctl disable firewalld
Enable at boot: systemctl enable firewalld
Disable SELinux on CentOS 7
[root@localhost hdfs]# setenforce 1
[root@localhost hdfs]# getenforce
Enforcing
[root@localhost hdfs]# setenforce 0
[root@localhost hdfs]# getenforce
Permissive
3. Extract and install Hadoop
[root@localhost bdapps]# mkdir /bdapps/
[root@localhost bdapps]# tar -zxvf /home/zhanggen/Desktop/hadoop-2.6.2.tar.gz -C /bdapps/
[root@localhost bdapps]# ls
hadoop-2.6.2
[root@localhost bdapps]# ln -sv /bdapps/hadoop-2.6.2 /bdapps/hadoop
‘/bdapps/hadoop’ -> ‘/bdapps/hadoop-2.6.2’
4. Set the Java- and Hadoop-related environment variables

export HADOOP_PREFIX=/bdapps/hadoop
export PATH=$PATH:${HADOOP_PREFIX}/bin:${HADOOP_PREFIX}/sbin
export HADOOP_YARN_HOME=${HADOOP_PREFIX}
export HADOOP_MAPRED_HOME=${HADOOP_PREFIX}
export HADOOP_COMMON_HOME=${HADOOP_PREFIX}
export HADOOP_HDFS_HOME=${HADOOP_PREFIX}

export JAVA_HOME=/usr

5. Create the hadoop group and user
groupadd hadoop
useradd -g hadoop hadoop

6. Create the HDFS data directories
mkdir -pv /data/hadoop/hdfs/{nn,dn,snn}
chown -R hadoop:hadoop /data/hadoop/hdfs/

7. Create the Hadoop log directory and fix ownership
cd /bdapps/hadoop/
mkdir logs
chown -R hadoop:hadoop ./*
PS: if your MapReduce job fails, get the applicationId and look at the error messages:
yarn logs -applicationId application_1551852706740_0001    # view the job's execution log
The secret to taming Hadoop: first get the program's run log -------> then find the cause of the error in the log ------> then fix the problem.
8. Hadoop's main configuration files (/bdapps/hadoop/etc/hadoop)
8.0. core-site.xml
Configures the NameNode address and port (default 8020):

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://0.0.0.0:8020</value>
        <final>false</final>
    </property>
</configuration>
8.1. hdfs-site.xml
Configures HDFS-related properties: the number of replicas of each data block, and the directories in which the NameNode and DataNode store data -- the directories created in step 6 above. (On a single-node pseudo-distributed setup, a replication factor of 1 would also be enough.)

<configuration>
    <property>
        <name>dfs.http.address</name>
        <value>0.0.0.0:50070</value>
    </property>
    <property>
        <name>dfs.datanode.http.address</name>
        <value>0.0.0.0:50075</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///data/hadoop/hdfs/nn</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///data/hadoop/hdfs/dn</value>
    </property>
    <property>
        <name>fs.checkpoint.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
    <property>
        <name>fs.checkpoint.edits.dir</name>
        <value>file:///data/hadoop/hdfs/snn</value>
    </property>
</configuration>
8.2. mapred-site.xml (use YARN)
Specifies whether MapReduce runs standalone or on top of YARN; in Hadoop 2 it runs on YARN. See "II. Hadoop's runtime model" above.

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
8.3. yarn-site.xml
yarn-site.xml configures the YARN daemons and related properties. First specify the host and port the ResourceManager daemon listens on; for the pseudo-distributed model the host is localhost and the default port is 8032. Then specify the scheduler the ResourceManager uses and the NodeManager's auxiliary services. A brief example configuration is shown below:

<configuration>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>0.0.0.0:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>0.0.0.0:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>0.0.0.0:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>0.0.0.0:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>0.0.0.0:8088</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
</configuration>
8.4. The slaves file
The slaves file lists all of the cluster's slave nodes. For the pseudo-distributed model its content should be just localhost, which happens to be the file's default value, so it can be left unchanged.
PS:
If the daemons on the server/VM will not start, check whether the local IP address still matches the IP in the configuration files!
8.5. Format HDFS
Before the HDFS NameNode is started for the first time, the directory it uses to store data must be initialized.
If the directory specified by dfs.namenode.name.dir in hdfs-site.xml does not exist, the format command creates it automatically;
if it already exists, make sure its permissions are set correctly -- formatting wipes all of its data and builds a new file system. Run the following command as the hdfs user:
[hdfs@localhost hadoop]$ hdfs namenode -format
19/03/01 11:31:22 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1276811871-127.0.0.1-1551411082356
19/03/01 11:31:22 INFO common.Storage: Storage directory /data/hadoop/hdfs/nn has been successfully formatted.
19/03/01 11:31:22 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
19/03/01 11:31:22 INFO util.ExitUtil: Exiting with status 0
19/03/01 11:31:22 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/127.0.0.1
************************************************************/
9. Start Hadoop
Once HDFS has been formatted you can start Hadoop's five daemons (the hadoop-daemon.sh and yarn-daemon.sh scripts live in /bdapps/hadoop/sbin, which was added to PATH in step 4).
9.1. Start the HDFS daemons
HDFS has three daemons: namenode, datanode and secondarynamenode. They are all started and stopped with the hadoop-daemon.sh script. Run the commands as the hadoop user:

hadoop-daemon.sh start namenode
hadoop-daemon.sh start secondarynamenode
hadoop-daemon.sh start datanode
jps    # jps lists the Java processes currently running (it even supports remote hosts -- does Python have anything like that?)
61392 NameNode
61602 Jps
61480 SecondaryNameNode
61532 DataNode
HDFS cluster web interface:
http://127.0.0.1:50070/dfshealth.html#tab-overview
9.2. Start the YARN daemons
YARN has two daemons: resourcemanager and nodemanager. They are started and stopped with the yarn-daemon.sh script. Run the commands as the hadoop user.

yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
jps
61803 ResourceManager
62043 NodeManager
62142 Jps
YARN cluster web interface:
http://127.0.0.1:8088/cluster
10. Test
Run a MapReduce job with the bundled hadoop-mapreduce-examples-2.6.2.jar; if it completes normally, the installation succeeded.
Switch to the hdfs user before running the job; the input file /test/a.txt must already exist on HDFS (upload it first, e.g. with hdfs dfs -put).

[hdfs@localhost mapreduce]$ yarn jar hadoop-mapreduce-examples-2.6.2.jar wordcount /test/a.txt /test/a.out

[hdfs@localhost mapreduce]$ hdfs dfs -ls /test/a.out
Found 2 items
-rw-r--r--   1 hdfs supergroup          0 2019-03-01 14:09 /test/a.out/_SUCCESS
-rw-r--r--   1 hdfs supergroup         54 2019-03-01 14:09 /test/a.out/part-r-00000
[hdfs@localhost mapreduce]$ hdfs dfs -cat /test/a.out/part-r-00000
aaaaaaaaa	1
aaaaaaaaaaaaaaaaa	1
aaaaaaaaaaaaaaaaaaa	1
11. Calling the HDFS cluster API from Python 3
Hadoop is installed now (pseudo-distributed, admittedly; to make it truly distributed, set up passwordless SSH and distribute the configuration files to the other nodes).
I saw online that Python's pyhdfs module can call the HDFS cluster API to upload, download and search for files, so I am keeping it as a note; it may be useful later for a Hadoop automation project.
Note: before using pyhdfs, make sure the Hadoop services are listening on an externally reachable address (as configured above) and update the hosts file.
192.168.226.142 localhost    # Windows hosts file: C:\WINDOWS\system32\drivers\etc\hosts; Linux: /etc/hosts
pip install pyhdfs -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

import pyhdfs

# connect to the NameNode's WebHDFS port (50070) as the hdfs user
fs = pyhdfs.HdfsClient(hosts='192.168.226.142:50070', user_name='hdfs')
fs.get_home_directory()          # return this user's home directory
fs.get_active_namenode()         # return an available namenode

path = '/zhanggen/'
file = 'myfile.txt'
file_name = path + file

# Before uploading, edit the local hosts file: 192.168.226.142 localhost
# (Windows: C:\WINDOWS\system32\drivers\etc\hosts, Linux: /etc/hosts)
print('path already exists') if fs.exists(path) else fs.mkdirs(path)
print('file already exists') if fs.exists(path + file) else fs.copy_from_local('c.txt', path + file)  # upload a local file to the HDFS cluster

fs.copy_to_local(path + file, 'zhanggen.txt')   # copy a file from the HDFS cluster to the local machine
fs.listdir(path)                 # list all files in the directory, e.g. ['a.out', 'a.txt']

response = fs.open(path + file)  # read the file's content
print(response.read())

fs.append(file_name, 'Thanks myself for fighting ')   # append content to a file on the HDFS cluster
response = fs.open(file_name)    # read the file's content again
print(response.read())

print(fs.get_file_checksum(file_name))   # file checksum (includes the file length)
print(fs.list_status(path))              # status of a directory
print(fs.list_status(file_name))         # status of a single file
IV. Calling the Hadoop MapReduce API from Python 3
pip3 install mrjob -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
hadoop fs -chown -R hadoop:hadoop /tmp    # while a MapReduce job runs, the hadoop user creates sockets/temporary files under /tmp, so set this ownership before running your own MapReduce jobs
MapReduce job workflow (suppose we want to count the word frequencies in the following 3 lines of data):
a b c
a c
a
Step 1: map -- turn every word into a key/value pair
(a,1)(b,1)(c,1)
(a,1)(c,1)
(a,1)
Automatic shuffle & sort:
shuffle: group the values that share a key into one list (like sorting a hand of cards: put the cards with the same number/letter together, e.g. three aces together, two 2s together!)
sort: then sort by key;
(a,[1,1,1])
(b,[1])
(c,[1,1])
Step 2: after shuffle and sort, the values sharing a key sit in one list, which makes it easy for reduce to compute aggregates over them (sum, mean, max)!
(a,3)
(b,1)
(c,2)

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re


class MRwordCount(MRJob):
    '''
    line: one line of input data; the mapper emits:
    (a,1)(b,1)(c,1)
    (a,1)(c,1)
    (a,1)
    '''
    def mapper(self, _, line):
        pattern = re.compile(r'(\W+)')
        for word in re.split(pattern=pattern, string=line):
            if word.isalpha():
                yield (word.lower(), 1)

    def reducer(self, word, count):   # after shuffle and sort
        '''
        (a,[1,1,1])
        (b,[1])
        (c,[1,1])
        '''
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()   # run() starts the MapReduce job
python /MyMapReduce.py /a.txt -r hadoop    # run the Python MapReduce job on the Hadoop cluster (drop '-r hadoop' to test locally with mrjob's default inline runner)

[hdfs@localhost hadoop]$ python /MyMapReduce.py /a.txt -r hadoop
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /bdapps/hadoop/bin...
Found hadoop binary: /bdapps/hadoop/bin/hadoop
Using Hadoop version 2.6.2
Looking for Hadoop streaming jar in /bdapps/hadoop...
Found Hadoop streaming jar: /bdapps/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar
Creating temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477
Copying local files to hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar1053011439569578237/] [] /tmp/streamjob2611643769127644921.jar tmpDir=null
  Connecting to ResourceManager at /192.168.226.142:8032
  Connecting to ResourceManager at /192.168.226.142:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1551427459997_0003
  Submitted application application_1551427459997_0003
  The url to track the job: http://192.168.226.142:8088/proxy/application_1551427459997_0003/
  Running job: job_1551427459997_0003
  Job job_1551427459997_0003 running in uber mode : false
   map 0% reduce 0%
   map 50% reduce 0%
   map 100% reduce 0%
   map 100% reduce 100%
  Job job_1551427459997_0003 completed successfully
  Output directory: hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Counters: 49
	File Input Format Counters
		Bytes Read=18
	File Output Format Counters
		Bytes Written=18
	File System Counters
		FILE: Number of bytes read=54
		FILE: Number of bytes written=331118
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=320
		HDFS: Number of bytes written=18
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=9
		HDFS: Number of write operations=2
	Job Counters
		Data-local map tasks=2
		Launched map tasks=2
		Launched reduce tasks=1
		Total megabyte-seconds taken by all map tasks=20077568
		Total megabyte-seconds taken by all reduce tasks=5390336
		Total time spent by all map tasks (ms)=19607
		Total time spent by all maps in occupied slots (ms)=19607
		Total time spent by all reduce tasks (ms)=5264
		Total time spent by all reduces in occupied slots (ms)=5264
		Total vcore-seconds taken by all map tasks=19607
		Total vcore-seconds taken by all reduce tasks=5264
	Map-Reduce Framework
		CPU time spent (ms)=1990
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=352
		Input split bytes=302
		Map input records=3
		Map output bytes=36
		Map output materialized bytes=60
		Map output records=6
		Merged Map outputs=2
		Physical memory (bytes) snapshot=501116928
		Reduce input groups=3
		Reduce input records=6
		Reduce output records=3
		Reduce shuffle bytes=60
		Shuffled Maps =2
		Spilled Records=12
		Total committed heap usage (bytes)=319430656
		Virtual memory (bytes) snapshot=6355677184
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
job output is in hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output
Streaming final output from hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477/output...
"a"	3
"b"	1
"c"	2
Removing HDFS temp directory hdfs:///user/hdfs/tmp/mrjob/MyMapReduce.hdfs.20190304.084739.219477...
Removing temp directory /tmp/MyMapReduce.hdfs.20190304.084739.219477...
[hdfs@localhost hadoop]$
1. MapReduce case studies
Count this week's alerts.
Because the problem of uncategorized Zabbix alerts was never solved, the Zabbix alerts -----> converted into work-order records on the ops platform ---------> all end up in a single text field!

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import csv

key_list = ['Free disk space', 'Zabbix agent', 'Alive ecerpdb.com', 'Oracle',
            'FTP service', 'No data received from Orabbix', 'Alive ecpim']


class MRwordCount(MRJob):
    def mapper(self, _, line):              # mapper runs once per input line
        row = csv.reader([line]).__next__() # parse the CSV line into a list
        for key in key_list:
            if key in row[-1]:
                yield (key, 1)

    # automatic shuffle & sort happens between mapper and reducer

    def reducer(self, word, count):         # reducer runs once per key yielded by the mapper
        l = list(count)
        yield (word, sum(l))


if __name__ == '__main__':
    MRwordCount.run()    # run() starts the MapReduce job

#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import csv


class University_top10(MRJob):
    def mapper(self, _, line):
        row = csv.reader([line]).__next__()   # parse the CSV line into a list
        if not row[0].isdigit():              # skip the header row ['名次', '學校名稱', '總分', '類型', '所在省份', '所在城市', '辦學方向', '主管部門']
            return
        yield ('top', (float(row[2]), row[1]))   # (score, university name)

    def reducer(self, top_key, score_and_university_name):
        top10 = []
        for key in list(score_and_university_name):
            top10.append(key)
        top10.sort()
        top10 = top10[-10:]
        top10.reverse()
        for key in top10:
            yield key[1], key[0]


if __name__ == '__main__':
    University_top10.run()

#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import csv


class University_top10(MRJob):
    def mapper(self, _, line):
        row = csv.reader([line]).__next__()   # parse the CSV line into a list
        if not row[0].isdigit():              # skip the header row ['名次', '學校名稱', '總分', '類型', '所在省份', '所在城市', '辦學方向', '主管部門', '人均消費']
            return
        yield ('top', (float(row[2]), row[1]))       # yield ('top', (score, university name))
        if row[-1].isdigit():
            yield ('cost', (float(row[-1]), row[1])) # yield ('cost', (per-capita spend, university name))

    def reducer(self, key, value):   # the mapper yields 2 keys ('top' and 'cost'), so the reducer runs twice
        top10 = []
        for list_item in list(value):
            top10.append(list_item)
        top10.sort()
        top10 = top10[-10:]
        # top 10 universities by score (descending) and top 10 by per-capita spend (ascending)
        top10.reverse() if key == 'top' else top10.sort()
        for list_item in top10:
            yield list_item[1], list_item[0]


if __name__ == '__main__':
    University_top10.run()

#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob


class Max_Mix_Temperature(MRJob):
    def mapper(self, _, line):
        row = line.split(',')
        if row[2] == 'min':
            yield 'min', (float(row[3]), row[1])
        if row[2] == 'max':
            yield 'max', (float(row[3]), row[1])

    def reducer(self, key, value):
        l = list(value)
        if key == 'max':
            yield key, max(l)
        elif key == 'min':
            yield key, min(l)


if __name__ == '__main__':
    Max_Mix_Temperature.run()

#!/usr/bin/python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob, MRStep


class Top3_Mean_Friends(MRJob):
    def mapper1(self, _, line):
        row = line.split(',')
        if row[2].isdigit() and row[3].isdigit():
            yield (row[2], int(row[3]))     # emit age and number of friends

    def reducer1(self, age, friends):
        friends_count = list(friends)
        yield (age, sum(friends_count) / len(friends_count))   # average number of friends per age

    def mapper2(self, age, average_coun):
        yield (None, (average_coun, str(age) + 'year'))

    def reducer2(self, _, average_list):    # from the averages, take the 3 ages with the most friends
        l = list(average_list)
        l.sort()
        top3 = l[-3:]
        top3.reverse()
        for i in top3:
            yield (i[0], i[1])

    def steps(self):                        # chain multiple mapper/reducer steps
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]


if __name__ == '__main__':
    Top3_Mean_Friends.run()

from mrjob.job import MRJob, MRStep


class Top_AnnualSalary_Job(MRJob):
    # columns: ID,Name,JobTitle,AnnualSalary,GrossSpend
    def mapper1(self, _, line):
        row = line.split(',')
        if row[0] == 'ID':        # skip the header row
            return
        yield (row[2], int(row[3]))

    def reducer1(self, jobtitle, annualsalary):
        AnnualSalary = list(annualsalary)
        yield ('job_annualsalary', (sum(AnnualSalary) / len(AnnualSalary), jobtitle))

    def mapper2(self, key, job_annualsalary):
        yield key, job_annualsalary

    def reducer2(self, key, values):
        l = list(values)
        print('old', l)           # debug output
        new_l = []
        for i in l:
            new_l.append(i)
        new_l.sort(reverse=True)
        new_l = new_l[0:3]
        for k in new_l:
            yield k[1], k[0]

    def steps(self):              # chain multiple mapper/reducer steps
        return [
            MRStep(mapper=self.mapper1, reducer=self.reducer1),
            MRStep(mapper=self.mapper2, reducer=self.reducer2)
        ]


if __name__ == '__main__':
    Top_AnnualSalary_Job.run()
2. map + combine + reduce
If every map node ships all of its intermediate results over TCP to the reduce node for aggregation, the reduce node becomes a single point of heavy load -- which is why combine exists;
a combiner is a small reduce: before the map output is sent to the reducer, it performs a preliminary aggregation on the map node, reducing the load on the reduce node and speeding up the MapReduce job. A minimal mrjob sketch with a combiner follows below.
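To see what a combiner looks like in code, here is a minimal word-count sketch with mrjob (my own illustration; the class name MRWordCountWithCombiner is made up and not from the examples above). The combiner method has the same signature as the reducer and pre-sums the counts on each map node before the pairs are shuffled to the reducer:

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordCountWithCombiner(MRJob):
    def mapper(self, _, line):
        # emit (word, 1) for every word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        # "small reduce" that runs on the map node: pre-sum the counts
        # so fewer (word, count) pairs have to cross the network to the reducer
        yield (word, sum(counts))

    def reducer(self, word, counts):
        # final aggregation of the partial sums coming from all map nodes
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordCountWithCombiner.run()

Run it the same way as the earlier jobs, e.g. python /MRWordCountWithCombiner.py /a.txt -r hadoop; the "Combine input records" and "Combine output records" counters in the job log show how much work the combiner absorbed.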