Hadoop基礎及演練
---第1章 初識大數據
- 大數據是一個概念也是一門技術,是在以Hadoop為代表的大數據平台框架上進行各種數據分析的技術.
---第2章 Hadoop核心HDFS
- Hadoop是一個開源的大數據框架,是一個分布式計算的解決方案,Hadoop=HDFS(分布式文件系統)+MapReduce(分布式計算)
- 存儲是大數據技術的基礎,分布式計算是大數據應用的解決方案
- HDFS基礎架構:
- 數據塊:是抽象塊,一般設置為128MB,備份3個.
- NameNode:主數據塊,管理文件系統的命名空間,存放文件元數據,維護文件系統的所有文件和目錄,文件與數據塊的映射,記錄每個文件各個塊所在數據節點的信息
- DataNode:從數據塊,存儲並檢索數據塊,向NameNode更新所存儲塊的列表
- HDFS優點:
- 適合大文件存儲,並有副本策略
- 可以構建在廉價的機器上,並有一定的容錯和恢復機制
- 支持流式數據訪問,一次寫入,多次讀取最高效
- HDFS缺點:
- 不適合大量小文件存儲
- 不適合並發寫入,不支持文件隨機修改
- 不支持隨機讀等低延時的訪問方式
- 數據塊的大小多少合適:64MB或128MB,太小會增加硬盤尋道時間,太大會影響MapReduce
- NameNode如果掛了怎么辦:設置備用節點,失效后自動激活
- HDFS寫流程:
- 客戶端向NameNode發起寫數據請求
- 分塊寫入DataNode節點,DataNode自動完成副本備份
- DataNode向NameNode匯報存儲完成,NameNode通知客戶端

- HDFS讀流程:
- 客戶端向NameNode發起讀數據請求
- NameNode找出距離最近的DataNode節點信息
- 客戶端從DataNode分塊下載文件

- 常用HDFS Shell命令:
- 類Linux系統:ls,cat,mkdir,rm,chmod,chown等
- HDFS文件交互:copyFromLocal,copyToLocal,get,put
- 上傳文件:hdfs dfs -copyFromLocal 目標文件 目標路徑
- 下載文件:hdfs dfs -copyToLocal 源文件 目標文件
- python的hdfs3庫可以用python進行文件的寫入和讀取
from hdfs3 import HDFileSystem test_host = 'localhost' test_port = 9000 def hdfs_exists(hdfs_client): path = '/tmp/test' if hdfs_client.exists(path): hdfs_client.rm(path) hdfs_client.makedirs(path) def hdfs_write_read(hdfs_client): data = b'hello hadoop' * 20 file_a = 'tmp/test/file_a' with hdfs_client.open(file_a, 'wb', replication=1) as f: f.write(data) with hdfs_client.open(file_a, 'rb') as f: out = f.read(len(data)) assert out == data def hdfs_readlines(hdfs_client): file_b = '/tmp/test/file_b' with hdfs_client.open(file_b, 'wb', replication=1) as f: f.write(b'hello\nhadoop') with hdfs_client.open(file_b, 'rb') as f: lines = f.readlines() assert len(lines) == 2 if __name__ == '__main__': #創建客戶端 hdfs_client = HDFileSystem(host=test_host, port=test_port) hdfs_exists(hdfs_client) hdfs_write_read(hdfs_client) hdfs_readlines(hdfs_client) hdfs_client.disconnect() print('-' * 20) print('hello hadoop')
---第3章 Hadoop核心MapReduce
- YARN:資源管理器,負責集群資源的管理和調度.
- ResourceManager:分配和調度資源,啟動並監控ApplicationMaster,監控NodeManager
- ApplicationMaster:為MR類型的程序申請資源並分配給內部任務,負責數據切分,監控任務的執行及容錯
- NodeManager:管理單個節點的資源,處理來自ResourceManager和ApplicationMaster的命令
- MapReduce(MR)是一種編程模型和方法:
- split:輸入一個大文件,將其分片
- map:每個分片由單獨的機器處理
- reduce:將各個機器計算的結果進行匯總並得到最終結果
import sys def read_input(file): for line in file: yield line.split() def main(): data = read_input(sys.stdin) for words in data: for word in words: print("%s%s%d"%(word, '\t', 1)) if __name__ == '__main__': main()
import sys from operator import itemgetter from itertools import groupby def read_mapper_output(file, separator='\t'): for line in file: yield line.rstrip().split(separator, 1) def main(): data = read_mapper_output(sys.stdin) for current_word, group in groupby(data, itemgetter(0)): total_count = sum(int(count) for current_word, count in group) print("%s%s%d"%(current_word, '\t', total_count)) if __name__ == '__main__': main()
---第4章 Hadoop生態圈介紹
- 如何通過Hadoop存儲小文件:可以利用HDFS將小文件合並為大文件,或者利用某種方式對文件進行分組(Hadoop Archives,SequenceFile,HBase)
- 當有節點故障時,集群如何繼續提供服務:NameNode會讓其他DataNode從現有副本復制這些數據塊.如果又恢復正常,則隨機選擇要刪除的副本
- 哪些是影響MapReduce性能的因素:算法,硬件,底層存儲系統,輸入數據的大小
- HBase是一個分布式數據庫.利用HDFS作為文件存儲系統,支持MR程序讀取數據.可以存儲非結構化和半結構化數據.核心概念是RowKey(數據唯一標識,按字典排序),Column Family(列族,多個列的集合,不能超過3個),TimeStamp(時間戳,支持多版本數據同時存在).
- Spark是一個基於內存計算的大數據並行計算框架.是MapReduce的替代方案,兼容HDFS,HIVE等數據源.優勢是抽象出分布式內存存儲數據結構(彈性分布式數據集RDD),基於事件驅動,通過線程池復用線程提高性能.
Hadoop進階
---第1章 概述
- 大數據可以用於精准營銷,用戶畫像,商品推薦等功能的實現.
- 一個NameNode對應多個DataNode.所有DataNode定期向NameNode發送心跳,如果NameNode沒有收到就認為該DataNode已經掛掉,會將數據發送到其他節點.還有一個Secondary NameNode用於備份NameNode.
- Hadoop2.0移除了JobTracker和TaskTracker,該由Yarn負責集群中所有資源的管理和分配,NodeManager管理單個計算節點.
---第2章 深入探索MapReduce過程
- 以WordCount為例的MapReduce:
- Split:將多個文本分為不同的分片
- Map:以<key,value>形式輸入,其中key是行號,value是文本,利用map將其拆分,key為單詞,value為1(代表該單詞出現一次).
- Shuffle:以map的輸出作為輸入,將相同的單詞歸到一起,但不進行次數累加.
- Reduce:以shuffle的輸出作為輸入,對單詞次數進行累加求和,輸出key為單詞,value為次數.
- 輸入文件保存在DataNode的block(數據塊)中,每一個文件都會增加分片數量,並映射在NameNode中,而NameNode內存有限,所以HDFS適合存放大文件.
- 節點Map任務的個數可以通過增大mapred.map.tasks來增加,也可以通過增大mapred.min.split.size或合並小文件來減少.
- Combine是指本地優化,在本地先按照key進行一輪排序和合並,在進行網絡混洗.在多數情況下Combine的邏輯和Reduce的邏輯是一致的,可以認為Combine是對本地數據的Reduce.
- map先將文件放入內存緩沖區,然后將其中的小文件合並為大文件,進行網絡傳輸.key值相同的文件會進行partition(合並),再進行reduce.
- 一個MapReduce中,以下三者的數量總是相等的:partitioner的數量,redue任務的數量,最終輸出文件.
- 在數據量大的情況下,應該將reduce任務數設為較大值.可以通過調節參數mapred.reduce.tasks和job.setNumReduceTasks(int n)方法進行設置
---第3章 Hadoop的分布式緩存
- 在執行MapReduce時,Mapper之間需要共享信息,如果信息量不大,可以將其從HDFS加載到內存中,這就是Hadoop分布式緩存機制(DistributedCache).如果共享數據太大,可以將共享數據分批緩存,重復執行作業.
- MapReduce進行矩陣相乘:
- 將右側矩陣轉置
- 將右矩陣載入分布式緩存
- 將左矩陣的行作為Map輸入
- 在Map執行之前將緩存的右矩陣以行為單位放入List
- 在Map計算時從List中取出所有行分別與輸入行相乘
---第4章 推薦算法
- 有余弦相似度,切比雪夫距離,歐氏距離,皮爾森系數等描述向量相似程度的度量方法
- 基於物品的推薦算法(ItemCF):
- 用戶行為與權重(點擊,搜索,收藏,付款)
- 算法思想是給用戶推薦那些和他們之前喜歡的物品相似的物品
- 根據用戶行為列表計算用戶,物品的評分矩陣
- 根據評分矩陣計算物品的相似度矩陣
- 相似度矩陣*評分矩陣=推薦列表
- 在推薦列表中將用戶產生過行為的物品置0
- 基於用戶的推薦算法(UserCF):
- 根據用戶行為列表計算物品,用戶的評分矩陣.
- 根據評分矩陣計算用戶的相似度矩陣.
- 相似度矩陣*評分矩陣=推薦列表
- 在推薦列表中將用戶產生過行為的物品置0
- 基於內容的推薦算法:
- 給用戶推薦和他們之前喜歡的物品在內容上相似的其他物品
- 物品特征建模
- 構建Item Profile矩陣(0,1矩陣)
- 構建Item User評分矩陣
- 將兩個矩陣相乘得到User Profile矩陣.表示用戶對特征的興趣權重.
- 對Item Profile和User Profile求余弦相似度
- 在推薦列表中將用戶產生過行為的物品置0
Spark從零開始
---第1章 Spark介紹
- Spark是一個快速(擴充了流行的MapReduce計算模型,基於內存計算)且通用(容納了其他分布式系統擁有的功能)的集群計算平台.
- Spark的組件:
- Spark Core:包含Spark的基本功能,比如任務調度,內存管理,容錯機制.定義了RDD(彈性分布式數據集),提供了API來創建和操作RDD.
- Spark SQL:處理結構化數據的庫.
- Spark Streaming:實時數據流組件.
- Mlib:包含通用機器學習功能的包.
- Graphx:處理圖的庫,並進行圖的並行計算.
- Cluster Managers:集群管理.
- 相比Hadoop,Spark可以用於時效性要求高的場景和機器學習等領域.
---第2章 Spark的下載和安裝
- Spark是Scala寫的,運行在JVM上.
- Spark的shell能夠處理分布在集群上的數據,把數據加載到節點的內存中.分為Python shells和Scala shells.
---第3章 開發第一個Spark程序
- WordCount:
- 創建一個Spark Context
- 加載數據
- 把每一行分割成單詞
- 轉換成pairs並且計數
- 打包程序-啟動集群-提交任務-執行任務
---第4章 RDDs
-
Driver program包含程序main方法,RDDs的定義和操作.它管理很多executors(節點).
-
通過SparkContext訪問Spark,它代表和一個集群的連接.在Shell中是自動創建好的.
- RDDs是彈性分布式數據集的簡寫.它們並行分布在整個集群中.不管整個數據集被切分成幾塊,都可以用它來訪問整個數據集.一個RDD是一個不可改變的分布式集合對象.所有計算都是通過RDDs的創建,轉換,操作完成的.一個RDD內部由很多partitions(分片)構成,每個分片包括一部分數據,是Spark並行處理的單元.RDD的創建可以用parelleilzie()或加載外部數據集.
- RDD基本操作:
- Transformations指的是從之前的RDD構建一個新RDD的過程.如map(接收函數,把函數應用到RDD,返回新RDD),filter(接收函數,返回只包含滿足filter函數的元素的RDD),flatMap(對每個輸入元素,輸出多個輸出元素).
- RDD支持數學集合計算,如並集,交集.
- Action是在RDD上計算出一個結果,把結果返回給driver program或保存在文件系統.如reduce(接收一個函數,作用在RDD兩個類型相同的元素上,返回新元素),collect(遍歷整個RDD,返回RDD的內容,注意內容需要單機內存能夠容納下),take(返回RDD的n個元素,返回結果無序),top(返回排序后的topN值),foreach(計算RDD中的每個元素,但不返回到本地).
- Spark維護着RDDs之間的依賴關系和創建關系,叫做血統關系圖,可以用於計算RDD的需求和恢復丟失數據.
- 延遲計算:Spark對RDD的計算是在第一次使用action操作的時候.這可以減少數據的傳輸.
- 使用map()函數可以創建KeyValue對RDDs.KeyValue對RDDs的Transformations操作有reduceByKey(把相同key的結合),groupByKey(把相同的key的values分組),mapValues(對value進行map操作).
- combineByKey是最常用的基於key的聚合函數,接收四個參數.返回類型可以與輸入類型不一樣.它遍歷元素的key,如果是新元素就使用createCombiner函數,如果已存在就使用mergeValue函數.合計每個分片結果的時候使用mergeCombiners函數.
