Hadoop
Hadoop是一款開源的大數據通用處理平台,其提供了3個組件,分別是HDFS分布式文件系統、YARN分布式資源調度、MapReduce分布式離線計算。
MapReduce適合大規模的數據同時對實時性要求不高的場景,不適合大量的小文件以及頻繁修改的文件。
Hadoop的特點
1.水平擴展:Hadoop集群可以達到上千個節點,同時能夠動態的新增和刪除節點,能夠存儲和處理PB級的數據量。
2.低成本:不需要依賴機器的性能,只需要普通的PC機就能運行。
目前一般會使用HDFS作為文件存儲,使用YARN對資源進行管理。
1.HDFS
HDFS是分布式文件系統,可以存儲海量的文件。
HDFS由NameNode、DataNode、SecondaryNameNode節點組成。
1.1 關於Block數據塊
Block是HDFS中最小的存儲單元,每個Block的大小默認為128M。
一個大文件會被拆分成多個Block進行存儲,如果一個文件的大小小於Block的大小,那么Block實際占用的大小為文件本身的大小。
每個Block都會在不同的DataNode節點中存在備份。
1.2 DataNode節點
DataNode節點用於保存Block,同時負責數據的讀寫和復制操作。
DataNode節點啟動時會向NameNode節點匯報當前存儲的Block信息。
1.3 NameNode節點
NameNode節點用於存儲文件的元信息、文件與Block和DataNode的關系。
NameNode運行時的所有數據都保存在內存當中,因此整個HDFS可存儲的文件數量受限於NameNode節點的內存大小。
NameNode節點中的數據會定時保存到磁盤文件當中(只有文件的元信息),但不保存文件與Block和DataNode的關系,這部分數據由DataNode啟動時上報和運行時維護。
DataNode節點會定期向NameNode節點發送心跳請求,一旦NameNode節點在一定的時間內沒有收到DataNode節點發送的心跳則認為其已經宕機,不會再給該DataNode節點分配任何的IO請求。
每個Block在NameNode中都對應一條記錄,如果是大量的小文件將會消耗大量內存,因此HDFS適合存儲大文件。
1.4 SecondaryNameNode
SecondaryNameNode節點會定時與NameNode節點進行同步(HA)
往HDFS寫入文件的流程
1.HDFS Client向NameNode節點申請寫入文件。
2.NameNode節點根據文件的大小,返回文件要寫入的BlockId以及DataNode節點列表,同時存儲文件的元信息以及文件與Block和DataNode節點之間的關系。
3.HDFS Client接收到NameNode節點的返回之后,會將數據依次寫入到指定的DataNode節點當中,每個DataNode節點接收到數據之后會把數據寫入到磁盤文件,然后將數據同步給其他的DataNode節點進行備份(備份數-1個DataNode節點)
4.在進行備份的過程中,每一個DataNode節點接收到數據后都會向前一個DataNode節點進行響應,最終第一個DataNode節點返回HDFS Client成功。
5.當HDFS Client接收到DataNode節點的響應后,會向NameNode節點發送最終確認請求,此時NameNode節點才會提交文件。
在進行備份的過程中,如果某個DataNode節點寫入失敗,NameNode節點會重新尋找DataNode節點繼續復制,以保證數據的可靠性。
只有當向NameNode節點發送最終確認請求后文件才可見,如果在發送最終確認請求前NameNode就已經宕機,那么文件將會丟失。
從HDFS讀取文件的流程
1.HDFS Client向NameNode節點申請讀取文件。
2.NameNode節點返回文件所有對應的BlockId以及這些BlockId所在的DataNode節點列表(包括備份節點)
3.HDFS Client會優先從本地的DataNode中進行讀取Block,否則通過網絡從備份節點中進行讀取。
機架感知
分布式集群中通常會包含非常多的機器,由於受到機架槽位和交換機網口的限制,通常大型的分布式集群都會跨好幾個機架,由多個機架上的機器共同組成一個分布式集群。
機架內的機器之間的網絡速度通常高於跨機架機器之間的網絡速度,並且機架之間機器的網絡通信通常會受到上層交換機網絡帶寬的限制。
Hadoop默認沒有開啟機架感知功能,默認情況下每個Block都是隨機分配DataNode節點,當Hadoop開啟機架感知功能后,那么當NameNode節點啟動時,會將機器與機架之間的關系保存在內存中,當HDFS Client申請寫入文件時,能夠根據預先定義的機架關系合理的分配DataNode。
機架感知默認對Block的3個備份的存放策略
第1個Block備份存放在與HDFS Client同一個節點的DataNode節點中(若HDFS Client不在集群范圍內則隨機選取)
第2個Block備份存放在與第一個節點不同機架下的節點中。
第3個Block備份存放在與第2個備份所在節點的機架下的另一個節點中,如果還有更多的副本則隨機存放在集群的節點中。
使用此策略可以保證對文件的訪問能夠優先在本機架下找到,並且如果整個機架上發生了異常也可以在另外的機架上找到該Block的備份。
2 YARN
YARN是分布式資源調度框架,由ResourceManger、NodeManager以及ApplicationMaster組成。
2.1 ResourceManager
ResourceManager是集群的資源管理者,負責集群中資源的分配以及調度,同時管理各個NodeManager,同時負責處理客戶端的任務請求。
2.2 NodeManager
NodeManager是節點的管理者,負責處理來自ResourceManager和ApplicationMaster的請求。
2.3 ApplicationMaster
ApplicationMaster用於計算任務所需要的資源。
2.4 任務運行在YARN的流程
1.客戶端向ResourceManager提交任務請求。
2.ResourceManager生成一個ApplicationManager進程,用於任務的管理。
3.ApplicationManager創建一個Container容器用於存放任務所需要的資源。
4.ApplicationManager尋找其中一個NodeManager,在此NodeManager中啟動一個ApplicationMaster,用於任務的管理以及監控。
5.ApplicationMaster向ResourceManager進行注冊,並計算任務所需的資源匯報給ResourceManager(CPU與內存)
6.ResourceManager為此任務分配資源,資源封裝在Container容器中。
7.ApplicationMaster通知集群中相關的NodeManager進行任務的執行。
8.各個NodeManager從Container容器中獲取資源並執行Map、Reduce任務。
3 MapReduce
MapReduce是分布式離線計算框架,其原理是將數據拆分成多份,然后通過多個節點並行處理。
MapReduce執行流程
MapReduce分為Map任務以及Reduce任務兩部分。
3.1 Map任務
1.讀取文件中的內容,解析成Key Value的形式 (Key為偏移量,Value為每行的數據)
2.重寫map方法,生成新的Key和Value。
3.對輸出的Key和Value進行分區。
4.將數據按照Key進行分組,key相同的value放到一個集合中(數據匯總)
處理的文件必須要在HDFS中。
3.2 Reduce任務
1.對多個Map任務的輸出,按照不同的分區,通過網絡復制到不同的reduce節點。
2.對多個Map任務的輸出進行合並、排序。
3.將reduce的輸出保存到文件,存放在HDFS中。
4.搭建Hadoop
4.1 安裝
1.由於Hadoop使用Java語言進行編寫,因此需要安裝JDK。
2.從CDH中下載Hadoop 2.X並進行解壓,CDH是Cloudrea公司對各種開源框架的整合與優化(較穩定)
4.2 修改配置
1.修改環境配置
編輯etc/hadoop/hadoop-env.sh文件,修改JAVA_HOME配置(此文件是Hadoop啟動時加載的環境變量)
編輯/etc/hosts文件,添加主機名與IP的映射關系。
2.配置Hadoop公共屬性(core-site.xml)
<configuration> <!-- Hadoop工作目錄,用於存放Hadoop運行時產生的臨時數據 --> <property> <name>hadoop.tmp.dir</name> <value>/usr/hadoop/hadoop-2.9.0/data</value> </property> <!-- NameNode的通信地址,1.x默認9000,2.x可以使用8020 --> <property> <name>fs.default.name</name> <value>hdfs://192.168.1.80:8020</value> </property> </configuration>
3.配置HDFS(hdfs-site.xml)
<configuration> <!--指定block的備份數量(將block復制到集群中備份數-1個DataNode節點中)--> <property> <name>dfs.replication</name> <value>1</value> </property> <!-- 關閉HDFS的訪問權限 --> <property> <name>dfs.permissions.enabled</name> <value>false</value> </property> </configuration>
4.配置YARN(yarn-site.xml)
<configuration> <!-- 配置Reduce取數據的方式是shuffle(隨機) --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
5.配置MapReduce(mapred-site.xml)
<configuration> <!-- 讓MapReduce任務使用YARN進行調度 --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
6.配置SSH
由於在啟動HDFS和YARN時都需要對用戶的身份進行驗證,因此可以配置SSH設置免密碼登錄。
//生成秘鑰
ssh-keygen -t rsa
//復制秘鑰到本機
ssh-copy-id 192.168.1.80
4.3 啟動HDFS
1.格式化NameNode
bin/hdfs namenode -format
2.啟動HDFS
sbin/start-dfs.sh
啟動HDFS后將會啟動NameNode、DataNode、SecondaryNameNode三個進程。
啟動時若出現錯誤可以進入logs目錄查看相應的日志文件。
3.訪問HDFS的可視化管理界面
當HDFS啟動完畢后,可以訪問http://localhost:50070進入HDFS的可視化管理界面,在此頁面中可以對整個HDFS集群進行監控以及文件的上傳和下載。
當下載文件時會進行請求的重定向,重定向的地址的host為NameNode的主機名,因此客戶端本地的host文件中需要配置NameNode主機名與IP的映射關系。
4.4 啟動YARN
sbin/start-yarn.sh
啟動YARN后,將會啟動ResourceManager以及NodeManager進程。
可以訪問http://localhost:8088進入YARN的可視化管理界面,可以在此頁面中查看任務的執行情況以及資源的分配。
4.5 使用Shell操作HDFS
HDFS與Linux類似,有/根目錄。
#顯示文件中的內容 bin/hadoop fs -cat <src> 將本地中的文件上傳到HDFS bin/hadoop fs -copyFromLocal <localsrc> <dst> #將本地中的文件上傳到HDFS bin/hadoop fs -put <localsrc> <dst> #將HDFS中的文件下載到本地 bin/hadoop fs -copyToLocal <src> <localdst> #將HDFS中的文件下載到本地 bin/hadoop fs -get <src> <localdst> #將本地中的文件剪切到HDFS中 bin/hadoop fs -moveFromLocal <localsrc> <dst> #將HDFS中的文件剪切到本地中 bin/hadoop fs -moveToLocal <src> <localdst> #在HDFS內對文件進行移動 bin/hadoop fs -mv <src> <dst> #在HDFS內對文件進行復制 bin/hadoop fs -cp <src> <dst> #刪除HDFS中的文件 bin/hadoop fs -rm <src> #創建目錄 bin/hadoop fs -mkdir <path> #查詢指定路徑下文件的個數 bin/hadoop fs -count <path> #顯示指定目錄下的內容 bin/hadoop fs -ls <path>
4.6 使用JAVA操作HDFS
/** * @Auther: ZHUANGHAOTANG * @Date: 2018/11/6 11:49 * @Description: */ public class HDFSUtils { private static Logger logger = LoggerFactory.getLogger(HDFSUtils.class); /** * NameNode URL */ private static final String NAMENODE_URL = "192.168.1.80:8020"; /** * HDFS文件系統連接對象 */ private static FileSystem fs = null; static { Configuration conf = new Configuration(); try { fs = FileSystem.get(URI.create(NAMENODE_URL), conf); } catch (IOException e) { logger.info("初始化HDFS連接失敗:{}", e); } } /** * 創建目錄 */ public static void mkdir(String dir) throws Exception { dir = NAMENODE_URL + dir; if (!fs.exists(new Path(dir))) { fs.mkdirs(new Path(dir)); } } /** * 刪除目錄或文件 */ public static void delete(String dir) throws Exception { dir = NAMENODE_URL + dir; fs.delete(new Path(dir), true); } /** * 遍歷指定路徑下的目錄和文件 */ public static List<String> listAll(String dir) throws Exception { List<String> names = new ArrayList<>(); dir = NAMENODE_URL + dir; FileStatus[] files = fs.listStatus(new Path(dir)); for (FileStatus file : files) { if (file.isFile()) { //文件 names.add(file.getPath().toString()); } else if (file.isDirectory()) { //目錄 names.add(file.getPath().toString()); } else if (file.isSymlink()) { //軟或硬鏈接 names.add(file.getPath().toString()); } } return names; } /** * 上傳當前服務器的文件到HDFS中 */ public static void uploadLocalFileToHDFS(String localFile, String hdfsFile) throws Exception { hdfsFile = NAMENODE_URL + hdfsFile; Path src = new Path(localFile); Path dst = new Path(hdfsFile); fs.copyFromLocalFile(src, dst); } /** * 通過流上傳文件 */ public static void uploadFile(String hdfsPath, InputStream inputStream) throws Exception { hdfsPath = NAMENODE_URL + hdfsPath; FSDataOutputStream os = fs.create(new Path(hdfsPath)); BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream); byte[] data = new byte[1024]; int len; while ((len = bufferedInputStream.read(data)) != -1) { if (len == data.length) { os.write(data); } else { //最后一次讀取 byte[] lastData = new byte[len]; System.arraycopy(data, 0, lastData, 0, len); os.write(lastData); } } inputStream.close(); bufferedInputStream.close(); os.close(); } /** * 從HDFS中下載文件 */ public static byte[] readFile(String hdfsFile) throws Exception { hdfsFile = NAMENODE_URL + hdfsFile; Path path = new Path(hdfsFile); if (fs.exists(path)) { FSDataInputStream is = fs.open(path); FileStatus stat = fs.getFileStatus(path); byte[] data = new byte[(int) stat.getLen()]; is.readFully(0, data); is.close(); return data; } else { throw new Exception("File Not Found In HDFS"); } } }
4.7 執行一個MapReduce任務
Hadoop提供了hadoop-mapreduce-examples-2.9.0.jar,其封裝了很多任務計算的方法,用戶可以直接進行調用。
#使用hadoop jar命令來執行JAR包
hadoop jar
1.創建一個文件同時將此文件上傳到HDFS中
2.使用Hadoop提供的hadoop-mapreduce-examples-2.9.0.jar執行wordcount詞頻統計功能
bin/hadoop jar /usr/hadoop/hadoop-2.0.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.0.jar wordcount /words /result
3.在YARN的可視化管理界面中可以查看任務的執行情況
4.當任務執行完畢后可以查看任務的執行結果
任務的執行結果將會保存到HDFS的文件中。