大數據迅速發展,但是Hadoop的基礎地位一直沒有改變。理解並掌握Hadoop相關知識對於之后的相關組件學習有着地基的作用。本文整理了Hadoop基礎理論知識與常用組件介紹,雖然有一些組件已經不太常用。但是理解第一批組件的相關知識對於以后的學習很有幫助,未來的很多組件也借鑒了之前的設計理念。
文章較長,建議收藏后閱讀。
相關學習資料可以通過下面的方式下載,本文只是整理了大數據Hadoop基本知識,有精力的同學可以通過相關書籍進行更深入的學習。

本文通過以下章節由淺入深學習,建議閱讀前有一定的Linux基礎和Java基礎,並搭建好大數據環境。相關知識可以在大數據流動中獲取。

一個最簡單的大數據系統就是通過,zookeeper進行協調服務,並通過任務調度對hive或者mr進行計算任務執行,通過數據傳輸與外部系統建立聯系。當然架構在不變化,最新的大數據架構遠不止於此。但這些基本的組件對於理解大數據的原理非常的有幫助。

這些組件互相配合,最終形成了Hadoop的生態體系。
正文開始~
一、大數據發展史
信息時代數據量的爆炸性增長,讓大數據的發展異常迅速。簡單來說大數據是:
1、有海量的數據
2、有對海量數據進行挖掘的需求
3、有對海量數據進行挖掘的軟件工具(hadoop、spark、flink......)
Hadoop與大數據
HADOOP最早起源於Nutch項目。Nutch的設計目標是構建一個大型的全網搜索引擎,包括網頁抓取、索引、查詢等功能,但隨着抓取網頁數量的增加,遇到了嚴重的可擴展性問題——如何解決數十億網頁的存儲和索引問題。
2003年、2004年谷歌發表的兩篇論文為該問題提供了可行的解決方案。
——分布式文件系統(GFS),可用於處理海量網頁的存儲。
——分布式計算框架MAPREDUCE,可用於處理海量網頁的索引計算問題。
Nutch的開發人員完成了相應的開源實現HDFS和MAPREDUCE,並從Nutch中剝離成為獨立項目HADOOP,到2008年1月,HADOOP成為Apache頂級項目,迎來了它的快速發展期。
大數據組件
在大數據的發展中,組件化一直都是一個非常大的趨勢。屏蔽復雜的底層研發,只關注數據工程與數據分析本身,讓大數據得以迅速地發展。而開源的技術發展更是讓大數據的發展得到了長足的進步,大量的公司及個人貢獻了很多的開源方案。這也讓數據采集,清洗,分析,應用都變得輕而易舉。
Hadoop,Hive,Spark,Flink等等開源框架不斷的發展出現。
這些組件相互配合,共同構建起了大數據的平台體系。所以學習好大數據的相關組件知識就非常的重要,也是做好大數據應用的基礎。
大數據架構
大數據的技術與應用的發展同步進行,催生着架構的不斷演變。
從離線到實時,從數據倉庫到數據湖,從大數據平台到數據中台。有人會說大數據有點誇大,大屏泛濫沒有實際應用。但是事物的發展正是經過了從概念到實踐到落地的過程。不得不承認,大數據的架構在不斷的向更好的方向演變。
大數據發展
大數據的應用范圍在逐漸的擴大,用戶畫像,推薦系統等等領域都是大數據在支撐。而數據治理的發展讓數據安全,數據質量也得到了重視。
未來的大數據,將是大數據+數據分析+人工智能的結合體,架構和技術都將不斷的演進,越來越影響並改變我們的生活。
大數據的發展讓大數據相關崗位的需求猛增,大數據工程師,架構師,數據分析師,大數據運維等等都是非常不錯的職業選擇。不過要提醒的是大數據的技術發展迅速,要保持學習,不斷的獲取新的知識。
二、分布式協調服務——Zookeeper
在學習hadoop組件之前,要先了解下zookeeper。zookeeper是一個分布式協調服務;就是為用戶的分布式應用程序提供協調服務。
簡單的說zk解決了分布式系統的一致性問題,可以將需要一致性的數據放在zk中,同時zk也提供了監聽等機制。zk為hadoop分布式的實現提供了保證,所以大家之后不用糾結hadoop很多的操作是如何實現的,很多都依賴了zk。
zk是什么?
1、Zookeeper是為別的分布式程序服務的
2、Zookeeper本身就是一個分布式程序(只要有半數以上節點存活,zk就能正常服務)
3、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分布式共享鎖、統一名稱服務……
4、雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能:
a、管理(存儲,讀取)用戶程序提交的數據;
b、並為用戶程序提供數據節點監聽服務;
不僅是大數據領域,在很多分布式系統中,zk都有着非常大的應用。
Zookeeper工作機制
1、Zookeeper:一個leader,多個follower組成的集群
2、全局數據一致:每個server保存一份相同的數據副本,client無論連接到哪個server,數據都是一致的
3、分布式讀寫,更新請求轉發,由leader實施
4、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行
5、數據更新原子性,一次數據更新要么成功(半數以上節點成功),要么失敗
6、實時性,在一定時間范圍內,client能讀到最新數據
Zookeeper數據結構
1、層次化的目錄結構,命名符合常規文件系統規范(見下圖)
2、每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識
3、節點Znode可以包含數據(只能存儲很小量的數據,<1M;最好是1k字節以內)和子節點
4、客戶端應用可以在節點上設置監視器

zookeeper的選舉機制
(1)Zookeeper集群中只有超過半數以上的服務器啟動,集群才能正常工作;
(2)在集群正常工作之前,myid小的服務器給myid大的服務器投票,直到集群正常工作,選出Leader;
(3)選出Leader之后,之前的服務器狀態由Looking改變為Following,以后的服務器都是Follower。
zk命令行操作
運行 zkCli.sh –server
查看znode路徑 ls /aaa
獲取znode數據 get /aaa
zk客戶端API
org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話
它提供以下幾類主要方法 :
| 功能 | 描述 |
|---|---|
| create | 在本地目錄樹中創建一個節點 |
| delete | 刪除一個節點 |
| exists | 測試本地是否存在目標節點 |
| get/set data | 從目標節點上讀取 / 寫數據 |
| get/set ACL | 獲取 / 設置目標節點訪問控制列表信息 |
| get children | 檢索一個子節點上的列表 |
| sync | 等待要被傳送的數據 |
三、分布式文件系統——HDFS
HDFS概念
分而治之:將大文件、大批量文件,分布式存放在大量服務器上,以便於采取分而治之的方式對海量數據進行運算分析;
HDFS是一個文件系統,用於存儲文件,通過統一的命名空間——目錄樹來定位文件;
HDFS是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器有各自的角色;
重要特性:
HDFS中的文件在物理上是分塊存儲(block),塊的大小可以配置;
HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問文件,形如:hdfs://namenode:port/dir/file;
目錄結構及文件分塊位置信息(元數據)的管理由namenode節點承擔——namenode是HDFS集群主節點,負責維護整個hdfs文件系統的目錄樹,以及每一個路徑(文件)所對應的block塊信息;
文件的各個block的存儲管理由datanode節點承擔——datanode是HDFS集群從節點,每一個block都可以在多個datanode上存儲多個副本;
HDFS是設計成適應一次寫入,多次讀出的場景,且不支持文件的修改(適合用來做數據分析,並不適合用來做網盤應用,因為,不便修改,延遲大,網絡開銷大,成本太高)
HDFS基本操作
不同的hadoop版本,hdfs操作命令不同。下面是hadoop3的操作命令,如果是其他版本要查詢對應的操作命令,可以使用-help 來查看幫助。
1、查詢命令
hdfs dfs -ls / 查詢/目錄下的所有文件和文件夾
hdfs dfs -ls -R 以遞歸的方式查詢/目錄下的所有文件
2、創建文件夾
hdfs dfs -mkdir /test 創建test文件夾
3、創建新的空文件
hdfs dfs -touchz /aa.txt 在/目錄下創建一個空文件aa.txt
4、增加文件
hdfs dfs -put aa.txt /test 將當前目錄下的aa.txt文件復制到/test目錄下(把-put換成-copyFromLocal效果一樣-moveFromLocal會移除本地文件)
5、查看文件內容
hdfs dfs -cat /test/aa.txt 查看/test目錄下文件aa.txt的內容(將-cat 換成-text效果一樣)
6、復制文件
hdfs dfs -copyToLocal /test/aa.txt . 將/test/aa.txt文件復制到當前目錄(.是指當前目錄,也可指定其他的目錄)
7、刪除文件或文件夾
hdfs dfs -rm -r /test/aa.txt 刪除/test/aa.txt文件(/test/aa.txt可以替換成文件夾就是刪除文件夾)
8、重命名文件
hdfs dfs -mv /aa.txt /bb.txt 將/aa.txt文件重命名為/bb.txt
9、將源目錄中的所有文件排序合並到一個本地文件
hdfs dfs -getmerge / local-file 將/目錄下的所有文件合並到本地文件local-file中
可以訪問web端對文件操作有一個直觀的認識。訪問NameNode Web UI進行查看。

我們可以理解為我們通過命令對文件及文件夾進行了操作,但這都是hdfs給我們提供的服務,而hdfs底層會將我們的文件分布式存儲。
HDFS工作機制
可以通過hdfs的工作機制來理解一下原理。來了解一下hdfs是如何通過指令完成文件存取工作的。
-
HDFS集群分為兩大角色:NameNode、DataNode (Secondary Namenode)
-
NameNode負責管理整個文件系統的元數據
-
DataNode 負責管理用戶的文件數據塊
-
文件會按照固定的大小(blocksize)切成若干塊后分布式存儲在若干台datanode上
-
每一個文件塊可以有多個副本,並存放在不同的datanode上
-
Datanode會定期向Namenode匯報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
-
HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行
寫數據
客戶端要向HDFS寫數據,首先要跟namenode通信以確認可以寫文件並獲得接收文件block的datanode,然后,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode復制block的副本。
讀數據
客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合並從而獲得整個文件。
我們要理解的是namenode的工作機制尤其是元數據管理機制,這對於以后做數據治理也非常的有幫助。
Namenode的工作機制
1、namenode職責:負責客戶端請求的響應,元數據的管理(查詢,修改)。
2、namenode對數據的管理采用了三種存儲形式:
內存元數據(NameSystem)
磁盤元數據鏡像文件
數據操作日志文件(可通過日志運算出元數據)
3、元數據存儲方式:
內存中有一份完整的元數據(內存meta data)
磁盤有一個“准完整”的元數據鏡像(fsimage)文件(在namenode的工作目錄中)
用於銜接內存metadata和持久化元數據鏡像fsimage之間的操作日志(edits文件)
4、checkpoint:每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱為checkpoint)
Datanode工作機制
1、Datanode工作職責:
存儲管理用戶的文件塊數據
定期向namenode匯報自身所持有的block信息(通過心跳信息上報)
2、Datanode掉線判斷
datanode進程死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間。
客戶端操作
hdfs提供了對外的api,可以進行客戶端的操作。我們只需要引入相關依賴就可以進行操作了。
這里是java示例,也有其他語言的操作。這種基本的操作后期的新組件也都有替代的方案,這里主要是熟悉為主。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
如果是windows下研發,要指定hadoop安裝包位置,才能引入相關包操作,安裝包可以去資料中查看。
示例代碼如下:
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://192.168.137.101:9820"), conf, "root");
fs.copyFromLocalFile(new Path("D:\\aaa.txt"), new Path("/aaa"));
fs.close();
四、分布式運算框架——Mapreduce
Mapreduce是一個分布式運算程序的編程框架。大概的意思可以理解為對於hdfs中的分布式數據,可以通過Mapreduce這種分布式的框架方式來進行復雜的運算。試想一下,如果手寫分布式運算,要進行任務分配,分批執行,再匯總。這是非常復雜的工程,分布式運算框架的作用就是簡化這個過程。
Mapreduce是偏底層的技術,后期的Hive框架將sql語句轉化成Mapreduce語句進行執行,來簡化操作。后期的spark,flink也都是支持sql語句的。不過這種分布式預算的思想還是非常的重要,也影響了后來很多框架的運算原理。實際工作中不會遇到,但是要對原理有一個了解。
一個完整的mapreduce程序在分布式運行時有三類實例進程:
1、MRAppMaster:負責整個程序的過程調度及狀態協調
2、mapTask:負責map階段的整個數據處理流程
3、ReduceTask:負責reduce階段的整個數據處理流程
執行流程
1、 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程
2、 maptask進程啟動之后,根據給定的數據切片(哪個文件的哪個偏移量范圍)范圍進行數據處理,主體流程為:
a) 利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對
b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到緩存
c) 將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件
3、 MRAppMaster監控到所有maptask進程任務完成之后(真實情況是,某些maptask進程處理完成后,就會開始啟動reducetask去已完成的maptask處fetch數據),會根據客戶指定的參數啟動相應數量的reducetask進程,並告知reducetask進程要處理的數據范圍(數據分區)
4、Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干台maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行重新歸並排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲。
mapreduce的shuffle機制
mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;
具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程中,對數據按key進行了分區和排序;

Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
隨后將mr的程序開發好,並運行即可,這就涉及到一個問題。如何運行。
五、資源調度——Yarn
在hadoop最開始的版本中,mapreduce的程序要想運行必須自己進行調度,調配資源。這就導致管理越老越混亂,Yarn就出現了。
Apache Hadoop YARN:Yet Another Resource Negotiator,另一種資源協調者。
Yarn是一個資源調度平台,負責為運算程序提供服務器運算資源,相當於一個分布式的操作系統平台,而mapreduce等運算程序則相當於運行於操作系統之上的應用程序。隨着hadoop的發展,yarn一直是最核心的資源調度中心,未來我們寫的spark,flink程序都可以通過Yarn來進行調度。
YARN的重要概念
1、 yarn並不清楚用戶提交的程序的運行機制
2、 yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
3、 yarn中的主管角色叫ResourceManager
4、 yarn中具體提供運算資源的角色叫NodeManager
5、 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味着yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、spark,flink……
6、 所以,spark等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可
Yarn就成為一個通用的資源調度平台,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享。
ResourceManager
ResourceManager是YARN中的主節點服務,它負責集群中所有資源的統一管理和作業調度。
簡單來講,ResourceManager主要完成的功能包括:
- 與客戶端交互,處理來自客戶端的請求;
- 啟動和管理ApplicationMaster,並在它運行失敗時重新啟動它;
- 管理NodeManager,接收來自NodeManager的資源匯報信息,並向NodeManager下達管理指令(比如殺死container等);
- 資源管理與調度,接收來自ApplicationMaster的資源申請請求,並為之分配資源。
NodeManager
NodeManager是YARN集群中的每個具體節點的資源和任務管理者。NodeManager的主要功能包括:
- 定時向ResourceManager匯報本節點上的資源使用情況和各個Container的運行狀態;
- 接收並處理ApplicationMaster對container的啟動、停止等各種請求;
- 管理Container的生命周期,監控Container的資源使用;
- 管理任務日志和不同應用程序用到的附屬服務(auxiliary service)。
ApplicationMaster
用戶提交的每個應用程序均包含一個ApplicationMaster,主要功能包括:
- 與ResourceManager調度器協商以獲取資源;
- 將得到的資源進一步分配給內部的任務;
- 與NodeManager通信以啟動或停止任務;
- 監控所有任務的運行狀態,並在任務運行失敗時負責進行容錯處理。
Container
Container是YARN中的資源抽象,它封裝了某個節點上的多個維度的資源,如CPU、內存、磁盤、網絡等。當ApplicationMaster向ResourceManager申請資源時,ResourceManager為ApplicationMaster 返回的資源是用Container表示的。
當用戶向YARN中提交一個應用程序后,YARN將分兩個階段運行該應用程序:
第一階段:啟動ApplicationMaster;
第二階段:由ApplicationMaster創建應用程序;為它申請資源,並監控它的整個運行過程,直到運行完成。

第1步:
client 讀取作業配置信息並創建Job的環境,調用job.waitForCompletion 方法,向集群提交一個MapReduce 作業 。
第2步:
資源管理器給任務分配一個新的作業ID 。
第3步:
作業的client核實作業的輸出路徑,計算輸入文件的分片,將作業的資源 (包括:Jar包、配置文件,split信息等) 拷貝到HDFS集群上的作業提交目錄。
第4步:
通過調用資源管理器的submitApplication()來提交作業。
第5步:
當資源管理器收到submitApplciation()的請求時,就將該請求發給調度器 (scheduler),調度器向NodeManager發送一個啟動container的請求。
第6步:
節點管理器NodeManager啟動container,內部運行着一個主類為 MRAppMaster的Java應用。其通過創造一些對象來監控作業的進度,得到各個task的進度和完成報告 。
第7步:
然后其通過分布式文件系統HDFS來獲取由客戶端提前計算好的輸入split,然后為每個輸入split創建一個map任務,根據mapreduce.job.reduces創建 reduce任務對象。
第8步:
如果不是小作業,那應用管理器向資源管理器請求container來運行所有的map和reduce任務 。
這些請求是通過心跳來傳輸的,包括每個map任務的數據位置。比如:存放輸入split的主機名和機架(rack),調度器利用這些信息來調度任務,盡量將任務分配給存儲數據的節點或相同機架的節點。
第9步:
當一個任務由資源管理器的調度器分配給一個container后,AppMaster通過聯系NodeManager來啟動container。
第10步:
任務由一個主類為YarnChild的Java應用執行,在運行任務之前首先本地化任務需要的資源。比如:作業配置、JAR文件以及分布式緩存的所有依賴文件 。
第11步:
最后,啟動並運行map或reduce任務 。
同理在向yarn提交spark程序時也會按這種方式進行。這就讓資源的調度與程序本身分離。
六、數倉工具——Hive
Hive是基於Hadoop的一個數據倉庫工具(離線),可以將結構化的數據文件映射為一張數據庫表,並提供類SQL查詢功能。
Hive解決了MapReduce的復雜研發問題,采用類SQL語法學習成本低。
Hive需要有一個存儲元數據的數據庫,可以用mysql等等。
簡單來說,通過Hive就可以與hdfs文件建立映射關系。我們只需要通過開發hivesql語句,就可以對hdfs上的文件進行操作了。
Hive基本操作
hive中有一個默認的庫:
庫名: default
庫目錄:hdfs://ip:9000/user/hive/warehouse
新建庫:
create database db_order;
庫建好后,在hdfs中會生成一個庫目錄:
hdfs://hdp20-01:9000/user/hive/warehouse/db_order.db
建表:
use db_order;
create table t_order(id string,create_time string,amount float,uid string);
表建好后,會在所屬的庫目錄中生成一個表目錄
/user/hive/warehouse/db_order.db/t_order
只是,這樣建表的話,hive會認為表數據文件中的字段分隔符為 ^A
正確的建表語句為:
create table t_order(id string,create_time string,amount float,uid string)
row format delimited
fields terminated by ',';
這樣就指定了,我們的表數據文件中的字段分隔符為 ","
刪除表:
drop table t_order;
刪除表的效果是:
hive會從元數據庫中清除關於這個表的信息;
hive還會從hdfs中刪除這個表的表目錄;
內部表與外部表
內部表(MANAGED_TABLE):表目錄按照hive的規范來部署,位於hive的倉庫目錄/user/hive/warehouse中
外部表(EXTERNAL_TABLE):表目錄由建表用戶自己指定
create external table t_access(ip string,url string,access_time string)
row format delimited
fields terminated by ','
location '/access/log';
外部表和內部表的特性差別:
1、內部表的目錄在hive的倉庫目錄中 VS 外部表的目錄由用戶指定
2、drop一個內部表時:hive會清除相關元數據,並刪除表數據目錄
3、drop一個外部表時:hive只會清除相關元數據;
一個hive的數據倉庫,最底層的表,一定是來自於外部系統,為了不影響外部系統的工作邏輯,在hive中可建external表來映射這些外部系統產生的數據目錄;
然后,后續的etl操作,產生的各種表建議用managed_table
分區表
分區表的實質是:在表目錄中為數據文件創建分區子目錄,以便於在查詢時,MR程序可以針對分區子目錄中的數據進行處理,縮減讀取數據的范圍。
比如,網站每天產生的瀏覽記錄,瀏覽記錄應該建一個表來存放,但是,有時候,我們可能只需要對某一天的瀏覽記錄進行分析
這時,就可以將這個表建為分區表,每天的數據導入其中的一個分區;
當然,每日的分區目錄,應該有一個目錄名(分區字段)
示例:
create table t_access(ip string,url string,access_time string)
partitioned by(dt string)
row format delimited
fields terminated by ',';
數據導入導出
方式1:導入數據的一種方式:
手動用hdfs命令,將文件放入表目錄;
方式2:在hive的交互式shell中用hive命令來導入本地數據到表目錄
hive>load data local inpath '/root/order.data.2' into table t_order;
方式3:用hive命令導入hdfs中的數據文件到表目錄
hive>load data inpath '/access.log' into table t_access partition(dt='20210806');
文件格式
HIVE支持很多種文件格式: SEQUENCE FILE | TEXT FILE | PARQUET FILE | RC FILE
create table t_pq(movie string,rate int) stored as textfile;
create table t_pq(movie string,rate int) stored as sequencefile;
create table t_pq(movie string,rate int) stored as parquetfile;
七、任務調度——azkaban
azkaban是一個工作流調度系統。與之類似的還有oozie,airflow等等。
一個完整的數據分析系統通常都是由大量任務單元組成:
shell腳本程序,java程序,mapreduce程序、hive腳本等;
各任務單元之間存在時間先后及前后依賴關系;
為了很好地組織起這樣的復雜執行計划,需要一個工作流調度系統來調度執行。
在實際工作中,絕不是一個程序就能搞定一切的。需要分為多個程序運行,還有前后順序,所以任務調度系統一直存在。也在不斷的發展。
簡單的任務調度:直接使用linux的crontab來定義;
復雜的任務調度:開發調度平台
或使用現成的開源調度系統,比如ooize、azkaban等。
Azkaban介紹
Azkaban是由Linkedin開源的一個批量工作流任務調度器。用於在一個工作流內以一個特定的順序運行一組工作和流程。Azkaban定義了一種KV文件格式來建立任務之間的依賴關系,並提供一個易於使用的web用戶界面維護和跟蹤你的工作流。
地址:https://github.com/azkaban/azkaban
Azkaban使用
Azkaba內置的任務類型支持command、java
1、創建job描述文件
vi command.job
command.jobtype=command command=echo 'hello'
2、將job資源文件打包成zip文件
zip command.job
3、通過azkaban的web管理平台創建project並上傳job壓縮包
首先創建project

上傳zip包

4、啟動執行該job

Command類型多job工作流flow
1、創建有依賴關系的多個job描述
第一個job:foo.job
foo.jobtype=commandcommand=echo foo
第二個job:bar.job依賴foo.job
bar.jobtype=commanddependencies=foocommand=echo bar
2、將所有job資源文件打到一個zip包中

3、在azkaban的web管理界面創建工程並上傳zip包
4、啟動工作流flow
HDFS操作任務
1、創建job描述文件
fs.jobtype=commandcommand=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz
2、將job資源文件打包成zip文件

3、通過azkaban的web管理平台創建project並上傳job壓縮包
4、啟動執行該job
八、數據傳輸Sqoop
Sqoop是一個用於在Hadoop\和關系型數據庫之間流轉數據的一個工具。可以使用Sqoop將數據從關系型數據庫系統(RDBMS)比如MySQL或者Oracle導入到hadoop分布式文件系統(HDFS)上,然后數據在Hadoop MapReduce上轉換,以及將數據導出到RDBMS中。
Sqoop自動實現了上面提到的很多過程,Sqoop使用MapReduce來導入和導出數據,這樣既可以提供並行化操作又可以提高容錯能力。
Sqoop是Apache軟件基金會的一個開源項目。可以訪問http://Sqoop.apache.org獲取,sqoop目前已經趨於穩定,從apache退休了。
在每天定時定時調度把mysql數據傳到大數據集群中,或者把hive中數據傳走時會用到。不過隨時數據實時化的要求變高,sqoop的作用小了很多。但是一些歷史數據的導入還是需要的。
Sqoop使用
Sqoop提供了一系列的操作工具,使用Sqoop需要指定你想要使用的具體工具,以及提供對應的一些參數,使用方式如下。
sqoop tool-name [tool-arguments]
可以使用sqoop help命令查看幫助信息
sqoop help
Available commands:
codegen 生成Java代碼
create-hive-table 根據表結構生成hive表
eval 執行SQL語句並返回結果
export 導出HDFS文件到數據庫表
help 幫助
import 從數據庫導入數據到HDFS
import-all-tables 導入數據庫所有表到HDFS
list-databases 列舉所有的database
list-tables 列舉數據庫中的所有表
version 查看版本信息
可以看到,sqoop提供的操作工具有10個。具體工具的使用幫助可以sqoop help (tool-name)或者sqoop tool-name --help進行查看。
sqoop-import
import工具可以用於從RDBMS中導入一張表到HDFS。表中的每一條記錄對應生成HDFS文件中的每一行。這些記錄可以以text files或者Avro或者SequenceFiles格式進行存儲。
使用方法如下
$ sqoop-import (generic-args) (import-args)
參數列表-import基本參數
| 參數 | 描述 |
|---|---|
| –connect < jdbc-uri > | JDBC連接串 |
| –connection-manager < class-name > | 連接管理類 |
| –driver < class-name > | 手動指定JDBC驅動類 |
| –hadoop-mapred-home < dir > | 可以覆蓋$HADOOP_MAPRED_HOME |
| –help | 使用幫助 |
| –password-file | 指定包含密碼的文件 |
| -P | 執行import時會暫停,等待用戶手動輸入密碼 |
| –password < password > | 直接將密碼寫在命令行中 |
| –username < username > | 指定用戶名 |
| –verbose | 顯示Sqoop任務更多執行信息 |
| –connection-param-file < filename > | 可選的參數,用於提供連接參數 |
| –relaxed-isolation | 設置每個mapmer的連接事務隔離 |
Hive參數
以下是導入到 Hive 中時可選的參數:
--hive-home <dir>:覆蓋 $HIVE_HOME。
--hive-import:將表導入Hive(如果沒有設置,則使用Hive的默認分隔符。)
--hive-overwrite:覆蓋Hive表中的現有數據。
--create-hive-table:如果設置,那么如果存在目標hivetable,作業將失敗。默認情況下,此屬性為false。
--hive-table <table-name>:設置導入到Hive時要使用的表名。
--hive-drop-import-delims:導入到Hive時,從字符串字段中刪除\n、\r和\01。
--hive-delims-replacement:在導入到Hive時,將字符串字段中的\n、\r和\01替換為用戶定義的字符串。
--hive-partition-key:分配到分區的Hive字段的名稱。
--hive-partition-value <v>:作為該任務導入到Hive中的分區鍵的字符串值。
示例:
bin/sqoop import \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--hive-import \
--hive-database mydb \
--hive-table mytable \
--fields-terminated-by "\t" \
--delete-target-dir \
--hive-overwrite
sqoop-export
Sqoop的export工具可以從HDFS同步一系列文件數據到RDBMS中。使用這個工具的前提是導出目標表在數據庫必須存在。導出文件根據用戶指定的分隔符轉化成一系列的輸出記錄。
默認的導出操作會將這些記錄轉化成一系列的INSERT語句,根據這些語句將記錄插入到關系型數據庫中。而在update模式下,Sqoop會生成一系列的UPDATE語句,將數據庫中已經存在的記錄進行更新。在call模式下,Sqoop會為每一條記錄調用一個存儲過程來處理。
$ sqoop-export (generic-args) (export-args)
基本參數
| *參數* | *描述* |
|---|---|
| –connect < jdbc-uri > | JDBC連接串 |
| –connection-manager < class-name > | 連接管理類 |
| –driver < class-name > | 手動指定JDBC驅動類 |
| –hadoop-mapred-home < dir > | 可以覆蓋$HADOOP_MAPRED_HOME |
| –help | 使用幫助 |
| –password-file | 指定包含密碼的文件 |
| -P | 執行import時會暫停,等待用戶手動輸入密碼 |
| –password < password > | 直接將密碼寫在命令行中 |
| –username < username > | 指定用戶名 |
示例:
$ bin/sqoop export \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--export-dir /user/hive/warehouse/mydb.db/mytable \
--input-fields-terminated-by "\t"
九、數據收集-Flume
- Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統
- 支持在日志系統中定制各類數據發送方,用於收集數據
- Flume提供對數據進行簡單處理,並寫到各種數據接收方
Flume是成熟的開源日志采集系統,且本身就是hadoop生態體系中的一員,與hadoop體系中的各種框架組件具有天生的親和力,可擴展性強。
相對於用Shell腳本和Java的收集方式,規避了對日志采集過程中的容錯處理不便控制,減少了開發工作量。
例如對於實時的日志分析這種場景中,對數據采集部分的可靠性、容錯能力要求通常不會非常嚴苛,因此使用通用的flume日志采集框架完全可以滿足需求。
Flume的配置
安裝好flume以后需要對其進行配置。
flume通過事件(agent)進行運作,事件下包含如下的概念。
Source: 用來定義采集系統的源頭
Channel: 把Source采集到的日志進行傳輸,處理
Sink:定義數據的目的地
下面是一個示例。
有一個概念就是,我們定義了agent1這個agent。
定義了agent1.sources的系列設置去執行tail -F實時的采集日志數據。
通過Channel傳輸,最后指定Sink將日志存入hdfs。
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
#使用exec作為數據源source組件
agent1.sources.source1.type = exec
#使用tail -F命令實時收集新產生的日志數據
agent1.sources.source1.command = tail -F /var/logs/nginx/access_log
agent1.sources.source1.channels = channel1
#configure host for source
#配置一個攔截器插件
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
#使用攔截器插件獲取agent所在服務器的主機名
agent1.sources.source1.interceptors.i1.hostHeader = hostname
#配置sink組件為hdfs
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path=hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H%M%S
#指定文件sink到hdfs上的路徑
agent1.sinks.sink1.hdfs.path=
hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M_%hostname
#指定文件名前綴
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
#指定每批下沉數據的記錄條數
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#指定下沉文件按1G大小滾動
agent1.sinks.sink1.hdfs.rollSize = 1024*1024*1024
#指定下沉文件按1000000條數滾動
agent1.sinks.sink1.hdfs.rollCount = 1000000
#指定下沉文件按30分鍾滾動
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
#使用memory類型channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
隨后將flume用指定的配置文件啟動即可。
bin/flume-ng agent --conf ./conf -f ./conf/weblog.properties.2 -n agent
注意:啟動命令中的 -n 參數要給配置文件中配置的agent名稱
目前市面針對日志采集的有 Flume,Logstash,Filebeat,Fluentd ,rsyslog 很多種。但基本的原理是相同的,要根據公司的情況進行選擇。
本文從大數據理論到常用的基礎組件進行的筆記的整理,更深入的hadoop理論知識建議通過書籍進行深入的閱讀學習。而Spark,Flink等組件的學習將會通過單獨的文章進行筆記整理。希望對大家有所幫助,更多大數據相關知識,請關注 大數據流動~

