萬字長文|Hadoop入門筆記(附資料)


大數據迅速發展,但是Hadoop的基礎地位一直沒有改變。理解並掌握Hadoop相關知識對於之后的相關組件學習有着地基的作用。本文整理了Hadoop基礎理論知識與常用組件介紹,雖然有一些組件已經不太常用。但是理解第一批組件的相關知識對於以后的學習很有幫助,未來的很多組件也借鑒了之前的設計理念。

文章較長,建議收藏后閱讀。

相關學習資料可以通過下面的方式下載,本文只是整理了大數據Hadoop基本知識,有精力的同學可以通過相關書籍進行更深入的學習。

本文通過以下章節由淺入深學習,建議閱讀前有一定的Linux基礎Java基礎,並搭建好大數據環境。相關知識可以在大數據流動中獲取。

一個最簡單的大數據系統就是通過,zookeeper進行協調服務,並通過任務調度對hive或者mr進行計算任務執行,通過數據傳輸與外部系統建立聯系。當然架構在不變化,最新的大數據架構遠不止於此。但這些基本的組件對於理解大數據的原理非常的有幫助。

這些組件互相配合,最終形成了Hadoop的生態體系。

正文開始~

一、大數據發展史

信息時代數據量的爆炸性增長,讓大數據的發展異常迅速。簡單來說大數據是:

1、有海量的數據

2、有對海量數據進行挖掘的需求

3、有對海量數據進行挖掘的軟件工具(hadoop、spark、flink......)

Hadoop與大數據

HADOOP最早起源於Nutch項目。Nutch的設計目標是構建一個大型的全網搜索引擎,包括網頁抓取、索引、查詢等功能,但隨着抓取網頁數量的增加,遇到了嚴重的可擴展性問題——如何解決數十億網頁的存儲和索引問題。

2003年、2004年谷歌發表的兩篇論文為該問題提供了可行的解決方案。

——分布式文件系統(GFS),可用於處理海量網頁的存儲。

——分布式計算框架MAPREDUCE,可用於處理海量網頁的索引計算問題。

Nutch的開發人員完成了相應的開源實現HDFS和MAPREDUCE,並從Nutch中剝離成為獨立項目HADOOP,到2008年1月,HADOOP成為Apache頂級項目,迎來了它的快速發展期。

大數據組件

在大數據的發展中,組件化一直都是一個非常大的趨勢。屏蔽復雜的底層研發,只關注數據工程與數據分析本身,讓大數據得以迅速地發展。而開源的技術發展更是讓大數據的發展得到了長足的進步,大量的公司及個人貢獻了很多的開源方案。這也讓數據采集,清洗,分析,應用都變得輕而易舉。

Hadoop,Hive,Spark,Flink等等開源框架不斷的發展出現。

這些組件相互配合,共同構建起了大數據的平台體系。所以學習好大數據的相關組件知識就非常的重要,也是做好大數據應用的基礎。

大數據架構

大數據的技術與應用的發展同步進行,催生着架構的不斷演變。

從離線到實時,從數據倉庫到數據湖,從大數據平台到數據中台。有人會說大數據有點誇大,大屏泛濫沒有實際應用。但是事物的發展正是經過了從概念到實踐到落地的過程。不得不承認,大數據的架構在不斷的向更好的方向演變。

大數據發展

大數據的應用范圍在逐漸的擴大,用戶畫像,推薦系統等等領域都是大數據在支撐。而數據治理的發展讓數據安全,數據質量也得到了重視。

未來的大數據,將是大數據+數據分析+人工智能的結合體,架構和技術都將不斷的演進,越來越影響並改變我們的生活。

大數據的發展讓大數據相關崗位的需求猛增,大數據工程師,架構師,數據分析師,大數據運維等等都是非常不錯的職業選擇。不過要提醒的是大數據的技術發展迅速,要保持學習,不斷的獲取新的知識。

二、分布式協調服務——Zookeeper

在學習hadoop組件之前,要先了解下zookeeper。zookeeper是一個分布式協調服務;就是為用戶的分布式應用程序提供協調服務。

簡單的說zk解決了分布式系統的一致性問題,可以將需要一致性的數據放在zk中,同時zk也提供了監聽等機制。zk為hadoop分布式的實現提供了保證,所以大家之后不用糾結hadoop很多的操作是如何實現的,很多都依賴了zk。

zk是什么?

1、Zookeeper是為別的分布式程序服務的

2、Zookeeper本身就是一個分布式程序(只要有半數以上節點存活,zk就能正常服務)

3、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分布式共享鎖、統一名稱服務……

4、雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能:

a、管理(存儲,讀取)用戶程序提交的數據;

b、並為用戶程序提供數據節點監聽服務;

不僅是大數據領域,在很多分布式系統中,zk都有着非常大的應用。

Zookeeper工作機制

1、Zookeeper:一個leader,多個follower組成的集群

2、全局數據一致:每個server保存一份相同的數據副本,client無論連接到哪個server,數據都是一致的

3、分布式讀寫,更新請求轉發,由leader實施

4、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行

5、數據更新原子性,一次數據更新要么成功(半數以上節點成功),要么失敗

6、實時性,在一定時間范圍內,client能讀到最新數據

Zookeeper數據結構

1、層次化的目錄結構,命名符合常規文件系統規范(見下圖)

2、每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識

3、節點Znode可以包含數據(只能存儲很小量的數據,<1M;最好是1k字節以內)和子節點

4、客戶端應用可以在節點上設置監視器

zookeeper的選舉機制

(1)Zookeeper集群中只有超過半數以上的服務器啟動,集群才能正常工作;

(2)在集群正常工作之前,myid小的服務器給myid大的服務器投票,直到集群正常工作,選出Leader;

(3)選出Leader之后,之前的服務器狀態由Looking改變為Following,以后的服務器都是Follower。

zk命令行操作

運行 zkCli.sh –server 進入命令行工具

查看znode路徑 ls /aaa

獲取znode數據 get /aaa

zk客戶端API

org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話

它提供以下幾類主要方法 :

功能 描述
create 在本地目錄樹中創建一個節點
delete 刪除一個節點
exists 測試本地是否存在目標節點
get/set data 從目標節點上讀取 / 寫數據
get/set ACL 獲取 / 設置目標節點訪問控制列表信息
get children 檢索一個子節點上的列表
sync 等待要被傳送的數據

三、分布式文件系統——HDFS

HDFS概念

分而治之:將大文件、大批量文件,分布式存放在大量服務器上,以便於采取分而治之的方式對海量數據進行運算分析;

HDFS是一個文件系統,用於存儲文件,通過統一的命名空間——目錄樹來定位文件;

HDFS是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器有各自的角色;

重要特性

HDFS中的文件在物理上是分塊存儲(block),塊的大小可以配置;

HDFS文件系統會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問文件,形如:hdfs://namenode:port/dir/file;

目錄結構及文件分塊位置信息(元數據)的管理由namenode節點承擔——namenode是HDFS集群主節點,負責維護整個hdfs文件系統的目錄樹,以及每一個路徑(文件)所對應的block塊信息;

文件的各個block的存儲管理由datanode節點承擔——datanode是HDFS集群從節點,每一個block都可以在多個datanode上存儲多個副本;

HDFS是設計成適應一次寫入,多次讀出的場景,且不支持文件的修改(適合用來做數據分析,並不適合用來做網盤應用,因為,不便修改,延遲大,網絡開銷大,成本太高)

HDFS基本操作

不同的hadoop版本,hdfs操作命令不同。下面是hadoop3的操作命令,如果是其他版本要查詢對應的操作命令,可以使用-help 來查看幫助。

1、查詢命令
  hdfs dfs -ls / 查詢/目錄下的所有文件和文件夾

 hdfs dfs -ls -R 以遞歸的方式查詢/目錄下的所有文件

2、創建文件夾
  hdfs dfs -mkdir /test 創建test文件夾

3、創建新的空文件
  hdfs dfs -touchz /aa.txt 在/目錄下創建一個空文件aa.txt

4、增加文件
  hdfs dfs -put aa.txt /test 將當前目錄下的aa.txt文件復制到/test目錄下(把-put換成-copyFromLocal效果一樣-moveFromLocal會移除本地文件)

5、查看文件內容
  hdfs dfs -cat /test/aa.txt 查看/test目錄下文件aa.txt的內容(將-cat 換成-text效果一樣)

6、復制文件
  hdfs dfs -copyToLocal /test/aa.txt . 將/test/aa.txt文件復制到當前目錄(.是指當前目錄,也可指定其他的目錄)

7、刪除文件或文件夾
  hdfs dfs -rm -r /test/aa.txt 刪除/test/aa.txt文件(/test/aa.txt可以替換成文件夾就是刪除文件夾)

8、重命名文件
  hdfs dfs -mv /aa.txt /bb.txt 將/aa.txt文件重命名為/bb.txt

9、將源目錄中的所有文件排序合並到一個本地文件
  hdfs dfs -getmerge / local-file 將/目錄下的所有文件合並到本地文件local-file中

可以訪問web端對文件操作有一個直觀的認識。訪問NameNode Web UI進行查看。

我們可以理解為我們通過命令對文件及文件夾進行了操作,但這都是hdfs給我們提供的服務,而hdfs底層會將我們的文件分布式存儲。

HDFS工作機制

可以通過hdfs的工作機制來理解一下原理。來了解一下hdfs是如何通過指令完成文件存取工作的。

  1. HDFS集群分為兩大角色:NameNode、DataNode (Secondary Namenode)

  2. NameNode負責管理整個文件系統的元數據

  3. DataNode 負責管理用戶的文件數據塊

  4. 文件會按照固定的大小(blocksize)切成若干塊后分布式存儲在若干台datanode上

  5. 每一個文件塊可以有多個副本,並存放在不同的datanode上

  6. Datanode會定期向Namenode匯報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量

  7. HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行

寫數據

客戶端要向HDFS寫數據,首先要跟namenode通信以確認可以寫文件並獲得接收文件block的datanode,然后,客戶端按順序將文件逐個block傳遞給相應datanode,並由接收到block的datanode負責向其他datanode復制block的副本。

讀數據

客戶端將要讀取的文件路徑發送給namenode,namenode獲取文件的元信息(主要是block的存放位置信息)返回給客戶端,客戶端根據返回的信息找到相應datanode逐個獲取文件的block並在客戶端本地進行數據追加合並從而獲得整個文件。

我們要理解的是namenode的工作機制尤其是元數據管理機制,這對於以后做數據治理也非常的有幫助。

Namenode的工作機制

1、namenode職責:負責客戶端請求的響應,元數據的管理(查詢,修改)。

2、namenode對數據的管理采用了三種存儲形式:

內存元數據(NameSystem)

磁盤元數據鏡像文件

數據操作日志文件(可通過日志運算出元數據)

3、元數據存儲方式:

內存中有一份完整的元數據(內存meta data)

磁盤有一個“准完整”的元數據鏡像(fsimage)文件(在namenode的工作目錄中)

用於銜接內存metadata和持久化元數據鏡像fsimage之間的操作日志(edits文件)

4、checkpoint:每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並加載到內存進行merge(這個過程稱為checkpoint)

Datanode工作機制

1、Datanode工作職責:

存儲管理用戶的文件塊數據

定期向namenode匯報自身所持有的block信息(通過心跳信息上報)

2、Datanode掉線判斷

datanode進程死亡或者網絡故障造成datanode無法與namenode通信,namenode不會立即把該節點判定為死亡,要經過一段時間。

客戶端操作

hdfs提供了對外的api,可以進行客戶端的操作。我們只需要引入相關依賴就可以進行操作了。

這里是java示例,也有其他語言的操作。這種基本的操作后期的新組件也都有替代的方案,這里主要是熟悉為主。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
</dependency>

如果是windows下研發,要指定hadoop安裝包位置,才能引入相關包操作,安裝包可以去資料中查看。

示例代碼如下:

Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://192.168.137.101:9820"), conf, "root");
fs.copyFromLocalFile(new Path("D:\\aaa.txt"), new Path("/aaa"));
fs.close();

四、分布式運算框架——Mapreduce

Mapreduce是一個分布式運算程序的編程框架。大概的意思可以理解為對於hdfs中的分布式數據,可以通過Mapreduce這種分布式的框架方式來進行復雜的運算。試想一下,如果手寫分布式運算,要進行任務分配,分批執行,再匯總。這是非常復雜的工程,分布式運算框架的作用就是簡化這個過程。

Mapreduce是偏底層的技術,后期的Hive框架將sql語句轉化成Mapreduce語句進行執行,來簡化操作。后期的spark,flink也都是支持sql語句的。不過這種分布式預算的思想還是非常的重要,也影響了后來很多框架的運算原理。實際工作中不會遇到,但是要對原理有一個了解。

一個完整的mapreduce程序在分布式運行時有三類實例進程:

1、MRAppMaster:負責整個程序的過程調度及狀態協調

2、mapTask:負責map階段的整個數據處理流程

3、ReduceTask:負責reduce階段的整個數據處理流程

執行流程

1、 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程

2、 maptask進程啟動之后,根據給定的數據切片(哪個文件的哪個偏移量范圍)范圍進行數據處理,主體流程為:

a) 利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對

b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到緩存

c) 將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件

3、 MRAppMaster監控到所有maptask進程任務完成之后(真實情況是,某些maptask進程處理完成后,就會開始啟動reducetask去已完成的maptask處fetch數據),會根據客戶指定的參數啟動相應數量的reducetask進程,並告知reducetask進程要處理的數據范圍(數據分區)

4、Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干台maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行重新歸並排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲。

mapreduce的shuffle機制

mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;

具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程中,對數據按key進行了分區和排序;

Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。

隨后將mr的程序開發好,並運行即可,這就涉及到一個問題。如何運行。

五、資源調度——Yarn

在hadoop最開始的版本中,mapreduce的程序要想運行必須自己進行調度,調配資源。這就導致管理越老越混亂,Yarn就出現了。

Apache Hadoop YARN:Yet Another Resource Negotiator,另一種資源協調者。

Yarn是一個資源調度平台,負責為運算程序提供服務器運算資源,相當於一個分布式的操作系統平台,而mapreduce等運算程序則相當於運行於操作系統之上的應用程序。隨着hadoop的發展,yarn一直是最核心的資源調度中心,未來我們寫的spark,flink程序都可以通過Yarn來進行調度。

YARN的重要概念

1、 yarn並不清楚用戶提交的程序的運行機制

2、 yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)

3、 yarn中的主管角色叫ResourceManager

4、 yarn中具體提供運算資源的角色叫NodeManager

5、 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味着yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、spark,flink……

6、 所以,spark等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可

Yarn就成為一個通用的資源調度平台,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享。

ResourceManager

ResourceManager是YARN中的主節點服務,它負責集群中所有資源的統一管理和作業調度。

簡單來講,ResourceManager主要完成的功能包括:

  1. 與客戶端交互,處理來自客戶端的請求;
  2. 啟動和管理ApplicationMaster,並在它運行失敗時重新啟動它;
  3. 管理NodeManager,接收來自NodeManager的資源匯報信息,並向NodeManager下達管理指令(比如殺死container等);
  4. 資源管理與調度,接收來自ApplicationMaster的資源申請請求,並為之分配資源。

NodeManager

NodeManager是YARN集群中的每個具體節點的資源和任務管理者。NodeManager的主要功能包括:

  1. 定時向ResourceManager匯報本節點上的資源使用情況和各個Container的運行狀態;
  2. 接收並處理ApplicationMaster對container的啟動、停止等各種請求;
  3. 管理Container的生命周期,監控Container的資源使用;
  4. 管理任務日志和不同應用程序用到的附屬服務(auxiliary service)。

ApplicationMaster

用戶提交的每個應用程序均包含一個ApplicationMaster,主要功能包括:

  1. 與ResourceManager調度器協商以獲取資源;
  2. 將得到的資源進一步分配給內部的任務;
  3. 與NodeManager通信以啟動或停止任務;
  4. 監控所有任務的運行狀態,並在任務運行失敗時負責進行容錯處理。

Container

Container是YARN中的資源抽象,它封裝了某個節點上的多個維度的資源,如CPU、內存、磁盤、網絡等。當ApplicationMaster向ResourceManager申請資源時,ResourceManager為ApplicationMaster 返回的資源是用Container表示的。

當用戶向YARN中提交一個應用程序后,YARN將分兩個階段運行該應用程序:

第一階段:啟動ApplicationMaster;

第二階段:由ApplicationMaster創建應用程序;為它申請資源,並監控它的整個運行過程,直到運行完成。

第1步:

client 讀取作業配置信息並創建Job的環境,調用job.waitForCompletion 方法,向集群提交一個MapReduce 作業 。

第2步:

資源管理器給任務分配一個新的作業ID 。

第3步:

作業的client核實作業的輸出路徑,計算輸入文件的分片,將作業的資源 (包括:Jar包、配置文件,split信息等) 拷貝到HDFS集群上的作業提交目錄。

第4步:

通過調用資源管理器的submitApplication()來提交作業。

第5步:

當資源管理器收到submitApplciation()的請求時,就將該請求發給調度器 (scheduler),調度器向NodeManager發送一個啟動container的請求。

第6步:

節點管理器NodeManager啟動container,內部運行着一個主類為 MRAppMaster的Java應用。其通過創造一些對象來監控作業的進度,得到各個task的進度和完成報告 。

第7步:

然后其通過分布式文件系統HDFS來獲取由客戶端提前計算好的輸入split,然后為每個輸入split創建一個map任務,根據mapreduce.job.reduces創建 reduce任務對象。

第8步:

如果不是小作業,那應用管理器向資源管理器請求container來運行所有的map和reduce任務 。

這些請求是通過心跳來傳輸的,包括每個map任務的數據位置。比如:存放輸入split的主機名和機架(rack),調度器利用這些信息來調度任務,盡量將任務分配給存儲數據的節點或相同機架的節點。

第9步:

當一個任務由資源管理器的調度器分配給一個container后,AppMaster通過聯系NodeManager來啟動container。

第10步:

任務由一個主類為YarnChild的Java應用執行,在運行任務之前首先本地化任務需要的資源。比如:作業配置、JAR文件以及分布式緩存的所有依賴文件 。

第11步:

最后,啟動並運行map或reduce任務 。

同理在向yarn提交spark程序時也會按這種方式進行。這就讓資源的調度與程序本身分離。

六、數倉工具——Hive

Hive是基於Hadoop的一個數據倉庫工具(離線),可以將結構化的數據文件映射為一張數據庫表,並提供類SQL查詢功能。

Hive解決了MapReduce的復雜研發問題,采用類SQL語法學習成本低。

Hive需要有一個存儲元數據的數據庫,可以用mysql等等。

簡單來說,通過Hive就可以與hdfs文件建立映射關系。我們只需要通過開發hivesql語句,就可以對hdfs上的文件進行操作了。

Hive基本操作

hive中有一個默認的庫:

庫名: default

庫目錄:hdfs://ip:9000/user/hive/warehouse

新建庫:

create database db_order;

庫建好后,在hdfs中會生成一個庫目錄:

hdfs://hdp20-01:9000/user/hive/warehouse/db_order.db

建表:

use db_order;

create table t_order(id string,create_time string,amount float,uid string);

表建好后,會在所屬的庫目錄中生成一個表目錄

/user/hive/warehouse/db_order.db/t_order

只是,這樣建表的話,hive會認為表數據文件中的字段分隔符為 ^A

正確的建表語句為:

create table t_order(id string,create_time string,amount float,uid string)

row format delimited

fields terminated by ',';

這樣就指定了,我們的表數據文件中的字段分隔符為 ","

刪除表:

drop table t_order;

刪除表的效果是:

hive會從元數據庫中清除關於這個表的信息;

hive還會從hdfs中刪除這個表的表目錄;

內部表與外部表

內部表(MANAGED_TABLE):表目錄按照hive的規范來部署,位於hive的倉庫目錄/user/hive/warehouse中

外部表(EXTERNAL_TABLE):表目錄由建表用戶自己指定

create external table t_access(ip string,url string,access_time string)

row format delimited

fields terminated by ','

location '/access/log';

外部表和內部表的特性差別:

1、內部表的目錄在hive的倉庫目錄中 VS 外部表的目錄由用戶指定

2、drop一個內部表時:hive會清除相關元數據,並刪除表數據目錄

3、drop一個外部表時:hive只會清除相關元數據;

一個hive的數據倉庫,最底層的表,一定是來自於外部系統,為了不影響外部系統的工作邏輯,在hive中可建external表來映射這些外部系統產生的數據目錄;

然后,后續的etl操作,產生的各種表建議用managed_table

分區表

分區表的實質是:在表目錄中為數據文件創建分區子目錄,以便於在查詢時,MR程序可以針對分區子目錄中的數據進行處理,縮減讀取數據的范圍。

比如,網站每天產生的瀏覽記錄,瀏覽記錄應該建一個表來存放,但是,有時候,我們可能只需要對某一天的瀏覽記錄進行分析

這時,就可以將這個表建為分區表,每天的數據導入其中的一個分區;

當然,每日的分區目錄,應該有一個目錄名(分區字段)

示例:

create table t_access(ip string,url string,access_time string)
partitioned by(dt string)
row format delimited
fields terminated by ',';

數據導入導出

方式1:導入數據的一種方式:

手動用hdfs命令,將文件放入表目錄;

方式2:在hive的交互式shell中用hive命令來導入本地數據到表目錄

hive>load data local inpath '/root/order.data.2' into table t_order;

方式3:用hive命令導入hdfs中的數據文件到表目錄

hive>load data inpath '/access.log' into table t_access partition(dt='20210806');

文件格式

HIVE支持很多種文件格式: SEQUENCE FILE | TEXT FILE | PARQUET FILE | RC FILE

create table t_pq(movie string,rate int) stored as textfile;

create table t_pq(movie string,rate int) stored as sequencefile;

create table t_pq(movie string,rate int) stored as parquetfile;

七、任務調度——azkaban

azkaban是一個工作流調度系統。與之類似的還有oozie,airflow等等。

一個完整的數據分析系統通常都是由大量任務單元組成:

shell腳本程序,java程序,mapreduce程序、hive腳本等;

各任務單元之間存在時間先后及前后依賴關系;

為了很好地組織起這樣的復雜執行計划,需要一個工作流調度系統來調度執行。

在實際工作中,絕不是一個程序就能搞定一切的。需要分為多個程序運行,還有前后順序,所以任務調度系統一直存在。也在不斷的發展。

簡單的任務調度:直接使用linux的crontab來定義;

復雜的任務調度:開發調度平台

或使用現成的開源調度系統,比如ooize、azkaban等。

Azkaban介紹

Azkaban是由Linkedin開源的一個批量工作流任務調度器。用於在一個工作流內以一個特定的順序運行一組工作和流程。Azkaban定義了一種KV文件格式來建立任務之間的依賴關系,並提供一個易於使用的web用戶界面維護和跟蹤你的工作流。

地址:https://github.com/azkaban/azkaban

Azkaban使用

Azkaba內置的任務類型支持command、java

1、創建job描述文件

vi command.job

command.jobtype=command command=echo 'hello'

2、將job資源文件打包成zip文件

zip command.job

3、通過azkaban的web管理平台創建project並上傳job壓縮包

首先創建project

上傳zip包

4、啟動執行該job

Command類型多job工作流flow

1、創建有依賴關系的多個job描述

第一個job:foo.job

foo.jobtype=commandcommand=echo foo

第二個job:bar.job依賴foo.job

bar.jobtype=commanddependencies=foocommand=echo bar

2、將所有job資源文件打到一個zip包中

3、在azkaban的web管理界面創建工程並上傳zip包

4、啟動工作流flow

HDFS操作任務

1、創建job描述文件

fs.jobtype=commandcommand=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz

2、將job資源文件打包成zip文件

3、通過azkaban的web管理平台創建project並上傳job壓縮包

4、啟動執行該job

八、數據傳輸Sqoop

Sqoop是一個用於在Hadoop\和關系型數據庫之間流轉數據的一個工具。可以使用Sqoop將數據從關系型數據庫系統(RDBMS)比如MySQL或者Oracle導入到hadoop分布式文件系統(HDFS)上,然后數據在Hadoop MapReduce上轉換,以及將數據導出到RDBMS中。
  Sqoop自動實現了上面提到的很多過程,Sqoop使用MapReduce來導入和導出數據,這樣既可以提供並行化操作又可以提高容錯能力。

Sqoop是Apache軟件基金會的一個開源項目。可以訪問http://Sqoop.apache.org獲取,sqoop目前已經趨於穩定,從apache退休了。

在每天定時定時調度把mysql數據傳到大數據集群中,或者把hive中數據傳走時會用到。不過隨時數據實時化的要求變高,sqoop的作用小了很多。但是一些歷史數據的導入還是需要的。

Sqoop使用

Sqoop提供了一系列的操作工具,使用Sqoop需要指定你想要使用的具體工具,以及提供對應的一些參數,使用方式如下。

sqoop tool-name [tool-arguments]

可以使用sqoop help命令查看幫助信息

sqoop help
Available commands:
  codegen            生成Java代碼
  create-hive-table  根據表結構生成hive表
  eval               執行SQL語句並返回結果
  export             導出HDFS文件到數據庫表
  help               幫助
  import             從數據庫導入數據到HDFS
  import-all-tables  導入數據庫所有表到HDFS
  list-databases     列舉所有的database
  list-tables        列舉數據庫中的所有表
  version            查看版本信息

可以看到,sqoop提供的操作工具有10個。具體工具的使用幫助可以sqoop help (tool-name)或者sqoop tool-name --help進行查看。

sqoop-import

import工具可以用於從RDBMS中導入一張表到HDFS。表中的每一條記錄對應生成HDFS文件中的每一行。這些記錄可以以text files或者Avro或者SequenceFiles格式進行存儲。

使用方法如下

$ sqoop-import (generic-args) (import-args)

參數列表-import基本參數

參數 描述
–connect < jdbc-uri > JDBC連接串
–connection-manager < class-name > 連接管理類
–driver < class-name > 手動指定JDBC驅動類
–hadoop-mapred-home < dir > 可以覆蓋$HADOOP_MAPRED_HOME
–help 使用幫助
–password-file 指定包含密碼的文件
-P 執行import時會暫停,等待用戶手動輸入密碼
–password < password > 直接將密碼寫在命令行中
–username < username > 指定用戶名
–verbose 顯示Sqoop任務更多執行信息
–connection-param-file < filename > 可選的參數,用於提供連接參數
–relaxed-isolation 設置每個mapmer的連接事務隔離

Hive參數

以下是導入到 Hive 中時可選的參數:

--hive-home <dir>:覆蓋 $HIVE_HOME。
--hive-import:將表導入Hive(如果沒有設置,則使用Hive的默認分隔符。)
--hive-overwrite:覆蓋Hive表中的現有數據。
--create-hive-table:如果設置,那么如果存在目標hivetable,作業將失敗。默認情況下,此屬性為false。
--hive-table <table-name>:設置導入到Hive時要使用的表名。
--hive-drop-import-delims:導入到Hive時,從字符串字段中刪除\n、\r和\01。
--hive-delims-replacement:在導入到Hive時,將字符串字段中的\n、\r和\01替換為用戶定義的字符串。
--hive-partition-key:分配到分區的Hive字段的名稱。
--hive-partition-value <v>:作為該任務導入到Hive中的分區鍵的字符串值。

示例:

bin/sqoop import \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--hive-import \
--hive-database mydb \
--hive-table mytable \
--fields-terminated-by "\t" \
--delete-target-dir \
--hive-overwrite 

sqoop-export

Sqoop的export工具可以從HDFS同步一系列文件數據到RDBMS中。使用這個工具的前提是導出目標表在數據庫必須存在。導出文件根據用戶指定的分隔符轉化成一系列的輸出記錄。
  默認的導出操作會將這些記錄轉化成一系列的INSERT語句,根據這些語句將記錄插入到關系型數據庫中。而在update模式下,Sqoop會生成一系列的UPDATE語句,將數據庫中已經存在的記錄進行更新。在call模式下,Sqoop會為每一條記錄調用一個存儲過程來處理。

$ sqoop-export (generic-args) (export-args)

基本參數

*參數* *描述*
–connect < jdbc-uri > JDBC連接串
–connection-manager < class-name > 連接管理類
–driver < class-name > 手動指定JDBC驅動類
–hadoop-mapred-home < dir > 可以覆蓋$HADOOP_MAPRED_HOME
–help 使用幫助
–password-file 指定包含密碼的文件
-P 執行import時會暫停,等待用戶手動輸入密碼
–password < password > 直接將密碼寫在命令行中
–username < username > 指定用戶名

示例:

$ bin/sqoop export \
--connect jdbc:mysql://hostname:3306/mydb \
--username root \
--password root \
--table mytable \
--num-mappers 1 \
--export-dir /user/hive/warehouse/mydb.db/mytable \
--input-fields-terminated-by "\t"

九、數據收集-Flume

  • Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統
  • 支持在日志系統中定制各類數據發送方,用於收集數據
  • Flume提供對數據進行簡單處理,並寫到各種數據接收方

Flume是成熟的開源日志采集系統,且本身就是hadoop生態體系中的一員,與hadoop體系中的各種框架組件具有天生的親和力,可擴展性強。

相對於用Shell腳本和Java的收集方式,規避了對日志采集過程中的容錯處理不便控制,減少了開發工作量。

例如對於實時的日志分析這種場景中,對數據采集部分的可靠性、容錯能力要求通常不會非常嚴苛,因此使用通用的flume日志采集框架完全可以滿足需求。

Flume的配置

安裝好flume以后需要對其進行配置。

flume通過事件(agent)進行運作,事件下包含如下的概念。

Source: 用來定義采集系統的源頭

Channel: 把Source采集到的日志進行傳輸,處理

Sink:定義數據的目的地

下面是一個示例。

有一個概念就是,我們定義了agent1這個agent。

定義了agent1.sources的系列設置去執行tail -F實時的采集日志數據。

通過Channel傳輸,最后指定Sink將日志存入hdfs。

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure tail -F source1
#使用exec作為數據源source組件
agent1.sources.source1.type = exec 
#使用tail -F命令實時收集新產生的日志數據
agent1.sources.source1.command = tail -F /var/logs/nginx/access_log
agent1.sources.source1.channels = channel1

#configure host for source
#配置一個攔截器插件
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
#使用攔截器插件獲取agent所在服務器的主機名
agent1.sources.source1.interceptors.i1.hostHeader = hostname

#配置sink組件為hdfs
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path=hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H%M%S
#指定文件sink到hdfs上的路徑
agent1.sinks.sink1.hdfs.path=
hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M_%hostname
#指定文件名前綴
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
#指定每批下沉數據的記錄條數
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#指定下沉文件按1G大小滾動
agent1.sinks.sink1.hdfs.rollSize = 1024*1024*1024
#指定下沉文件按1000000條數滾動
agent1.sinks.sink1.hdfs.rollCount = 1000000
#指定下沉文件按30分鍾滾動
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
#使用memory類型channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

隨后將flume用指定的配置文件啟動即可。

bin/flume-ng agent --conf ./conf -f ./conf/weblog.properties.2 -n agent

注意:啟動命令中的 -n 參數要給配置文件中配置的agent名稱

目前市面針對日志采集的有 Flume,Logstash,Filebeat,Fluentd ,rsyslog 很多種。但基本的原理是相同的,要根據公司的情況進行選擇。

本文從大數據理論到常用的基礎組件進行的筆記的整理,更深入的hadoop理論知識建議通過書籍進行深入的閱讀學習。而Spark,Flink等組件的學習將會通過單獨的文章進行筆記整理。希望對大家有所幫助,更多大數據相關知識,請關注 大數據流動~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM