主要內容:hdfs的整體運行機制,DATANODE存儲文件塊的觀察,hdfs集群的搭建與配置,hdfs命令行客戶端常見命令;業務系統中日志生成機制,HDFS的java客戶端api基本使用。
1、什么是大數據
基本概念
《數據處理》
在互聯網技術發展到現今階段,大量日常、工作等事務產生的數據都已經信息化,人類產生的數據量相比以前有了爆炸式的增長,以前的傳統的數據處理技術已經無法勝任,需求催生技術,一套用來處理海量數據的軟件工具應運而生,這就是大數據!
處理海量數據的核心技術:
海量數據存儲:分布式
海量數據運算:分布式
大數據的海量數據的存儲和運算,核心技術就是分布式。
這些核心技術的實現是不需要用戶從零開始造輪子的
存儲和運算,都已經有大量的成熟的框架來用
存儲框架:
HDFS——分布式文件存儲系統(HADOOP中的存儲框架)
HBASE——分布式數據庫系統
KAFKA——分布式消息緩存系統(實時流式數據處理場景中應用廣泛)
文件系統中的數據以非結構化居多,沒有直觀的結構,數據庫中的信息多以表的形式存在,具有結構化,存在規律;
查詢的時候文本文件只能一行一行掃描,而數據庫效率高很多,可以利用sql查詢語法,數據庫在存和取方便的多。
數據庫和文件系統相比,數據庫相當於在特定的文件系統上的軟件封裝。其實HBASE就是對HDFS的進一層封裝,它的底層文件系統就是HDFS。
分布式消息緩存系統,既然是分布式,那就意味着橫跨很多機器,意味着容量可以很大。和前兩者相比它的數據存儲形式是消息(不是表,也不是文件),消息可以看做有固定格式的一條數據,比如消息頭,消息體等,消息體可以是json,數據庫的一條記錄,一個序列化對象等。消息最終存放在kafaka內部的特定的文件系統里。
運算框架:(要解決的核心問題就是幫用戶將處理邏輯在很多機器上並行)
MAPREDUCE—— 離線批處理/HADOOP中的運算框架
SPARK —— 離線批處理/實時流式計算
STORM —— 實時流式計算
離線批處理:數據是靜態的,一次處理一大批數據。
實時流式:數據在源源不斷的生成,邊生成,邊計算
這些運算框架的思想都差不多,特別是mapreduce和spark,簡單來看spark是對mapreduce的進一步封裝;
運算框架和存儲框架之間沒有強耦合關系,spark可以讀HDFS,HBASE,KAFKA里的數據,當然需要存儲框架提供訪問接口。
輔助類的工具(解放大數據工程師的一些繁瑣工作):
HIVE —— 數據倉庫工具:可以接收sql,翻譯成mapreduce或者spark程序運行
FLUME——數據采集
SQOOP——數據遷移
ELASTIC SEARCH —— 分布式的搜索引擎
flume用於自動采集數據源機器上的數據到大數據集群中。
HIVE看起來像一個數據庫,但其實不是,Hive中存了一些需要分析的數據,然后在直接寫sql進行分析,hive接收sql,翻譯成mapreduce或者spark程序運行;
hive本質是mapreduce或spark,我們只需要寫sql邏輯而不是mapreduce邏輯,Hive自動完成對sql的翻譯,而且還是在海量數據集上。
.......
換個角度說,大數據是:
1、有海量的數據
2、有對海量數據進行挖掘的需求
3、有對海量數據進行挖掘的軟件工具(hadoop、spark、storm、flink、tez、impala......)
大數據在現實生活中的具體應用
數據處理的最典型應用:公司的產品運營情況分析
電商推薦系統:基於海量的瀏覽行為、購物行為數據,進行大量的算法模型的運算,得出各類推薦結論,以供電商網站頁面來為用戶進行商品推薦
精准廣告推送系統:基於海量的互聯網用戶的各類數據,統計分析,進行用戶畫像(得到用戶的各種屬性標簽),然后可以為廣告主進行有針對性的精准的廣告投放
2、什么是hadoop
hadoop中有3個核心組件:
分布式文件系統:HDFS —— 實現將文件分布式存儲在很多的服務器上
分布式運算編程框架:MAPREDUCE —— 實現在很多機器上分布式並行運算
分布式資源調度平台:YARN —— 幫用戶調度大量的mapreduce程序,並合理分配運算資源
3、hdfs整體運行機制
hdfs:分布式文件系統
hdfs有着文件系統共同的特征:
1、有目錄結構,頂層目錄是: /
2、系統中存放的就是文件
3、系統可以提供對文件的:創建、刪除、修改、查看、移動等功能
hdfs跟普通的單機文件系統有區別:
1、單機文件系統中存放的文件,是在一台機器的操作系統中
2、hdfs的文件系統會橫跨N多的機器
3、單機文件系統中存放的文件,是在一台機器的磁盤上
4、hdfs文件系統中存放的文件,是落在n多機器的本地單機文件系統中(hdfs是一個基於linux本地文件系統之上的文件系統)
hdfs的工作機制:
1、客戶把一個文件存入hdfs,其實hdfs會把這個文件切塊后,分散存儲在N台linux機器系統中(負責存儲文件塊的角色:data node)<准確來說:切塊的行為是由客戶端決定的>
2、一旦文件被切塊存儲,那么,hdfs中就必須有一個機制,來記錄用戶的每一個文件的切塊信息,及每一塊的具體存儲機器(負責記錄塊信息的角色是:name node)
3、為了保證數據的安全性,hdfs可以將每一個文件塊在集群中存放多個副本(到底存幾個副本,是由當時存入該文件的客戶端指定的)
綜述:一個hdfs系統,由一台運行了namenode的服務器,和N台運行了datanode的服務器組成!
4、搭建hdfs分布式集群
4.1 hdfs集群組成結構:
4.2 安裝hdfs集群的具體步驟:
4.2.1、首先需要准備N台linux服務器
學習階段,用虛擬機即可!
先准備4台虛擬機:1個namenode節點 + 3 個datanode 節點
4.2.2、修改各台機器的主機名和ip地址
主機名:hdp-01 對應的ip地址:192.168.33.11
主機名:hdp-02 對應的ip地址:192.168.33.12
主機名:hdp-03 對應的ip地址:192.168.33.13
主機名:hdp-04 對應的ip地址:192.168.33.14
4.2.3、從windows中用CRT軟件進行遠程連接
在windows中將各台linux機器的主機名配置到的windows的本地域名映射文件中:
c:/windows/system32/drivers/etc/hosts
192.168.33.11 hdp-01 192.168.33.12 hdp-02 192.168.33.13 hdp-03 192.168.33.14 hdp-04 |
用crt連接上后,修改一下crt的顯示配置(字號,編碼集改為UTF-8):
4.2.3、配置linux服務器的基礎軟件環境
- 防火牆
關閉防火牆:service iptables stop
關閉防火牆自啟: chkconfig iptables off
- 安裝jdk:(hadoop體系中的各軟件都是java開發的)
1) 利用alt+p 打開sftp窗口,然后將jdk壓縮包拖入sftp窗口
2) 然后在linux中將jdk壓縮包解壓到/root/apps 下
3) 配置環境變量:JAVA_HOME PATH
vi /etc/profile 在文件的最后,加入:
export JAVA_HOME=/root/apps/jdk1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin
4) 修改完成后,記得 source /etc/profile使配置生效
5) 檢驗:在任意目錄下輸入命令: java -version 看是否成功執行
6) 將安裝好的jdk目錄用scp命令拷貝到其他機器
7) 將/etc/profile配置文件也用scp命令拷貝到其他機器並分別執行source命令
- 集群內主機的域名映射配置
在hdp-01上,vi /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.33.11 hdp-01 192.168.33.12 hdp-02 192.168.33.13 hdp-03 192.168.33.14 hdp-04 |
然后,將hosts文件拷貝到集群中的所有其他機器上
scp /etc/hosts hdp-02:/etc/
scp /etc/hosts hdp-03:/etc/
scp /etc/hosts hdp-04:/etc/
補 充 提示: |
如果在執行scp命令的時候,提示沒有scp命令,則可以配置一個本地yum源來安裝 1、先在虛擬機中配置cdrom為一個centos的安裝鏡像iso文件 2、在linux系統中將光驅掛在到文件系統中(某個目錄) 3、mkdir /mnt/cdrom 4、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom 5、檢驗掛載是否成功: ls /mnt/cdrom 6、3、配置yum的倉庫地址配置文件 7、yum的倉庫地址配置文件目錄: /etc/yum.repos.d 8、先將自帶的倉庫地址配置文件批量更名: 9、然后,拷貝一個出來進行修改 10、修改完配置文件后,再安裝scp命令: 11、yum install openssh-clients -y |
4.2.4、安裝hdfs集群
1、上傳hadoop安裝包到hdp-01
bin文件為hadoop功能命令,sbin中為集群管理命令。
2、修改配置文件
要 點 提 示 |
核心配置參數: 1) 指定hadoop的默認文件系統為:hdfs 2) 指定hdfs的namenode節點為哪台機器 3) 指定namenode軟件存儲元數據的本地目錄 4) 指定datanode軟件存放文件塊的本地目錄 |
hadoop的配置文件在:/root/apps/hadoop安裝目錄/etc/hadoop/
hadoop中的其他組件如mapreduce,yarn等,這些組將會去讀數據,指定hadoop的默認文件系統為:hdfs,就是告訴這些組件去hdfs中讀數據;該項配置意味dadoop中的組件可以訪問各種文件系統。
若不指定數據的存放目錄,hadoop默認將數據存放在/temp下。
可以參考官網的默認配置信息。
1) 修改hadoop-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_60
2) 修改core-site.xml
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://hdp-01:9000/</value> </property> </configuration>
<value>hdfs://hdp-01:9000</value>包含兩層意思:
1、指定默認的文件系統。
2、指明了namenode是誰。
value中的值是URI風格
3) 修改hdfs-site.xml
配置namenode和datanode的工作目錄,添加secondary name node。
<configuration> <property> <name>dfs.namenode.name.dir</name> <value>/root/hdpdata/name/</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/root/hdpdata/data</value> </property> <property> <name>dfs.namenode.secondary.http-address</name> <value>hdp-02:50090</value> </property> </configuration>
4) 拷貝整個hadoop安裝目錄到其他機器
scp -r /root/apps/hadoop-2.8.1 hdp-02:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-03:/root/apps/ scp -r /root/apps/hadoop-2.8.1 hdp-04:/root/apps/
5) 啟動HDFS
所謂的啟動HDFS,就是在對的機器上啟動對的軟件
要 點 提示: |
要運行hadoop的命令,需要在linux環境中配置HADOOP_HOME和PATH環境變量 vi /etc/profile
|
首先,初始化namenode的元數據目錄
要在hdp-01上執行hadoop的一個命令來初始化namenode的元數據存儲目錄
hadoop namenode -format
創建一個全新的元數據存儲目錄
生成記錄元數據的文件fsimage
生成集群的相關標識:如:集群id——clusterID
該步驟叫做namenode的初始化也叫格式化,本質是建立namenode運行所需要的目錄以及一些必要的文件,所以該操作一般只在集群第一次啟動之前執行。
然后,啟動namenode進程(在hdp-01上)
hadoop-daemon.sh start namenode
啟動完后,首先用jps查看一下namenode的進程是否存在
namenode就是一個java軟件,我們知道啟動一個java軟件需要主類的main方法 java xxx.java - 若干參數,處於方便的考慮,hadoop中提供了一個通用的軟件啟動腳本hadoop-daemon.sh,腳本可以接受參數,專門用來啟動hadoop中的軟件。
可以看到namenode在監聽兩個端口,9000用來和客戶端通信(9000為RPC端口號,內部進程之間互相通信的端口,datanode和namenode的通信),接受hdfs客戶端的請求,50070是web服務端口,也就是說namenode內置一個web服務器,http客戶端可以通過次端口發送請求。
然后,在windows中用瀏覽器訪問namenode提供的web端口:50070
http://hdp-01:50070
然后,啟動眾datanode們(在任意地方)
hadoop-daemon.sh start datanode
下圖是datanode的一下信息展示,可以看到datanode內部通信的端口號是50010,而且datanode也提供了問訪問端口50075.
6) 用自動批量啟動腳本來啟動HDFS
hdfs其實就是一堆java軟件,我們可以自己手動hadoop-daemon.sh逐個啟動,也可以使用hadoop提供的批量啟動腳本。
1) 先配置hdp-01到集群中所有機器(包含自己)的免密登陸
2) 配完免密后,可以執行一次 ssh 0.0.0.0
3) 修改hadoop安裝目錄中/etc/hadoop/slaves(把需要啟動datanode進程的節點列入)
hdp-01 hdp-02 hdp-03 hdp-04 |
core-site.xml中配置過namenode,但是需要批量啟動那些datanode呢,該文件/etc/hadoop/slaves的配置就是解決這個問題的,該文件就是給啟動腳本看的。
4) 在hdp-01上用腳本:start-dfs.sh 來自動啟動整個集群
5) 如果要停止,則用腳本:stop-dfs.sh
start-dfs.sh、stop-dfs.sh會啟動、關閉namenode,datanode和secondnamenode
當然你也可以自己寫腳本來做上述的事情 ,如下所示。
5、hdfs的客戶端操作
hdfs裝好之后,接下來的工作就是hdfs里傳東西,去東西,由客戶端來完成。
5.1、客戶端的理解
hdfs的客戶端有多種形式:
1、網頁形式
2、命令行形式
3、客戶端在哪里運行,沒有約束,只要運行客戶端的機器能夠跟hdfs集群聯網們
對於客戶端來講,hdfs是一個整體,網頁版的客戶端主要是用來查看hdfs信息的,可以創建目錄,但是需要權限
命令行客戶端
bin命令中的 hadoop 和 hdfs 都可以啟動 hdfs 客戶端,hadoop和hdfs都是腳本,都會去啟動一個hdfs的java客戶端。java客戶端在安裝包的jar包中
./hadoop fs -ls /
表示hadoop要訪問hdfs,該腳本就會去啟動hdfs客戶端,客戶端可以接收參數,比如查看hdfs根目錄。
文件的切塊大小和存儲的副本數量,都是由客戶端決定!
所謂的由客戶端決定,是通過配置參數來定的
hdfs的客戶端會讀以下兩個參數,來決定切塊大小(默認128M)、副本數量(默認3):
切塊大小的參數: dfs.blocksize
副本數量的參數: dfs.replication
如果使用命令行客戶端時,上面兩個參數應該配置在客戶端機器的hadoop目錄中的hdfs-site.xml中配置,(命令行客戶端本質就是啟動了一個java客戶端,這個客戶端在啟動的時候會將它依賴的所有jar包加入classpath中,客戶端會從jar包中,加載xx-default.xml來獲得默認的配置文件,也可以在hadoop/etc/xxx-site.xml中配置具體的參數來覆蓋默認值。此時的/etc下的配置文件就是客戶自定義的配置文件,也會被java客戶端加載【客戶端可以運行在任何地方】);
當然也可以在具體代碼中指定,見6節中的核心代碼
<property> <name>dfs.blocksize</name> <value>64m</value> </property> <property> <name>dfs.replication</name> <value>2</value> </property>
5.1.1、上傳過程
下圖為datanode中的數據,.meta是該block的校驗和信息。我們可以通過linux cat命令將兩個塊合並,會發現與原來的文件是一樣的。
5.1.2、下載過程
客戶端首先回去namenode上去查找,有沒有請求的hdfs路徑下的文件,有的話都有該文件被切割成幾塊,每塊有幾個副本,這些副本都存放在集群中的哪些機器上,然后去存放了第一塊數據的某一台機器上去下載第一塊數據,將數據追加到本地,然后去下載下一塊數據,繼續追加到本地文件,知道下載完所有的塊。
5.2、hdfs客戶端的常用操作命令
1、上傳文件到hdfs中
hadoop fs -put /本地文件 /aaa
2、下載文件到客戶端本地磁盤
hadoop fs -get /hdfs中的路徑 /本地磁盤目錄
3、在hdfs中創建文件夾
hadoop fs -mkdir -p /aaa/xxx
4、移動hdfs中的文件(更名)
hadoop fs -mv /hdfs的路徑1 /hdfs的另一個路徑2
復制hdfs中的文件到hdfs的另一個目錄
hadoop fs -cp /hdfs路徑_1 /hdfs路徑_2
5、刪除hdfs中的文件或文件夾
hadoop fs -rm -r /aaa
6、查看hdfs中的文本文件內容
hadoop fs -cat /demo.txt
hadoop fs -tail /demo.txt
hadoop fs -tail -f /demo.txt
hadoop fs -text /demo.txt
7、查看hdfs目錄下有哪些文件
hadoop fs –ls /
8、追加本地文件到hdfs中的文件
hadoop fs -appendToFile 本地路徑 /hdfs路徑
9、權限修改
hadoop fs -chmod username1:usergroup1 /hdfs路徑
要說明的是,hdfs中的用戶和用戶組這是一個名字稱呼,與linux不一樣,linux中不能將選線分配給一個不存在的用戶。
可以查看hadoop fs 不帶任何參數,來查看hdfs所支持的命令
Usage: hadoop fs [generic options] [-appendToFile <localsrc> ... <dst>] [-cat [-ignoreCrc] <src> ...] [-checksum <src> ...] [-chgrp [-R] GROUP PATH...] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...] [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>] [-createSnapshot <snapshotDir> [<snapshotName>]] [-deleteSnapshot <snapshotDir> <snapshotName>] [-df [-h] [<path> ...]] [-du [-s] [-h] [-x] <path> ...] [-expunge] [-find <path> ... <expression> ...] [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] [-getfacl [-R] <path>] [-getfattr [-R] {-n name | -d} [-e en] <path>] [-getmerge [-nl] [-skip-empty-file] <src> <localdst>] [-help [cmd ...]] [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]] [-mkdir [-p] <path> ...] [-moveFromLocal <localsrc> ... <dst>] [-moveToLocal <src> <localdst>] [-mv <src> ... <dst>] [-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>] [-renameSnapshot <snapshotDir> <oldName> <newName>] [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...] [-rmdir [--ignore-fail-on-non-empty] <dir> ...] [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]] [-setfattr {-n name [-v value] | -x name} <path>] [-setrep [-R] [-w] <rep> <path> ...] [-stat [format] <path> ...] [-tail [-f] <file>] [-test -[defsz] <path>] [-text [-ignoreCrc] <src> ...] [-touchz <path> ...] [-truncate [-w] <length> <path> ...] [-usage [cmd ...]]
6、hdfs的java客戶端編程
HDFS客戶端編程應用場景:數據采集
業務系統中日志生成機制
數據采集程序其實就是通過對java客戶端編程,將數據不斷的上傳到hdfs。
在windows開發環境中做一些准備工作:
1、在windows的某個路徑中解壓一份windows版本的hadoop安裝包
2、將解壓出的hadoop目錄配置到windows的環境變量中:HADOOP_HOME
原因:若不配置環境變量,會在下載hdfs文件是出錯,是由於使用hadoop的FileSystem保存文件到本地的時候出於效率的考慮,會使用hadoop安裝包中的c語言庫,顯然沒有配置hadoop環境變量時是找不到該c語言類庫中的文件的;然而上傳文件到hdfs沒有類似問題;
6.1、核心代碼
1、將hdfs客戶端開發所需的jar導入工程(jar包可在hadoop安裝包中找到common和hdfs)
2、寫代碼
6.1.1、獲取hdfs客戶端
要點:要對hdfs中的文件進行操作,代碼中首先需要獲得一個hdfs的客戶端對象
Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
完整代碼如下:

/** * Configuration參數對象的機制: * 構造時,會加載jar包中的默認配置 xx-default.xml(core-default.xmlhdfs-default.xml) * 再加載 用戶配置xx-site.xml ,覆蓋掉默認參數 * 構造完成之后,還可以conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值 */ // new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件 Configuration conf = new Configuration(); // 指定本客戶端上傳文件到hdfs時需要保存的副本數為:2 conf.set("dfs.replication", "2"); // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M conf.set("dfs.blocksize", "64m"); // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名) FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); // 上傳一個文件到HDFS中 fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/")); fs.close();
6.1.2、對文件進行操作
上傳、下載文件;文件夾的創建和刪除、文件的移動和復制、查看文件夾和文件等。
3、利用fs對象的方法進行文件操作
方法均與命令行方法對應,比如:
上傳文件
fs.copyFromLocalFile(new Path("本地路徑"),new Path("hdfs的路徑"));
下載文件
fs.copyToLocalFile(new Path("hdfs的路徑"),new Path("本地路徑"))
對文件的增刪改查如下,對文件數據的操作后續介紹。

FileSystem fs = null; @Before public void init() throws Exception{ Configuration conf = new Configuration(); conf.set("dfs.replication", "2"); conf.set("dfs.blocksize", "64m"); fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root"); } /** * 從HDFS中下載文件到客戶端本地磁盤 * @throws IOException * @throws IllegalArgumentException */ @Test public void testGet() throws IllegalArgumentException, IOException{ fs.copyToLocalFile(new Path("/hdp20-05.txt"), new Path("f:/")); fs.close(); } /** * 在hdfs內部移動文件\修改名稱 */ @Test public void testRename() throws Exception{ fs.rename(new Path("/install.log"), new Path("/aaa/in.log")); fs.close(); } /** * 在hdfs中創建文件夾 */ @Test public void testMkdir() throws Exception{ fs.mkdirs(new Path("/xx/yy/zz")); fs.close(); } /** * 在hdfs中刪除文件或文件夾 */ @Test public void testRm() throws Exception{ fs.delete(new Path("/aaa"), true); fs.close(); } /** * 查詢hdfs指定目錄下的文件信息 */ @Test public void testLs() throws Exception{ // 只查詢文件的信息,不返回文件夾的信息 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true); while(iter.hasNext()){ LocatedFileStatus status = iter.next(); System.out.println("文件全路徑:"+status.getPath()); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations())); System.out.println("--------------------------------"); } fs.close(); } /** * 查詢hdfs指定目錄下的文件和文件夾信息 */ @Test public void testLs2() throws Exception{ FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status:listStatus){ System.out.println("文件全路徑:"+status.getPath()); System.out.println(status.isDirectory()?"這是文件夾":"這是文件"); System.out.println("塊大小:"+status.getBlockSize()); System.out.println("文件長度:"+status.getLen()); System.out.println("副本數量:"+status.getReplication()); System.out.println("--------------------------------"); } fs.close(); }
6.1.3、對文件數據進行操作
同過客戶端使用open打開流對象來讀取hdfs中文件的具體數據,包括指定偏移量來讀取特定范圍的數據;通過客戶端向hdfs文件追加數據。

/** * 讀取hdfs中的文件的內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/test.txt")); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { System.out.println(line); } br.close(); in.close(); fs.close(); } /** * 讀取hdfs中文件的指定偏移量范圍的內容 * * * 作業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的所有數據 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testRandomReadData() throws IllegalArgumentException, IOException { FSDataInputStream in = fs.open(new Path("/xx.dat")); // 將讀取的起始位置進行指定 in.seek(12); // 讀16個字節 byte[] buf = new byte[16]; in.read(buf); System.out.println(new String(buf)); in.close(); fs.close(); }
寫數據,create提供了豐富的重載函數,輕松實現覆蓋,追加,以及指定緩存大小,副本數量等等信息。

/** * 往hdfs中的文件寫內容 * * @throws IOException * @throws IllegalArgumentException */ @Test public void testWriteData() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false); // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg"); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) != -1) { out.write(buf,0,read); } in.close(); out.close(); fs.close(); }
7、HDFS實例
hdfs版本wordcount程序。
任務描述:
1、從hdfs文件中讀取數據,每次讀取一行數據;
2、將數據交給具體的單詞統計業務去作業(使用面向接口編程,當業務邏輯改變時,無需修改主程序代碼);
3、並將該行數據產生的結果存入緩存中(可以用hashmap模擬)
數據采集設計:
1、流程
啟動一個定時任務:
——定時探測日志源目錄
——獲取需要采集的文件
——移動這些文件到一個待上傳臨時目錄
——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄
啟動一個定時任務:
——探測備份目錄中的備份數據,檢查是否已超出最長備份時長,如果超出,則刪除
2、規划各種路徑
日志源路徑: d:/logs/accesslog/
待上傳臨時目錄: d:/logs/toupload/
備份目錄: d:/logs/backup/日期/
HDFS存儲路徑: /logs/日期
HDFS中的文件的前綴:access_log_
HDFS中的文件的后綴:.log
將路徑配置寫入屬性文件

MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapper INPUT_PATH=/wordcount/input OUTPUT_PATH=/wordcount/output2
主程序代碼示例:

import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HdfsWordcount { public static void main(String[] args) throws Exception{ /** * 初始化工作 */ Properties props = new Properties(); props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties")); Path input = new Path(props.getProperty("INPUT_PATH")); Path output = new Path(props.getProperty("OUTPUT_PATH")); Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS")); Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); /** * 處理數據 */ FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root"); RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false); while(iter.hasNext()){ LocatedFileStatus file = iter.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; // 逐行讀取 while ((line = br.readLine()) != null) { // 調用一個方法對每一行進行業務處理 mapper.map(line, context); } br.close(); in.close(); } /** * 輸出結果 */ HashMap<Object, Object> contextMap = context.getContextMap(); if(fs.exists(output)){ throw new RuntimeException("指定的輸出目錄已存在,請更換......!"); } FSDataOutputStream out = fs.create(new Path(output,new Path("res.dat"))); Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { out.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes()); } out.close(); fs.close(); System.out.println("恭喜!數據統計完成....."); } }
自定義的業務接口

public interface Mapper { public void map(String line,Context context); }
業務實現類1

public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { String[] words = line.split(" "); for (String word : words) { Object value = context.get(word); if(null==value){ context.write(word, 1); }else{ int v = (int)value; context.write(word, v+1); } } } }
業務實現類2

public class CaseIgnorWcountMapper implements Mapper { @Override public void map(String line, Context context) { String[] words = line.toUpperCase().split(" "); for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); } else { int v = (int) value; context.write(word, v + 1); } } } }
緩存模擬

import java.util.HashMap; public class Context { private HashMap<Object,Object> contextMap = new HashMap<>(); public void write(Object key,Object value){ contextMap.put(key, value); } public Object get(Object key){ return contextMap.get(key); } public HashMap<Object,Object> getContextMap(){ return contextMap; } }
8、實戰描述
需求描述:
在業務系統的服務器上,業務程序會不斷生成業務日志(比如網站的頁面訪問日志)
業務日志是用log4j生成的,會不斷地切出日志文件
需要定期(比如每小時)從業務服務器上的日志目錄中,探測需要采集的日志文件(access.log,不是直接采集數據),發往HDFS
注意點:業務服務器可能有多台(hdfs上的文件名不能直接用日志服務器上的文件名)
當天采集到的日志要放在hdfs的當天目錄中
采集完成的日志文件,需要移動到到日志服務器的一個備份目錄中
定期檢查(一小時檢查一次)備份目錄,將備份時長超出24小時的日志文件清除
Timer timer = new Timer() timer.schedual()
簡易版日志采集主程序

import java.util.Timer; public class DataCollectMain { public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new CollectTask(), 0, 60*60*1000L); timer.schedule(new BackupCleanTask(), 0, 60*60*1000L); } }
日志收集定時任務類

import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask { @Override public void run() { /** * ——定時探測日志源目錄 ——獲取需要采集的文件 ——移動這些文件到一個待上傳臨時目錄 * ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄 * */ try { // 獲取配置參數 Properties props = PropertyHolderLazy.getProps(); // 構造一個log4j日志對象 Logger logger = Logger.getLogger("logRollingFile"); // 獲取本次采集時的日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR)); // 列出日志源目錄中需要采集的文件 File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) { return true; } return false; } }); // 記錄日志 logger.info("探測到如下文件需要采集:" + Arrays.toString(listFiles)); // 將要采集的文件移動到待上傳臨時目錄 File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR)); for (File file : listFiles) { FileUtils.moveFileToDirectory(file, toUploadDir, true); } // 記錄日志 logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath()); // 構造一個HDFS的客戶端對象 FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); // 檢查HDFS中的日期目錄是否存在,如果不存在,則創建 Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day); if (!fs.exists(hdfsDestPath)) { fs.mkdirs(hdfsDestPath); } // 檢查本地的備份目錄是否存在,如果不存在,則創建 File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/"); if (!backupDir.exists()) { backupDir.mkdirs(); } for (File file : toUploadFiles) { // 傳輸文件到HDFS並改名access_log_ Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX)); fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath); // 記錄日志 logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath); // 將傳輸完成的文件移動到備份目錄 FileUtils.moveFileToDirectory(file, backupDir, true); // 記錄日志 logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir); } } catch (Exception e) { e.printStackTrace(); } } }
定期清理過時備份日志

import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import org.apache.commons.io.FileUtils; public class BackupCleanTask extends TimerTask { @Override public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); try { // 探測本地備份目錄 File backupBaseDir = new File("d:/logs/backup/"); File[] dayBackDir = backupBaseDir.listFiles(); // 判斷備份日期子目錄是否已超24小時 for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ FileUtils.deleteDirectory(dir); } } } catch (Exception e) { e.printStackTrace(); } } }
配置信息提取到屬性配置文件中,並寫成常量,以單例設計模式去加載配置信息。

LOG_SOURCE_DIR=d:/logs/accesslog/ LOG_TOUPLOAD_DIR=d:/logs/toupload/ LOG_BACKUP_BASE_DIR=d:/logs/backup/ LOG_BACKUP_TIMEOUT=24 LOG_LEGAL_PREFIX=access.log. HDFS_URI=hdfs://hdp-01:9000/ HDFS_DEST_BASE_DIR=/logs/ HDFS_FILE_PREFIX=access_log_ HDFS_FILE_SUFFIX=.log

public class Constants { /** * 日志源目錄參數key */ public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR"; /** * 日志待上傳目錄參數key */ public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR"; public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR"; public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT"; public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX"; public static final String HDFS_URI = "HDFS_URI"; public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR"; public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX"; public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX"; }

import java.util.Properties; /** * 單例模式:懶漢式——考慮了線程安全 * @author ThinkPad * */ public class PropertyHolderLazy { private static Properties prop = null; public static Properties getProps() throws Exception { if (prop == null) { synchronized (PropertyHolderLazy.class) { if (prop == null) { prop = new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties")); } } } return prop; } }

import java.util.Properties; /** * 單例設計模式,方式一: 餓漢式單例 * @author ThinkPad * */ public class PropertyHolderHungery { private static Properties prop = new Properties(); static { try { prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties")); } catch (Exception e) { } } public static Properties getProps() throws Exception { return prop; } }
9、總結
hdfs有服務端和客戶端;
服務端:
成員:namenode 管理元數據,datanode存儲數據
配置:需要指定使用的文件系統(默認的配置在core-default.xml,為本地文件系統,需要修改服務器core-site.xml修改為hdfs文件系統,並指定namenode),namenode和datanode的工作目錄(服務器的默認配置在hdfs-default.xml中,默認是在/temp下,需要就該hdfs-site.xml來覆蓋默認值。);
細節:第一次啟動集群時需要格式化namenode
客戶端:
形式:網頁端,命令行,java客戶端api;客戶端可以運行在任何地方。
功能:指定上傳文件的信息報括切塊大小(hdfs-default.xml中默認值128m,可以在hdfs-site.xml中修改,也可以咋java api 中創建客戶端對象的時候指定,總之由客戶端來指定),副本數量(hdfs-default.xml中默認值3,同樣可以修改覆蓋);完成hdfs中文件的系列操作,如上傳,下載
雖然服務端和客戶端的共用配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,但是不同的程序所需要的參數不同,只不過為了方便,所有參數都寫在一個文件中了。即是在服務器的hdfs-site.xml中配置了切塊大小和副本數量,服務器的namenode和datanode根本不關心也不使用這些參數,只有啟動服務器上的命令行客戶端時,該參數才可能起作用。