hadoop之HDFS學習筆記(一)


主要內容:hdfs的整體運行機制,DATANODE存儲文件塊的觀察,hdfs集群的搭建與配置,hdfs命令行客戶端常見命令;業務系統中日志生成機制,HDFS的java客戶端api基本使用。

1、什么是大數據

基本概念

《數據處理》

在互聯網技術發展到現今階段,大量日常、工作等事務產生的數據都已經信息化,人類產生的數據量相比以前有了爆炸式的增長,以前的傳統的數據處理技術已經無法勝任,需求催生技術,一套用來處理海量數據的軟件工具應運而生,這就是大數據!

 

處理海量數據的核心技術:

海量數據存儲:分布式

海量數據運算:分布式

大數據的海量數據的存儲和運算,核心技術就是分布式。

 

這些核心技術的實現是不需要用戶從零開始造輪子的

存儲和運算,都已經有大量的成熟的框架來用

 

存儲框架:

HDFS——分布式文件存儲系統(HADOOP中的存儲框架)

HBASE——分布式數據庫系統

KAFKA——分布式消息緩存系統(實時流式數據處理場景中應用廣泛)

 

文件系統中的數據以非結構化居多,沒有直觀的結構,數據庫中的信息多以表的形式存在,具有結構化,存在規律;

查詢的時候文本文件只能一行一行掃描,而數據庫效率高很多,可以利用sql查詢語法,數據庫在存和取方便的多。

數據庫和文件系統相比,數據庫相當於在特定的文件系統上的軟件封裝。其實HBASE就是對HDFS的進一層封裝,它的底層文件系統就是HDFS。

分布式消息緩存系統,既然是分布式,那就意味着橫跨很多機器,意味着容量可以很大。和前兩者相比它的數據存儲形式是消息(不是表,也不是文件),消息可以看做有固定格式的一條數據,比如消息頭,消息體等,消息體可以是json,數據庫的一條記錄,一個序列化對象等。消息最終存放在kafaka內部的特定的文件系統里。

 

運算框架:(要解決的核心問題就是幫用戶將處理邏輯在很多機器上並行

MAPREDUCE—— 離線批處理/HADOOP中的運算框架

SPARK —— 離線批處理/實時流式計算

STORM —— 實時流式計算

 

離線批處理:數據是靜態的,一次處理一大批數據。

實時流式:數據在源源不斷的生成,邊生成,邊計算

這些運算框架的思想都差不多,特別是mapreduce和spark,簡單來看spark是對mapreduce的進一步封裝;

運算框架和存儲框架之間沒有強耦合關系,spark可以讀HDFS,HBASE,KAFKA里的數據,當然需要存儲框架提供訪問接口。

 

輔助類的工具(解放大數據工程師的一些繁瑣工作):

HIVE —— 數據倉庫工具:可以接收sql,翻譯成mapreduce或者spark程序運行

FLUME——數據采集

SQOOP——數據遷移

ELASTIC SEARCH —— 分布式的搜索引擎

flume用於自動采集數據源機器上的數據到大數據集群中。

HIVE看起來像一個數據庫,但其實不是,Hive中存了一些需要分析的數據,然后在直接寫sql進行分析,hive接收sql,翻譯成mapreduce或者spark程序運行;

hive本質是mapreduce或spark,我們只需要寫sql邏輯而不是mapreduce邏輯,Hive自動完成對sql的翻譯,而且還是在海量數據集上。

.......

 

換個角度說,大數據是:

1、有海量的數據

2、有對海量數據進行挖掘的需求

3、有對海量數據進行挖掘的軟件工具(hadoop、spark、storm、flink、tez、impala......)

大數據在現實生活中的具體應用

數據處理的最典型應用:公司的產品運營情況分析

 

電商推薦系統:基於海量的瀏覽行為、購物行為數據,進行大量的算法模型的運算,得出各類推薦結論,以供電商網站頁面來為用戶進行商品推薦

 

精准廣告推送系統:基於海量的互聯網用戶的各類數據,統計分析,進行用戶畫像(得到用戶的各種屬性標簽),然后可以為廣告主進行有針對性的精准的廣告投放

2、什么是hadoop

hadoop中有3個核心組件:

分布式文件系統:HDFS —— 實現將文件分布式存儲在很多的服務器上

分布式運算編程框架:MAPREDUCE —— 實現在很多機器上分布式並行運算

分布式資源調度平台:YARN —— 幫用戶調度大量的mapreduce程序,並合理分配運算資源

3、hdfs整體運行機制

hdfs:分布式文件系統

hdfs有着文件系統共同的特征:

1、有目錄結構,頂層目錄是:  /

2、系統中存放的就是文件

3、系統可以提供對文件的:創建、刪除、修改、查看、移動等功能

 

hdfs跟普通的單機文件系統有區別:

1、單機文件系統中存放的文件,是在一台機器的操作系統中

2、hdfs的文件系統會橫跨N多的機器

3、單機文件系統中存放的文件,是在一台機器的磁盤上

4、hdfs文件系統中存放的文件,是落在n多機器的本地單機文件系統中(hdfs是一個基於linux本地文件系統之上的文件系統)

 

hdfs的工作機制:

1、客戶把一個文件存入hdfs,其實hdfs會把這個文件切塊后,分散存儲在N台linux機器系統中(負責存儲文件塊的角色:data node)<准確來說:切塊的行為是由客戶端決定的>

2、一旦文件被切塊存儲,那么,hdfs中就必須有一個機制,來記錄用戶的每一個文件的切塊信息,及每一塊的具體存儲機器(負責記錄塊信息的角色是:name node)

3、為了保證數據的安全性,hdfs可以將每一個文件塊在集群中存放多個副本(到底存幾個副本,是由當時存入該文件的客戶端指定的)

 

綜述:一個hdfs系統,由一台運行了namenode的服務器,和N台運行了datanode的服務器組成!

4、搭建hdfs分布式集群

4.1 hdfs集群組成結構:

4.2 安裝hdfs集群的具體步驟:

4.2.1、首先需要准備N台linux服務器

學習階段,用虛擬機即可!

先准備4台虛擬機:1個namenode節點  + 3 個datanode 節點

 

4.2.2、修改各台機器的主機名和ip地址

主機名:hdp-01  對應的ip地址:192.168.33.11

主機名:hdp-02  對應的ip地址:192.168.33.12

主機名:hdp-03  對應的ip地址:192.168.33.13

主機名:hdp-04  對應的ip地址:192.168.33.14

4.2.3、從windows中用CRT軟件進行遠程連接

在windows中將各台linux機器的主機名配置到的windows的本地域名映射文件中:

c:/windows/system32/drivers/etc/hosts

192.168.33.11 hdp-01

192.168.33.12 hdp-02

192.168.33.13 hdp-03

192.168.33.14 hdp-04

用crt連接上后,修改一下crt的顯示配置(字號,編碼集改為UTF-8):

 

 

 

4.2.3、配置linux服務器的基礎軟件環境

 

  •  防火牆

關閉防火牆:service iptables stop 

關閉防火牆自啟: chkconfig iptables off

 

  •  安裝jdk:(hadoop體系中的各軟件都是java開發的)

  1)        利用alt+p 打開sftp窗口,然后將jdk壓縮包拖入sftp窗口

  2)         然后在linux中將jdk壓縮包解壓到/root/apps 下

  3)         配置環境變量:JAVA_HOME   PATH

  vi /etc/profile   在文件的最后,加入:

export JAVA_HOME=/root/apps/jdk1.8.0_60

export PATH=$PATH:$JAVA_HOME/bin

  4)         修改完成后,記得 source /etc/profile使配置生效

  5)         檢驗:在任意目錄下輸入命令: java -version 看是否成功執行

  6)         將安裝好的jdk目錄用scp命令拷貝到其他機器

  7)         將/etc/profile配置文件也用scp命令拷貝到其他機器並分別執行source命令

  •   集群內主機的域名映射配置

在hdp-01上,vi /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.33.11   hdp-01

192.168.33.12   hdp-02

192.168.33.13   hdp-03

192.168.33.14   hdp-04

然后,將hosts文件拷貝到集群中的所有其他機器上

scp /etc/hosts hdp-02:/etc/

scp /etc/hosts hdp-03:/etc/

scp /etc/hosts hdp-04:/etc/

 

提示:

如果在執行scp命令的時候,提示沒有scp命令,則可以配置一個本地yum源來安裝

1、先在虛擬機中配置cdrom為一個centos的安裝鏡像iso文件

2、在linux系統中將光驅掛在到文件系統中(某個目錄)

3、mkdir /mnt/cdrom

4、mount -t iso9660 -o loop /dev/cdrom /mnt/cdrom

5、檢驗掛載是否成功: ls /mnt/cdrom

6、3、配置yum的倉庫地址配置文件

7、yum的倉庫地址配置文件目錄: /etc/yum.repos.d

8、先將自帶的倉庫地址配置文件批量更名:

 

9、然后,拷貝一個出來進行修改

 

 

10、修改完配置文件后,再安裝scp命令:

11、yum install openssh-clients -y

4.2.4、安裝hdfs集群

 

1、上傳hadoop安裝包到hdp-01

bin文件為hadoop功能命令,sbin中為集群管理命令。

2、修改配置文件

 

核心配置參數:

1)         指定hadoop的默認文件系統為:hdfs

2)         指定hdfs的namenode節點為哪台機器

3)         指定namenode軟件存儲元數據的本地目錄

4)         指定datanode軟件存放文件塊的本地目錄

hadoop的配置文件在:/root/apps/hadoop安裝目錄/etc/hadoop/

hadoop中的其他組件如mapreduce,yarn等,這些組將會去讀數據,指定hadoop的默認文件系統為:hdfs,就是告訴這些組件去hdfs中讀數據;該項配置意味dadoop中的組件可以訪問各種文件系統。

若不指定數據的存放目錄,hadoop默認將數據存放在/temp下。

可以參考官網的默認配置信息。

1) 修改hadoop-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_60
2) 修改core-site.xml
<configuration>

<property>

<name>fs.defaultFS</name>

<value>hdfs://hdp-01:9000/</value>

</property>

</configuration>

 

<value>hdfs://hdp-01:9000</value>包含兩層意思:

  1、指定默認的文件系統。

  2、指明了namenode是誰。

value中的值是URI風格

3) 修改hdfs-site.xml

配置namenode和datanode的工作目錄,添加secondary name node。

<configuration>

<property>

<name>dfs.namenode.name.dir</name>

<value>/root/hdpdata/name/</value>

</property>

 

<property>

<name>dfs.datanode.data.dir</name>

<value>/root/hdpdata/data</value>

</property>

 

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>hdp-02:50090</value>

</property>

 

</configuration>

 

4) 拷貝整個hadoop安裝目錄到其他機器 
  scp -r /root/apps/hadoop-2.8.1  hdp-02:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-03:/root/apps/

  scp -r /root/apps/hadoop-2.8.1  hdp-04:/root/apps/

 

5) 啟動HDFS

 

所謂的啟動HDFS,就是在對的機器上啟動對的軟件

提示:

要運行hadoop的命令,需要在linux環境中配置HADOOP_HOME和PATH環境變量

vi /etc/profile

 


export JAVA_HOME=/root/apps/jdk1.8.0_60 export HADOOP_HOME=/root/apps/hadoop-2.8.1 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

 

 

首先,初始化namenode的元數據目錄

要在hdp-01上執行hadoop的一個命令來初始化namenode的元數據存儲目錄

hadoop namenode -format

  創建一個全新的元數據存儲目錄

  生成記錄元數據的文件fsimage

  生成集群的相關標識:如:集群id——clusterID

該步驟叫做namenode的初始化也叫格式化,本質是建立namenode運行所需要的目錄以及一些必要的文件,所以該操作一般只在集群第一次啟動之前執行。

 

 

然后,啟動namenode進程(在hdp-01上)

hadoop-daemon.sh start namenode

啟動完后,首先用jps查看一下namenode的進程是否存在

namenode就是一個java軟件,我們知道啟動一個java軟件需要主類的main方法 java xxx.java - 若干參數,處於方便的考慮,hadoop中提供了一個通用的軟件啟動腳本hadoop-daemon.sh,腳本可以接受參數,專門用來啟動hadoop中的軟件。

 

可以看到namenode在監聽兩個端口,9000用來和客戶端通信(9000為RPC端口號,內部進程之間互相通信的端口,datanode和namenode的通信),接受hdfs客戶端的請求,50070是web服務端口,也就是說namenode內置一個web服務器,http客戶端可以通過次端口發送請求。

然后,在windows中用瀏覽器訪問namenode提供的web端口:50070

http://hdp-01:50070

然后,啟動眾datanode們(在任意地方)

hadoop-daemon.sh start datanode

 

下圖是datanode的一下信息展示,可以看到datanode內部通信的端口號是50010,而且datanode也提供了問訪問端口50075.

6) 用自動批量啟動腳本來啟動HDFS

hdfs其實就是一堆java軟件,我們可以自己手動hadoop-daemon.sh逐個啟動,也可以使用hadoop提供的批量啟動腳本。

1)         先配置hdp-01到集群中所有機器(包含自己)的免密登陸

 

2)         配完免密后,可以執行一次  ssh 0.0.0.0

3)         修改hadoop安裝目錄中/etc/hadoop/slaves(把需要啟動datanode進程的節點列入)

hdp-01

hdp-02

hdp-03

hdp-04

core-site.xml中配置過namenode,但是需要批量啟動那些datanode呢,該文件/etc/hadoop/slaves的配置就是解決這個問題的,該文件就是給啟動腳本看的。

4)         在hdp-01上用腳本:start-dfs.sh 來自動啟動整個集群

5)         如果要停止,則用腳本:stop-dfs.sh

start-dfs.sh、stop-dfs.sh會啟動、關閉namenode,datanode和secondnamenode

當然你也可以自己寫腳本來做上述的事情 ,如下所示。

 

 

5、hdfs的客戶端操作

hdfs裝好之后,接下來的工作就是hdfs里傳東西,去東西,由客戶端來完成。

5.1、客戶端的理解

hdfs的客戶端有多種形式:

1、網頁形式

2、命令行形式

3、客戶端在哪里運行,沒有約束,只要運行客戶端的機器能夠跟hdfs集群聯網們

   對於客戶端來講,hdfs是一個整體,網頁版的客戶端主要是用來查看hdfs信息的,可以創建目錄,但是需要權限

命令行客戶端 

bin命令中的 hadoophdfs  都可以啟動 hdfs 客戶端,hadoop和hdfs都是腳本,都會去啟動一個hdfs的java客戶端。java客戶端在安裝包的jar包中

./hadoop fs -ls /

 

表示hadoop要訪問hdfs,該腳本就會去啟動hdfs客戶端,客戶端可以接收參數,比如查看hdfs根目錄。

 

 

文件的切塊大小和存儲的副本數量,都是由客戶端決定!

所謂的由客戶端決定,是通過配置參數來定的

hdfs的客戶端會讀以下兩個參數,來決定切塊大小(默認128M)、副本數量(默認3):

切塊大小的參數: dfs.blocksize

副本數量的參數: dfs.replication

如果使用命令行客戶端時,上面兩個參數應該配置在客戶端機器的hadoop目錄中的hdfs-site.xml中配置,(命令行客戶端本質就是啟動了一個java客戶端,這個客戶端在啟動的時候會將它依賴的所有jar包加入classpath中,客戶端會從jar包中,加載xx-default.xml來獲得默認的配置文件,也可以在hadoop/etc/xxx-site.xml中配置具體的參數來覆蓋默認值。此時的/etc下的配置文件就是客戶自定義的配置文件,也會被java客戶端加載【客戶端可以運行在任何地方】);

當然也可以在具體代碼中指定,見6節中的核心代碼

<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>

<property>
<name>dfs.replication</name>
<value>2</value>
</property>

 5.1.1、上傳過程

下圖為datanode中的數據,.meta是該block的校驗和信息。我們可以通過linux cat命令將兩個塊合並,會發現與原來的文件是一樣的。

 

 5.1.2、下載過程

客戶端首先回去namenode上去查找,有沒有請求的hdfs路徑下的文件,有的話都有該文件被切割成幾塊,每塊有幾個副本,這些副本都存放在集群中的哪些機器上,然后去存放了第一塊數據的某一台機器上去下載第一塊數據,將數據追加到本地,然后去下載下一塊數據,繼續追加到本地文件,知道下載完所有的塊。

 

 

5.2、hdfs客戶端的常用操作命令

1、上傳文件到hdfs中

hadoop fs -put /本地文件  /aaa

 

2、下載文件到客戶端本地磁盤

hadoop fs -get /hdfs中的路徑   /本地磁盤目錄

 

3、在hdfs中創建文件夾

hadoop fs -mkdir  -p /aaa/xxx

 

4、移動hdfs中的文件(更名)

hadoop fs -mv /hdfs的路徑1  /hdfs的另一個路徑2

 

復制hdfs中的文件到hdfs的另一個目錄

hadoop fs -cp /hdfs路徑_1  /hdfs路徑_2

 

5、刪除hdfs中的文件或文件夾

hadoop fs -rm -r /aaa

 

6、查看hdfs中的文本文件內容

hadoop fs -cat /demo.txt

hadoop fs -tail /demo.txt

hadoop fs -tail -f /demo.txt

hadoop fs -text /demo.txt

 

7、查看hdfs目錄下有哪些文件

hadoop fs –ls /

 

8、追加本地文件到hdfs中的文件

hadoop fs -appendToFile 本地路徑 /hdfs路徑

 

9、權限修改

 hadoop fs -chmod username1:usergroup1 /hdfs路徑

要說明的是,hdfs中的用戶和用戶組這是一個名字稱呼,與linux不一樣,linux中不能將選線分配給一個不存在的用戶。

 

 可以查看hadoop fs 不帶任何參數,來查看hdfs所支持的命令

Usage: hadoop fs [generic options]
        [-appendToFile <localsrc> ... <dst>]
        [-cat [-ignoreCrc] <src> ...]
        [-checksum <src> ...]
        [-chgrp [-R] GROUP PATH...]
        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
        [-chown [-R] [OWNER][:[GROUP]] PATH...]
        [-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
        [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]
        [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]
        [-du [-s] [-h] [-x] <path> ...]
        [-expunge]
        [-find <path> ... <expression> ...]
        [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-getfacl [-R] <path>]
        [-getfattr [-R] {-n name | -d} [-e en] <path>]
        [-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
        [-help [cmd ...]]
        [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]
        [-mkdir [-p] <path> ...]
        [-moveFromLocal <localsrc> ... <dst>]
        [-moveToLocal <src> <localdst>]
        [-mv <src> ... <dst>]
        [-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
        [-renameSnapshot <snapshotDir> <oldName> <newName>]
        [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]
        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
        [-setfattr {-n name [-v value] | -x name} <path>]
        [-setrep [-R] [-w] <rep> <path> ...]
        [-stat [format] <path> ...]
        [-tail [-f] <file>]
        [-test -[defsz] <path>]
        [-text [-ignoreCrc] <src> ...]
        [-touchz <path> ...]
        [-truncate [-w] <length> <path> ...]
        [-usage [cmd ...]]

 

 

6、hdfs的java客戶端編程

HDFS客戶端編程應用場景:數據采集

業務系統中日志生成機制

數據采集程序其實就是通過對java客戶端編程,將數據不斷的上傳到hdfs。

在windows開發環境中做一些准備工作:

1、在windows的某個路徑中解壓一份windows版本的hadoop安裝包

2、將解壓出的hadoop目錄配置到windows的環境變量中:HADOOP_HOME

原因:若不配置環境變量,會在下載hdfs文件是出錯,是由於使用hadoop的FileSystem保存文件到本地的時候出於效率的考慮,會使用hadoop安裝包中的c語言庫,顯然沒有配置hadoop環境變量時是找不到該c語言類庫中的文件的;然而上傳文件到hdfs沒有類似問題;

 

 6.1、核心代碼

1、將hdfs客戶端開發所需的jar導入工程(jar包可在hadoop安裝包中找到common和hdfs)

2、寫代碼

6.1.1、獲取hdfs客戶端

要點:要對hdfs中的文件進行操作,代碼中首先需要獲得一個hdfs的客戶端對象

Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");

 

 完整代碼如下:

/**
         * Configuration參數對象的機制:
         *    構造時,會加載jar包中的默認配置 xx-default.xml(core-default.xmlhdfs-default.xml)
         *    再加載 用戶配置xx-site.xml  ,覆蓋掉默認參數
         *    構造完成之后,還可以conf.set("p","v"),會再次覆蓋用戶配置文件中的參數值
         */
        // new Configuration()會從項目的classpath中加載core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
        Configuration conf = new Configuration();
        
        // 指定本客戶端上傳文件到hdfs時需要保存的副本數為:2
        conf.set("dfs.replication", "2");
        // 指定本客戶端上傳文件到hdfs時切塊的規格大小:64M
        conf.set("dfs.blocksize", "64m");
        
        // 構造一個訪問指定HDFS系統的客戶端對象: 參數1:——HDFS系統的URI,參數2:——客戶端要特別指定的參數,參數3:客戶端的身份(用戶名)
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root");
        
        // 上傳一個文件到HDFS中
        fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/"));
        
        fs.close();
View Code

 6.1.2、對文件進行操作

上傳、下載文件;文件夾的創建和刪除、文件的移動和復制、查看文件夾和文件等。

3、利用fs對象的方法進行文件操作

方法均與命令行方法對應,比如:

上傳文件

fs.copyFromLocalFile(new Path("本地路徑"),new Path("hdfs的路徑"));

 

下載文件

fs.copyToLocalFile(new Path("hdfs的路徑"),new Path("本地路徑"))

 對文件的增刪改查如下,對文件數據的操作后續介紹。

FileSystem fs = null;
    
    @Before
    public void init() throws Exception{
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "2");
        conf.set("dfs.blocksize", "64m");
        
        fs = FileSystem.get(new URI("hdfs://hdp-01:9000/"), conf, "root");
        
    }
    
    
    /**
     * 從HDFS中下載文件到客戶端本地磁盤
     * @throws IOException 
     * @throws IllegalArgumentException 
     */
    @Test
    public void testGet() throws IllegalArgumentException, IOException{
        
        fs.copyToLocalFile(new Path("/hdp20-05.txt"), new Path("f:/"));
        fs.close();
        
    }
    
    
    /**
     * 在hdfs內部移動文件\修改名稱
     */
    @Test
    public void testRename() throws Exception{
        
        fs.rename(new Path("/install.log"), new Path("/aaa/in.log"));
        
        fs.close();
        
    }
    
    /**
     * 在hdfs中創建文件夾
     */
    @Test
    public void testMkdir() throws Exception{
        
        fs.mkdirs(new Path("/xx/yy/zz"));
        
        fs.close();
    }
    
    
    /**
     * 在hdfs中刪除文件或文件夾
     */
    @Test
    public void testRm() throws Exception{
        
        fs.delete(new Path("/aaa"), true);
        
        fs.close();
    }
    
    
    
    /**
     * 查詢hdfs指定目錄下的文件信息
     */
    @Test
    public void testLs() throws Exception{
        // 只查詢文件的信息,不返回文件夾的信息
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
        
        while(iter.hasNext()){
            LocatedFileStatus status = iter.next();
            System.out.println("文件全路徑:"+status.getPath());
            System.out.println("塊大小:"+status.getBlockSize());
            System.out.println("文件長度:"+status.getLen());
            System.out.println("副本數量:"+status.getReplication());
            System.out.println("塊信息:"+Arrays.toString(status.getBlockLocations()));
            
            System.out.println("--------------------------------");
        }
        fs.close();
    }
    
    /**
     * 查詢hdfs指定目錄下的文件和文件夾信息
     */
    @Test
    public void testLs2() throws Exception{
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        
        for(FileStatus status:listStatus){
            System.out.println("文件全路徑:"+status.getPath());
            System.out.println(status.isDirectory()?"這是文件夾":"這是文件");
            System.out.println("塊大小:"+status.getBlockSize());
            System.out.println("文件長度:"+status.getLen());
            System.out.println("副本數量:"+status.getReplication());
            
            System.out.println("--------------------------------");
        }
        fs.close();
    }
View Code

 6.1.3、對文件數據進行操作

 同過客戶端使用open打開流對象來讀取hdfs中文件的具體數據,包括指定偏移量來讀取特定范圍的數據;通過客戶端向hdfs文件追加數據。

/**
     * 讀取hdfs中的文件的內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/test.txt"));

        BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));

        String line = null;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }

        br.close();
        in.close();
        fs.close();

    }

    /**
     * 讀取hdfs中文件的指定偏移量范圍的內容
     * 
     * 
     * 作業題:用本例中的知識,實現讀取一個文本文件中的指定BLOCK塊中的所有數據
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testRandomReadData() throws IllegalArgumentException, IOException {

        FSDataInputStream in = fs.open(new Path("/xx.dat"));

        // 將讀取的起始位置進行指定
        in.seek(12);

        // 讀16個字節
        byte[] buf = new byte[16];
        in.read(buf);

        System.out.println(new String(buf));

        in.close();
        fs.close();

    }
View Code

 

 寫數據,create提供了豐富的重載函數,輕松實現覆蓋,追加,以及指定緩存大小,副本數量等等信息。

/**
     * 往hdfs中的文件寫內容
     * 
     * @throws IOException
     * @throws IllegalArgumentException
     */

    @Test
    public void testWriteData() throws IllegalArgumentException, IOException {

        FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);

        // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg

        FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");

        byte[] buf = new byte[1024];
        int read = 0;
        while ((read = in.read(buf)) != -1) {
            out.write(buf,0,read);
        }
        
        in.close();
        out.close();
        fs.close();

    }
View Code

 

7、HDFS實例

hdfs版本wordcount程序。

任務描述:

1、從hdfs文件中讀取數據,每次讀取一行數據;

2、將數據交給具體的單詞統計業務去作業(使用面向接口編程,當業務邏輯改變時,無需修改主程序代碼);

3、並將該行數據產生的結果存入緩存中(可以用hashmap模擬)

數據采集設計:

1、流程
啟動一個定時任務:
——定時探測日志源目錄
——獲取需要采集的文件
——移動這些文件到一個待上傳臨時目錄
——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄

啟動一個定時任務:
——探測備份目錄中的備份數據,檢查是否已超出最長備份時長,如果超出,則刪除


2、規划各種路徑
日志源路徑: d:/logs/accesslog/
待上傳臨時目錄: d:/logs/toupload/
備份目錄: d:/logs/backup/日期/

HDFS存儲路徑: /logs/日期
HDFS中的文件的前綴:access_log_
HDFS中的文件的后綴:.log

將路徑配置寫入屬性文件

MAPPER_CLASS=cn.edu360.hdfs.wordcount.CaseIgnorWcountMapper
INPUT_PATH=/wordcount/input
OUTPUT_PATH=/wordcount/output2
View Code

 

主程序代碼示例:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class HdfsWordcount {
    
    public static void main(String[] args) throws Exception{
        
        /**
         * 初始化工作
         */
        Properties props = new Properties();
        props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));
        
        Path input = new Path(props.getProperty("INPUT_PATH"));
        Path output = new Path(props.getProperty("OUTPUT_PATH"));
        
        
        
        Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS"));
        Mapper mapper = (Mapper) mapper_class.newInstance();
        
        Context context  =  new Context();
        
        /**
         * 處理數據
         */
        
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false);
        
        while(iter.hasNext()){
            LocatedFileStatus file = iter.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line = null;
            // 逐行讀取
            while ((line = br.readLine()) != null) {
                // 調用一個方法對每一行進行業務處理
                mapper.map(line, context);
                
            }
            
            br.close();
            in.close();
            
        }
        
    
        
        
        
        /**
         * 輸出結果
         */
        HashMap<Object, Object> contextMap = context.getContextMap();
        
        if(fs.exists(output)){
            throw new RuntimeException("指定的輸出目錄已存在,請更換......!");
        }
        
        
        FSDataOutputStream out = fs.create(new Path(output,new Path("res.dat")));
        
        Set<Entry<Object, Object>> entrySet = contextMap.entrySet();
        for (Entry<Object, Object> entry : entrySet) {
            out.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes());
        }
        
        out.close();
        
        fs.close();
        
        
        System.out.println("恭喜!數據統計完成.....");
        
    }
    
    
    

}
View Code

 

自定義的業務接口

public interface Mapper {
    
    
    public void map(String line,Context context);
    

}
View Code

 

業務實現類1

public class WordCountMapper implements Mapper{

    @Override
    public void map(String line, Context context) {
        
        String[] words = line.split(" ");
        
        for (String word : words) {
            
            Object value = context.get(word);
            if(null==value){
                context.write(word, 1);
                
            }else{
                int v = (int)value;
                context.write(word, v+1);
            }    
            
        }
    }
}
View Code

 

業務實現類2

public class CaseIgnorWcountMapper implements Mapper {

    @Override
    public void map(String line, Context context) {

        String[] words = line.toUpperCase().split(" ");

        for (String word : words) {

            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);

            } else {
                int v = (int) value;
                context.write(word, v + 1);
            }
        }
    }
}
View Code

 

緩存模擬

import java.util.HashMap;

public class Context {
    
    private HashMap<Object,Object> contextMap = new HashMap<>();
    
    public void write(Object key,Object value){
        
        contextMap.put(key, value);
        
    }
    
    public Object get(Object key){
        
        return contextMap.get(key);
        
    }
    
    public HashMap<Object,Object> getContextMap(){
        return contextMap;
    }
}
View Code

 

8、實戰描述

 需求描述:

在業務系統的服務器上,業務程序會不斷生成業務日志(比如網站的頁面訪問日志)

業務日志是用log4j生成的,會不斷地切出日志文件

需要定期(比如每小時)從業務服務器上的日志目錄中,探測需要采集的日志文件(access.log,不是直接采集數據),發往HDFS

 

注意點:業務服務器可能有多台(hdfs上的文件名不能直接用日志服務器上的文件名)

當天采集到的日志要放在hdfs的當天目錄中

采集完成的日志文件,需要移動到到日志服務器的一個備份目錄中

定期檢查(一小時檢查一次)備份目錄,將備份時長超出24小時的日志文件清除

Timer timer = new Timer()

timer.schedual()

 簡易版日志采集主程序

import java.util.Timer;

public class DataCollectMain {
    
    public static void main(String[] args) {
        
        Timer timer = new Timer();

        timer.schedule(new CollectTask(), 0, 60*60*1000L);
        
        timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);
        
    }
}
View Code

 

日志收集定時任務類

import java.io.File;
import java.io.FilenameFilter;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

public class CollectTask extends TimerTask {

    @Override
    public void run() {

        /**
         * ——定時探測日志源目錄 ——獲取需要采集的文件 ——移動這些文件到一個待上傳臨時目錄
         * ——遍歷待上傳目錄中各文件,逐一傳輸到HDFS的目標路徑,同時將傳輸完成的文件移動到備份目錄
         * 
         */
        try {
            // 獲取配置參數
            Properties props = PropertyHolderLazy.getProps();

            // 構造一個log4j日志對象
            Logger logger = Logger.getLogger("logRollingFile");

            // 獲取本次采集時的日期
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
            String day = sdf.format(new Date());

            File srcDir = new File(props.getProperty(Constants.LOG_SOURCE_DIR));

            // 列出日志源目錄中需要采集的文件
            File[] listFiles = srcDir.listFiles(new FilenameFilter() {

                @Override
                public boolean accept(File dir, String name) {
                    if (name.startsWith(props.getProperty(Constants.LOG_LEGAL_PREFIX))) {
                        return true;
                    }
                    return false;
                }
            });

            // 記錄日志
            logger.info("探測到如下文件需要采集:" + Arrays.toString(listFiles));

            // 將要采集的文件移動到待上傳臨時目錄
            File toUploadDir = new File(props.getProperty(Constants.LOG_TOUPLOAD_DIR));
            for (File file : listFiles) {
                FileUtils.moveFileToDirectory(file, toUploadDir, true);
            }

            // 記錄日志
            logger.info("上述文件移動到了待上傳目錄" + toUploadDir.getAbsolutePath());

            // 構造一個HDFS的客戶端對象

            FileSystem fs = FileSystem.get(new URI(props.getProperty(Constants.HDFS_URI)), new Configuration(), "root");
            File[] toUploadFiles = toUploadDir.listFiles();

            // 檢查HDFS中的日期目錄是否存在,如果不存在,則創建
            Path hdfsDestPath = new Path(props.getProperty(Constants.HDFS_DEST_BASE_DIR) + day);
            if (!fs.exists(hdfsDestPath)) {
                fs.mkdirs(hdfsDestPath);
            }

            // 檢查本地的備份目錄是否存在,如果不存在,則創建
            File backupDir = new File(props.getProperty(Constants.LOG_BACKUP_BASE_DIR) + day + "/");
            if (!backupDir.exists()) {
                backupDir.mkdirs();
            }

            for (File file : toUploadFiles) {
                // 傳輸文件到HDFS並改名access_log_
                Path destPath = new Path(hdfsDestPath + "/" + UUID.randomUUID() + props.getProperty(Constants.HDFS_FILE_SUFFIX));
                fs.copyFromLocalFile(new Path(file.getAbsolutePath()), destPath);

                // 記錄日志
                logger.info("文件傳輸到HDFS完成:" + file.getAbsolutePath() + "-->" + destPath);

                // 將傳輸完成的文件移動到備份目錄
                FileUtils.moveFileToDirectory(file, backupDir, true);

                // 記錄日志
                logger.info("文件備份完成:" + file.getAbsolutePath() + "-->" + backupDir);

            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
View Code

 

定期清理過時備份日志

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;

import org.apache.commons.io.FileUtils;

public class BackupCleanTask extends TimerTask {

    @Override
    public void run() {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
        long now = new Date().getTime();
        try {
            // 探測本地備份目錄
            File backupBaseDir = new File("d:/logs/backup/");
            File[] dayBackDir = backupBaseDir.listFiles();

            // 判斷備份日期子目錄是否已超24小時
            for (File dir : dayBackDir) {
                long time = sdf.parse(dir.getName()).getTime();
                if(now-time>24*60*60*1000L){
                    FileUtils.deleteDirectory(dir);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
View Code

 

配置信息提取到屬性配置文件中,並寫成常量,以單例設計模式去加載配置信息。

LOG_SOURCE_DIR=d:/logs/accesslog/
LOG_TOUPLOAD_DIR=d:/logs/toupload/
LOG_BACKUP_BASE_DIR=d:/logs/backup/
LOG_BACKUP_TIMEOUT=24
LOG_LEGAL_PREFIX=access.log.

HDFS_URI=hdfs://hdp-01:9000/
HDFS_DEST_BASE_DIR=/logs/
HDFS_FILE_PREFIX=access_log_
HDFS_FILE_SUFFIX=.log
View Code
public class Constants {

    /**
     * 日志源目錄參數key
     */
    public static final String LOG_SOURCE_DIR = "LOG_SOURCE_DIR";
    
    /**
     * 日志待上傳目錄參數key
     */
    public static final String LOG_TOUPLOAD_DIR = "LOG_TOUPLOAD_DIR";
    
    
    public static final String LOG_BACKUP_BASE_DIR = "LOG_BACKUP_BASE_DIR";
    
    
    public static final String LOG_BACKUP_TIMEOUT = "LOG_BACKUP_TIMEOUT";
    
    
    public static final String LOG_LEGAL_PREFIX = "LOG_LEGAL_PREFIX";
    
    
    public static final String HDFS_URI = "HDFS_URI";
    
    
    public static final String HDFS_DEST_BASE_DIR = "HDFS_DEST_BASE_DIR";
    
    
    public static final String HDFS_FILE_PREFIX = "HDFS_FILE_PREFIX";
    
    
    public static final String HDFS_FILE_SUFFIX = "HDFS_FILE_SUFFIX";

}
View Code
import java.util.Properties;

/**
 * 單例模式:懶漢式——考慮了線程安全
 * @author ThinkPad
 *
 */

public class PropertyHolderLazy {

    private static Properties prop = null;

    public static Properties getProps() throws Exception {
        if (prop == null) {
            synchronized (PropertyHolderLazy.class) {
                if (prop == null) {
                    prop = new Properties();
                    prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));
                }
            }
        }
        return prop;
    }

}
View Code

 

import java.util.Properties;

/**
 * 單例設計模式,方式一: 餓漢式單例
 * @author ThinkPad
 *
 */
public class PropertyHolderHungery {

    private static Properties prop = new Properties();

    static {
        try {
            prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties"));
        } catch (Exception e) {

        }
    }

    public static Properties getProps() throws Exception {

        return prop;
    }

}
View Code

9、總結

hdfs有服務端和客戶端;

服務端:

  成員:namenode 管理元數據,datanode存儲數據

  配置:需要指定使用的文件系統(默認的配置在core-default.xml,為本地文件系統,需要修改服務器core-site.xml修改為hdfs文件系統,並指定namenode),namenode和datanode的工作目錄(服務器的默認配置在hdfs-default.xml中,默認是在/temp下,需要就該hdfs-site.xml來覆蓋默認值。);

  細節:第一次啟動集群時需要格式化namenode

客戶端:

  形式:網頁端,命令行,java客戶端api;客戶端可以運行在任何地方。

  功能:指定上傳文件的信息報括切塊大小(hdfs-default.xml中默認值128m,可以在hdfs-site.xml中修改,也可以咋java api 中創建客戶端對象的時候指定,總之由客戶端來指定),副本數量(hdfs-default.xml中默認值3,同樣可以修改覆蓋);完成hdfs中文件的系列操作,如上傳,下載

雖然服務端和客戶端的共用配置 core-default.xml core-site.xml;hdfs-default.xml hdfs-site.xml,但是不同的程序所需要的參數不同,只不過為了方便,所有參數都寫在一個文件中了。即是在服務器的hdfs-site.xml中配置了切塊大小和副本數量,服務器的namenode和datanode根本不關心也不使用這些參數,只有啟動服務器上的命令行客戶端時,該參數才可能起作用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM