hadoop--大數據生態圈中最基礎、最重要的組件


hadoop是什么?

hadoop是一個由Apache基金會所開發的分布式系統基礎架構,hdfs分布式文件存儲、MapReduce並行計算。主要是用來解決海量數據的存儲和海量數據的分析計算問題,這是狹義上的hadoop。廣義上來講,hadoop通常指的是一個更廣泛的概念--hadoop生態圈

hadoop三大發行版本

hadoop三大發型版本:Apache、Cloudera、Hortonworks

  • Apache版本,也成為社區版,是最原始的版本,對入門學習較好
  • Cloudera版本在大型互聯網企業中用的較多,是對社區版進行了封裝,主要是解決了和其它大數據組件(比如:hive)的兼容性問題。但是出了問題幫你解決要收費
  • Hortonworks版本文檔較好

公司一般更常用的是Cloudera版本的hadoop,另外Cloudera版本的hadoop也簡稱為CDH。

hadoop優勢

1)高可靠性:hadoop底層維護多個數據副本,所以即使某個機器出現故障,也不會導致數據的丟失。

比如我現在有1G的數據,要存在5台機器上,hadoop默認會先將整個數據進行切割,默認是128M/塊,當然這個數值也可以自己改。然后是以三副本進行存儲,也就是說每個128M的塊,都會被存儲三份在不同的機器上。這樣即使一台服務器宕機了,數據也不會丟失。

2)高擴展性:在集群間分配任務數據,可方便的擴展數以千計的節點。很好理解,如果容量不夠了,直接橫向擴展,加機器就行。

3)高效性:在MapReduce的思想下,hadoop是並行工作的,以加快任務處理速度。實際上如果學了spark,會發現hadoop自己所描述的易用性、高效性實在是不敢恭維哈。但是hadoop作為大數據生態圈中非常重要的組件,我們是有必要學好的,而且學了hadoop之后,再學spark會輕松很多。而且學習了hadoop,再學spark也會明白為什么spark會比hadoop在效率上高出幾十倍、甚至上百倍。

4)高容錯性:能夠自動將失敗的任務重新分配,如果某台機器掛掉了,那么會自動將任務分配到其他的機器上執行

hadoop 1.x和hadoop 2.x的區別

hadoop組成

hadoop的組成上面已經說了,主要由四部分組成,但是哪個common,我們一般不用管。因此從下往上只需要關注,hdfsyarnMapReduce即可。

hdfs

hdfs:hadoop distributed file system,hadoop分布式文件系統,它由哪幾部分組成呢?

1.NameNode(nn):存儲文件的元數據,如文件名,文件目錄結構,文件屬性(生成時間、副本數、文件權限),以及每個文件的塊列表和塊所在的DataNode等等。

2.DataNode(dn):真正用來存儲文件塊數據以及塊數據的校驗和,之前說了大文件是要分成多塊的,每一塊存在不同的DataNode節點(說白了就是服務器、或者電腦)上,每一個節點存儲了哪些文件、以及文件被切分了幾份都存在哪些DataNode上、文件名、屬性等等,這些都叫做元數據,統一交給NameNode管理,而DataNode只負責真正的存儲數據。

3. Secondary NameNode(2nn):用來監控hdfs狀態的后台輔助程序,每個一段時間獲取hdfs元數據的快照。

yarn

圖中有一個Resource Manager和三個Node Manager,相當於有四個節點。

Resource Manager

1.處理客戶端請求。客戶端想訪問集群,比如提交一個作業,要經過Resource Manager,它是整個資源的管理者,管理整個集群的CPU、內存、磁盤

2.監控Node Manager

3.啟動或監控Application Master

4.資源的分配和調度

Node Manager

1.管理單個節點上的資源,Node Manager是當前節點資源的管理者,當然需要跟Resource Manager匯報

2.處理來自Resource Manager的命令

3.處理來自Application Master的命令

Application Master

1.某個任務的管理者。當任務在Node Manager上運行的時候,就是由Application Master負責管理

2.負責數據的切分

3.為應用程序申請資源並分配給內部的任務

4.任務的監控與容錯

Container

Container是yarn中資源的抽象,它封裝了節點上的多維度資源,如內存、CPU、磁盤、網絡等等。

其實Container是為Application Master服務的,因為任務在運行的時候,是不是需要內存、cpu,這些資源都被虛擬化到Container里面了。

MapReduce

1. map階段並行處理輸入數據

2. reduce階段對map結果進行匯總

幾張圖讓你理解hdfs工作原理(細節內容后面介紹,先看幾張漫畫感受一下)

大數據技術生態體系

安裝

下面我們來配置一下環境,我的所有大數據組件都安裝在/opt目錄下面

hadoop是java語言編寫的,因此需要安裝jdk,我這里已經安裝了,安裝jdk非常簡單,可以自行查找方法,如果還不會的話,那么hadoop也可以不用學了。

下面安裝hadoop,這個安裝java一樣簡單。這里我們使用社區版,直接去hadoop.apache.org網站下載即可,然后拷貝到我的阿里雲、解壓、配置環境變量即可。

然后在家目錄中輸入hdfs dfs,如果出現如下內容,證明安裝並且環境變量配置都沒問題

hadoop目錄結構

hadoop目錄結構如下,我們依次來看。

bin目錄

bin目錄主要放一些有關服務的文件,比如hadoop、hdfs、yarn等等

etc目錄

里面是有一個hadoop目錄,但是hadoop目錄里面有很多配置文件,未來我們會改大概七八個左右,當然不用怕

include目錄

這是與C語言有關的一些頭文件,我們不用管

lib目錄

一些本地庫,.so文件,相當於windows的.dll文件,這個不需要關注

libexec目錄

和lib目錄類似

sbin目錄

非常重要的一個目錄,存放了大量的啟動文件,比如啟動、關閉集群,啟動、關閉yarn等等

share目錄

存放了一些手冊、案例等等

偽分布式環境搭建以及配置、啟動hdfs

hadoop的運行模式有三種,單機模式、偽分布式、完全分布式。

  • 單機模式:基本不用,不用管
  • 偽分布式:按照完全分布式來進行搭建、配置,但是機器只有一台
  • 完全分布式:真正意義上的多台機器

下面就來搭建偽分布式環境,首先要修改配置文件,之前我們說要修改七八個,但是目前先配三個就行,慢慢來,所有的配置文件都在hadoop安裝目錄/etc/hadoop下面

hadoop-env.sh

export JAVA_HOME=/opt/java/jdk1.8.0_221(默認是${JAVA_HOME},需要手動改成java的安裝路徑)

core-site.xml

<!--指定hdfs中NameNode的地址-->
<!--這里之所以是localhost,是因為我們只有一台機器,多台機器就要換成相應的主機名-->
<property>
	<name>fs.defaultFS</name>
	<value>hdfs://localhost:9000</value>
</property>

<!--指定hadoop運行時產生文件的存儲目錄,如果不指定,那么重啟之后就丟失了-->
<!--data目錄會自動創建-->
<property>
	<name>hadoop.tmp.dir</name>
	<value>/opt/hadoop/hadoop-2.7.7/data</value>
</property>

所有更改完的配置,都放在configuration標簽里面

hdfs-site.xml

<!--指定hdfs副本的數量,默認是3,我們是偽分布式,所以改成1-->
<property>
	<name>dfs.replication</name>
	<value>1</value>
</property>

配置完畢,下面啟動集群(偽)

格式化namenode(第一次啟動時格式化,以后不需要總格式化)

hdfs namenode -format

查看之前的data目錄,自動幫我們創建了,而且里面也有東西了

啟動namenode

如果把sbin目錄也配置了環境變量,那么sbin/也不需要加
sbin/hadoop-daemon.sh start namenode

啟動datanode

sbin/hadoop-daemon.sh start datanode

查看集群是否啟動成功

輸入jps

注意jps是java的一個命令,必須安裝jdk之后才可使用,否則會提示命令未找到,出現如下內容表示安裝成功

輸入ip:50070

如果能訪問,也表示安裝成功

NameNode格式化注意事項

思考:為什么不能一直格式化NameNode,格式化NameNode需要注意什么?

格式化NameNode會產生新的集群id,導致NameNode和DataNode的集群id不一致,集群找不到以往的數據。所以格式化NameNode的時候,務必要先刪除data數據和log日志,然后再格式化NameNode。因為兩者需要有一個共同的id,這樣才能交互。

配置、啟動yarn

老規矩,先修改配置文件

yarn-env.sh

和hadoop-env.sh一樣,配置java的路徑
加上export JAVA_HOME=/opt/java/jdk1.8.0_221

yarn-site.xml

<!--reducer獲取數據的方式-->
<property>
	<name>yarn.nodemanager.aux-services</name>
	<value>mapreduce_shuffle</value>
</property>

<!--指定yarn的ResourceManager的地址-->
<property>
	<name>yarn.resourcemanager.hostname</name>
	<value>主機名</value>
</property>

mapred-env.sh

老規矩,遇到env.sh都是配JAVA_HOME

mapred-site.xml

但是會發現沒有這個文件,不過有一個mapred-site.xml.template
可以cp mapred-site.xml.template mapred-site.xml

<!--指定MR運行在yarn上-->
<property>
	<name>mapreduce.framework.name</name>
	<value>yarn</value>
</property>

修改完畢,下面啟動集群

yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager

通過網頁查看一下,輸入ip:8088,我們之前好像輸入過50070,那個是查看hdfs的,8088是查看運行MapReduce程序的進程的。

配置運行歷史服務

有時候我們想看一下程序的歷史運行情況,那么我們可以配置一下。

插一句,目前一直在搭建、配置環境,當然后面會詳細介紹hdfs、MapReduce的相關操作,目前先把環境打好。

當然如果對運行歷史不感興趣的話就跳過,當然最好還是關注一下。

mapred-site.xml

<!--還是這個配置文件,指定歷史服務端地址-->
<property>
	<name>mapreduce.jobhistory.address</name>
	<value>主機名:10020</value>
</property>

<!--歷史web端地址-->
<property>
	<name>mapreduce.jobhistory.webapp.address</name>
	<value>主機名:19888</value>
</property>

啟動歷史服務器:sbin/mr-jobhistory-daemon.sh start historyserver

jps查看歷史服務器,或者http://ip:19888/jobhistory

hdfs介紹

下面就開始着重講解hadoop組件之一的hdfs,之前的幾張漫畫只是提前感受一下。還有目前我們搭建的都是偽分布式,至於完全分布式,由於我阿里雲機器只有一台所以就不演示怎么搭建了。最主要的是筆者是python和golang系的,集群的搭建什么的應該用不到,環境什么的會有其他人負責,我只負責連接、計算什么的。后面也會介紹如何使用python和golang連接hdfs,所以完全分布式如何搭建,這里就不介紹了。

我們下面的演示都是基於偽分布式的

hdfs產生背景

隨着數據量越來越大,在一台機器上無法存下所有的數據,那么就分配到更多的機器上,但是不方便管理和維護,因此迫切需要一種系統來管理多台機器上的文件,這就是分布式文件管理系統。hdfs只是分布式文件管理系統中的一種

hdfs定義

hdfs(hadoop distributed file system),它是一個文件系統,用於存儲文件,通過目錄樹來定位文件;其次它是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器有各自的角色

hdfs的使用場景

適合一次寫入,多次讀出的場景,且不支持文件的修改。適合用來做數據分析,並不適合做網盤應用。

hdfs的優缺點

優點:

  • 高容錯性
    • 數據自動保存多個副本,它通過增加副本的形式,提高容錯性
    • 某一個副本丟失后,它可以自動恢復
  • 適合處理大數據
    • 數據規模:能夠處理TB、甚至PB級別的數據
    • 文件規模:能夠處理百萬規模以上的文件數量,數量相當之大
  • 可構建在廉價的機器之上,通過多副本機制,提高可靠性

缺點:

  • 不適合低延時數據訪問,如果你想做到毫秒級存儲,別想了,做不到的

  • 無法高效地對大量小文件進行存儲,存一個1G的數據比存10個100MB加上一個24MB的數據要高效很多

    至於為什么,因為NameNode是不是唯一的啊,這就意味着空間是有限的,不可能像DataNode一樣,容量不夠了就加機器。而NameNode要記錄文件的元數據,不管你是1KB,還是1GB,都需要150字節的空間進行記錄,如果全是小文件的話,是不是很耗費NameNode所在機器的空間呢?

    而且小文件存儲的尋址時間會超過讀取時間,它違反了hdfs的設計目標。

  • 不支持並發寫入、文件隨機修改

    一個文件只能有一個寫,不允許多個線程同時寫

    僅支持數據的append,不支持文件的隨機修改

hdfs的架構

先看官網給的一張圖

NameNode(nn):就是master,它是一個主管、管理者

  • 管理hdfs的名稱空間
  • 配置副本策略
  • 管理數據塊(block)映射信息
  • 處理客戶端讀寫請求

DataNode(nn):就是slave,NameNode下達命令,DataNode執行實際的操作

  • 存儲實際的數據塊
  • 執行數據塊的讀/寫操作

client:就是客戶端

  • 文件切分,文件上傳到hdfs的時候,client將文件切分成一個個的block,然后上傳
  • 與NameNode交互,獲取文件的位置信息
  • 與DataNode交互,讀取或者寫入數據
  • 客戶端提供一些命令來管理hdfs,比如NameNode的格式化
  • 客戶端可以通過一些命令來訪問hdfs,比如對hdfs的增刪改查操作

secondary NameNode:它不是NameNode的替補,當NameNode掛掉時,並不能馬上替換NameNode並提供服務。

  • 輔助NameNode,分擔其工作量,比如定期合並Fsimage和Edits,並推送給NameNode
  • 緊急情況下,可輔助恢復NameNode(可以恢復一部分)

強烈建議結合之前的漫畫來理解,會有更好的體驗

hdfs塊的大小設置

hdfs中的文件在物理上是分塊存儲(block),塊的大小可以通過配置參數(df.blocksize)指定,默認大小在hadoop 2.x版本中是128M,老版本中是64M

思考:為什么塊不能設置太小,也不能設置太大?

1.hdfs塊設置太小,會增加尋址時間,程序一直在找塊的開始位置

2.hdfs塊設置太大,從磁盤傳輸的時間會明顯大於定位這個塊的開始位置所需要的時間。導致程序在處理這塊數據時,會非常慢。

總結:hdfs塊的大小設置主要取決於磁盤的傳輸速率

hdfs shell命令(重點)

基本語法:hdfs dfs 命令 參數

這個hdfs的shell命令和linux是非常類似的,比如查看某個目錄下的文件,linux中是ls,那么hdfs shell中就是hdfs dfs -ls,查看文件內容,hdfs dfs -cat filename,可以看到是非常相似的,只不過在hdfs shell中需要加上一個橫杠。另外hdfs dfs還可以寫成hadoop fs,對於shell操作來說兩者區別不大

啟動集群

sbin/start-dfs.sh sbin/start-yarn.sh

之前我們說過hadoop-daemon.sh和yarn-daemon.sh,那是對於偽分布式也就是單節點來說的,如果是啟動集群需要使用sbin/start-dfs.sh sbin/start-yarn.sh

hdfs dfs -help 命令

從linux命令也能看出來,這是一個查看命令使用方法的命令

hdfs dfs -ls 目錄路徑

查看某個目錄有哪些文件,加上-R表示遞歸顯示

hdfs dfs -mkdir 目錄

在hdfs上面創建目錄,加上-p表示遞歸創建,和linux是一樣的

hdfs dfs -moveFromLocal 本地路徑 hdfs路徑

將本地文件或目錄移動到hdfs上面,注意是移動,移完之后本地就沒了

hdfs dfs -cat 文件

查看一個文件的內容

hdfs dfs -appendToFile 追加的文件 追加到哪個文件

將一個文件的內容追加到另一個文件里面去,比如本地有一個file.txt,那么hdfs dfs -appendToFile file.txt /a.txt表示將本地的file.txt文件里面的內容追加到hdfs上的/a.txt文件里面去

-chgrp、-chmod、-chown

更改組、更改權限、更改所有者,這個和linux中用法一樣

hdfs dfs -copyFromLocal 本地路徑 hdfs路徑

將文件從本地拷貝到hdfs上面去,這個和剛才moveFromLocal就類似於linux中cp和mv

hdfs dfs -copyToLocal hdfs路徑 本地路徑

將hdfs上的文件拷貝到本地,這個路徑是hdfs路徑在前、本地路徑在后。

hdfs dfs -cp hdfs路徑 hdfs路徑

copyFromLocal是針對本地和hdfs來說了,cp是hdfs路徑和hdfs路徑之間的拷貝

hdfs dfs -mv hdfs路徑 hdfs路徑

不用說也能明白

hdfs dfs -get hdfs路徑 本地路徑

等同於copyToLocal

hdfs dfs -put 本地路徑 hdfs路徑

等同於copyFromLocal

hdfs dfs -getmerge hdfs路徑(通配符) 本地路徑

將hdfs上面的多個文件合並下載到本地

hdfs dfs -tail 文件名

顯示文件的結尾

hdfs dfs -rm 文件

刪除文件,如果是文件夾需要加上-r

hdfs dfs -rmdir 空目錄

刪除一個空目錄,不常用,一般使用-rm

hdfs dfs -du 目錄

統計目錄的大小信息

hdfs dfs -du -h /:加上-h人性化顯示

hdfs dfs -du -h -s / :查看當前目錄的總大小

hdfs dfs -setrep 數值 文件

設置文件的副本數量,hdfs dfs -setrep 5 /file.txt:表示將file.txt的副本設置成5

python連接hdfs進行相關操作

下面我們來介紹如何使用python操作hdfs,首先python若想操作hdfs,需要下載一個第三方庫,也叫hdfs,直接pip install hdfs即可。

import hdfs
from pprint import pprint

# 導入相關模塊,輸入http://ip:50070,創建客戶端
client = hdfs.Client("http://ip:50070")

client.list:查看當前目錄的內容

print(client.list("/"))  # ['黑色相簿.txt', 'a.txt', 'b.txt', 'test']

# status默認為False,表示是否顯示文件的相關屬性
# 返回數據的格式為:[("", {}), ("", {}), ("", {}), ...]
pprint(client.list("/", status=True))
"""
[('黑色相簿.txt',
  {'accessTime': 1570347361399,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16393,
   'group': 'supergroup',
   'length': 0,
   'modificationTime': 1570343722271,
   'owner': 'root',
   'pathSuffix': '黑色相簿.txt',
   'permission': '644',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('a.txt',
  {'accessTime': 1570347222071,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16386,
   'group': 'supergroup',
   'length': 26,
   'modificationTime': 1570344172155,
   'owner': 'root',
   'pathSuffix': 'a.txt',
   'permission': '755',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('b.txt',
  {'accessTime': 1570347263315,
   'blockSize': 134217728,
   'childrenNum': 0,
   'fileId': 16396,
   'group': 'supergroup',
   'length': 10,
   'modificationTime': 1570347263628,
   'owner': 'root',
   'pathSuffix': 'b.txt',
   'permission': '644',
   'replication': 1,
   'storagePolicy': 0,
   'type': 'FILE'}),
 ('test',
  {'accessTime': 0,
   'blockSize': 0,
   'childrenNum': 2,
   'fileId': 16387,
   'group': 'supergroup',
   'length': 0,
   'modificationTime': 1570346913737,
   'owner': 'root',
   'pathSuffix': 'test',
   'permission': '755',
   'replication': 0,
   'storagePolicy': 0,
   'type': 'DIRECTORY'})]
"""

client.status:獲取指定路徑的狀態信息

pprint(client.status("/"))
"""
{'accessTime': 0,
 'blockSize': 0,
 'childrenNum': 4,
 'fileId': 16385,
 'group': 'supergroup',
 'length': 0,
 'modificationTime': 1570347630036,
 'owner': 'root',
 'pathSuffix': '',
 'permission': '755',
 'replication': 0,
 'storagePolicy': 0,
 'type': 'DIRECTORY'}
"""

# 里面還有一個strict=True,表示嚴格模式
# 如果改為False,那么如果輸入的路徑不存在就返回None
# 為True的話,路徑不存在,報錯

client.makedirs:創建目錄

print(client.list("/"))  # ['黑色相簿.txt', 'a.txt', 'b.txt', 'test']
# 會自動遞歸創建,如果想創建的時候給目錄賦予權限,可以使用permission參數,默認為None
client.makedirs("/a/b/c", permission=777)
print(client.list("/"))  # ['黑色相簿.txt', 'a', 'a.txt', 'b.txt', 'test']
print(client.list("/a"))  # ['b']
print(client.list("/a/b"))  # ['c']

client.rename:重命名

print(client.list("/"))  # ['黑色相簿.txt', 'a', 'a.txt', 'b.txt', 'test']
client.rename("/黑色相簿.txt", "/白色相簿")
print(client.list("/"))  # ['白色相簿', 'a', 'a.txt', 'b.txt', 'test']

client.write:往文件里面寫內容

client.read:往文件里面讀內容

關於寫、讀、上傳、下載,如果報錯,出現了requests.exceptions.ConnectionError:xxxxx,那么解決辦法就是在你當前使用python的Windows機器上的hosts文件中增加如下內容:部署hadoop的服務器ip 部署hadoop的服務器主機名

如果出現了hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE,······異常,那么需要在hdfs-site.xml中加入如下內容

<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>

下面就開始寫數據、讀數據

with client.write("/這是一個不存在的文件.txt") as writer:
    # 需要傳入字節
    writer.write(bytes("this file not exists", encoding="utf-8"))


with client.read("/這是一個不存在的文件.txt") as reader:
    # 讀取出來也是字節類型
    print(reader.read())  # b'this file not exists'

write方法,如果不指定額外的參數,那么需要文件不能存在,否則會報錯,提示文件已經存在。如果要對已存在的文件進行操作,那么需要顯式的指定參數:overwrite(重寫)或者append(追加)

with client.write("/白色相簿", append=True) as writer:
    writer.write(bytes("令人討厭的冬天又來了,", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    print(str(reader.read(), encoding="utf-8"))  # 令人討厭的冬天又來了,
with client.write("/白色相簿", append=True) as writer:
    writer.write(bytes("冬天的街道,戀人們的微笑,讓人想一把火全燒了", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    # 由於是追加,之前的內容也讀取出來了
    print(str(reader.read(), encoding="utf-8"))  # 令人討厭的冬天又來了,冬天的街道,戀人們的微笑,讓人想一把火全燒了
with client.write("/白色相簿", overwrite=True) as writer:
    writer.write(bytes("暖かい日差しが降り注いできて、眩しすぎ、目が見えない", encoding="utf-8"))


with client.read("/白色相簿") as reader:
    # 如果是overwrite,那么之前的內容就全沒了
    print(str(reader.read(), encoding="utf-8"))  # 暖かい日差しが降り注いできて、眩しすぎ、目が見えない

注意:overwrite和append不能同時出現,否則報錯

# 如果write里面傳入了encoding參數,那么writer.write則需要寫入str,因為會自動按照傳入的encoding進行編碼
with client.write("/白色相簿", overwrite=True, encoding="utf-8") as writer:
    girls = ["古明地覺", "古明地戀", "八重櫻"]
    writer.write(str(girls))


# 同理如果傳入了encoding參數,reader.read會讀出str,因為會自動按照傳入的encoding進行解碼
with client.read("/白色相簿", encoding="utf-8") as reader:
    print(reader.read())  # ['古明地覺', '古明地戀', '八重櫻']

client.content:查看目錄的匯總情況

比如:當前目錄下有多少個子目錄、多少文件等等

print(client.content("/", strict=True))
# {'directoryCount': 6, 'fileCount': 6, 'length': 95, 'quota': 9223372036854775807, 'spaceConsumed': 95, 'spaceQuota': -1}

client.set_owner:設置所有者

client.set_permission:設置權限

client.set_replication:設置副本系數

client.set_times:設置時間

"""
def set_owner(self, hdfs_path, owner=None, group=None):
def set_permission(self, hdfs_path, permission):
def set_replication(self, hdfs_path, replication):
def set_times(self, hdfs_path, access_time=None, modification_time=None):
"""

client.resolve: 將帶有符號的路徑,轉換成絕對、規范化路徑

# 當然並不要求路徑真實存在
print(client.resolve("/白色相簿/白色相簿/.."))  # /白色相簿

client.walk:遞歸遍歷目錄

# 遞歸遍歷文件,類似於os.walk,會返回一個生成器,可以進行迭代
# 每一步迭代的內容是一個三元組,("路徑", ["目錄1", "目錄2"], ["文件1", "文件2", "文件3"])
for file in client.walk("/"):
    print(file)
"""
('/', ['a', 'test'], ['白色相簿', '這是一個不存在的文件.txt', 'a.txt', 'b.txt'])
('/a', ['b'], [])
('/a/b', ['c'], [])
('/a/b/c', [], [])
('/test', ['test1'], ['黑色相簿.txt'])
('/test/test1', [], ['b.txt'])
"""

client.upload:上傳文件

print("2.py" in client.list("/"))  # False
client.upload(hdfs_path="/", local_path="2.py")
print("2.py" in client.list("/"))  # True

client.download:下載文件

client.download(hdfs_path="/白色相簿", local_path="白色相簿")
print(open("白色相簿", "r", encoding="utf-8").read())  # ['古明地覺', '古明地戀', '八重櫻']

client.checksum:獲取文件的校驗和

# 獲取文件的校驗和
print(client.checksum("/白色相簿"))
# {'algorithm': 'MD5-of-0MD5-of-512CRC32C', 'bytes': '00000200000000000000000095b1c9929656ce2b779093c67c95b76000000000', 'length': 28}

client.delete:刪除文件或目錄

# recursive表示是否遞歸刪除,默認為False
try:
    client.delete("/test")
except Exception as e:
    print(e)  # `/test is non empty': Directory is not empty

print("test" in client.list("/"))  # True
client.delete("/test", recursive=True)
print("test" in client.list("/"))  # False

golang操作hdfs

golang連接hdfs同樣需要一個第三方驅動,golang連接hdfs的驅動推薦兩個,一個也叫hdfs,另一個叫gowfs。先來看看怎么安裝。

安裝hdfs稍微有點費勁,首先可以通過go get github.com/colinmarc/hdfs下載,因為總所周知的原因,不出意外會失敗,報出如下錯誤

那么我們需要先在gopath(第三方包安裝的位置,一般是進入C盤,點擊用戶,再點擊你的用戶名對應的目錄,然后就會看到一個go目錄,我這里是C:\Users\satori\go)的src目錄下新建golang.org/x目錄,然后進入到golang.org/x下,在此處打開命令窗口,然后執行git clone https://github.com/golang/crypto

然后再執行go get github.com/colinmarc/hdfs就沒問題了

至於安裝gowfs就簡單多了,直接go get github.com/vladimirvivien/gowfs就沒問題了

但是我們使用哪一個呢?個人推薦使用gowfs,下面我們就來看看怎么用。

讀取文件

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
	"io/ioutil"
)

func main() {
	//這是配置,傳入Addr: "ip: 50070", User: "隨便寫一個英文名就行"
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	//返回一個客戶端(這里面叫文件系統)和error
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:",err))
	}
	
	//這里不能直接傳入文件名,而是需要作為gowfs.Path結構體的Name參數的值
	//然后將Path傳進去,我們后面的api都是這樣做的
	path := gowfs.Path{Name:"/whitealbum.txt"}
	
	//接收如下參數:gowfs.Path,offset(偏移量),長度(顯然是字節的長度), 容量(自己的cap)
	//返回一個io.ReadCloser,這是需要實現io.Reader和io.Closer的接口
	reader, _ := client.Open(path, 0, 512, 2048)
	
	//可以使用reader.Read(buf)的方式循環讀取,也可以丟給ioutil。ReadAll,一次性全部讀取
	data, _ := ioutil.ReadAll(reader)
	fmt.Println(string(data))
	/*
	白色相簿什么的,已經無所謂了。
	因為已經不再有歌,值得去唱了。
	傳達不了的戀情,已經不需要了。
	因為已經不再有人,值得去愛了。
	 */
}

查看目錄有哪些內容

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/"}
	// 返回[]FileStatus和error
	//這個FileStatus是什么?我們看一下源碼
	/*
		type FileStatus struct {
			AccesTime        int64  訪問時間
			BlockSize        int64  塊大小,只針對文件(134217728 Bytes,128 MB),目錄的話為0
			Group            string 所屬組
			Length           int64  文件的字節數(目錄為0)
			ModificationTime int64  修改時間
			Owner            string 所有者
			PathSuffix       string 文件后綴,說白了就是文件名
			Permission       string 權限
			Replication      int64  副本數
			Type             string 類型,文本的話是FILE,目錄的話是DIRECTORY
		}
	*/
	fs_arr, _ := client.ListStatus(path)
	fmt.Println(fs_arr)
	// [{0 0 supergroup 0 1570359570447 dr.who tkinter 755 0 DIRECTORY} {0 134217728 supergroup 184 1570359155457 root whitealbum.txt 644 1 FILE}]

	for _, fs := range fs_arr {
		fmt.Println("文件名:", fs.PathSuffix)
		/*
		文件名: tkinter
		文件名: whitealbum.txt
		 */
	}
	
    //FileStatus里面包含了文件的詳細信息,如果想查看某個文件的詳細信息
    //可以使用fs, err := client.GetFileStatus(path)
}

創建文件

package main

import (
	"bytes"
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/黑色相簿.txt"}

	/*
	Create函數接收如下參數。
	data:io.Reader,一個實現了io.Reader接口的struct
	Path:很簡單,就是我們這里的path
	overwrite:是否覆蓋,如果為false表示不覆蓋,那么要求文件不能存在,否則報錯
	blocksize:塊大小
	replication:副本
	permission:權限
	buffersize:緩存大小
	contenttype:內容類型

	返回一個bool和error
	 */
	if flag, err :=client.Create(
		bytes.NewBufferString("這是黑色相簿,不是白色相簿"), //如果不指定內容,就直接bytes.NewBufferString()即可
		path, //路徑
		false,//不覆蓋
		0,
		0,
		0666,
		0,
		"text/html",  //純文本格式
		); err != nil {
			fmt.Println("創建文件出錯,錯誤為:", err)
	} else {
		fmt.Println("創建文件成功, flag =", flag)  //創建文件成功, flag = true
	}
}

查看一下

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
	"io/ioutil"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/黑色相簿.txt"}

	reader , _ := client.Open(path, 0, 512, 2048)
	data, _ := ioutil.ReadAll(reader)
	fmt.Println(string(data)) // 這是黑色相簿,不是白色相簿
}

創建目錄

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/a/b/c"}
	
	//遞歸創建
	flag, err := client.MkDirs(path, 0666)
	fmt.Println(flag) // true

	fs_arr, _ := client.ListStatus(gowfs.Path{Name:"/"})
	for _, fs := range fs_arr{
		fmt.Println(fs.PathSuffix)
		/*
		黑色相簿.txt
		a
		tkinter
		whitealbum.txt
		 */
	}
}

重命名

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	fs_arr, _ := client.ListStatus(gowfs.Path{Name:"/"})
	for _, fs := range fs_arr{
		fmt.Println(fs.PathSuffix)
		/*
		黑色相簿.txt
		a
		tkinter
		whitealbum.txt
		 */
	}

	flag, err := client.Rename(gowfs.Path{Name:"/黑色相簿.txt"}, gowfs.Path{Name:"/blackalbum.txt"})
	fmt.Println(flag) // true
	fs_arr, _ = client.ListStatus(gowfs.Path{Name:"/"})
	for _, fs := range fs_arr{
		fmt.Println(fs.PathSuffix)
		/*
			a
			blackalbum.txt
			tkinter
			whitealbum.txt
		*/
	}
}

向已經存在的文件追加內容

package main

import (
	"bytes"
	"fmt"
	"github.com/vladimirvivien/gowfs"
	"io/ioutil"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/whitealbum.txt"}
	reader, _ := client.Open(path, 0, 512, 2048)
	data, _ := ioutil.ReadAll(reader)
	fmt.Println(string(data))
	/*
	白色相簿什么的,已經無所謂了。
	因為已經不再有歌,值得去唱了。
	傳達不了的戀情,已經不需要了。
	因為已經不再有人,值得去愛了。
	 */

	//參數1:內容,必須是實現了io.Reader接口
	//參數2:路徑
	//參數3:緩存大小
	flag, err := client.Append(bytes.NewBufferString("\n讓人討厭的冬天又來了"), path, 2048)
	fmt.Println(flag) // true

	reader, _ = client.Open(path, 0, 512, 2048)
	data, _ = ioutil.ReadAll(reader)
	fmt.Println(string(data))
	/*
	白色相簿什么的,已經無所謂了。
	因為已經不再有歌,值得去唱了。
	傳達不了的戀情,已經不需要了。
	因為已經不再有人,值得去愛了。
	
	讓人討厭的冬天又來了。
	 */
}

設置文件或目錄的所有者

func (fs *FileSystem) SetOwner(path Path, owner string, group string) (bool, error)

設置文件或目錄的權限

func (fs *FileSystem) SetPermission(path Path, permission os.FileMode) (bool, error)

設置文件或目錄的副本系數

func (fs *FileSystem) SetReplication(path Path, replication uint16) (bool, error)

設置文件或目錄的訪問時間和修改時間

func (fs *FileSystem) SetTimes(path Path, accesstime int64, modificationtime int64) (bool, error)

獲取文件的校驗和

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name: "/whitealbum.txt"}
	f, _ := client.GetFileChecksum(path)
	fmt.Println(f)  // {MD5-of-0MD5-of-512CRC32C 0000020000000000000000001255073187d3e801940eee180acebe4e00000000 28}
	fmt.Println(f.Algorithm, f.Length, f.Bytes) // MD5-of-0MD5-of-512CRC32C 28 0000020000000000000000001255073187d3e801940eee180acebe4e00000000
}

刪除文件

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	path := gowfs.Path{Name:"/blackalbum.txt"}
	//路徑,是否遞歸
	flag, _ := client.Delete(path, true)
	fmt.Println(flag) // true
}

判斷文件是否存在

為什么這里用藍色了,因為之后的用法就不一樣了

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, err := gowfs.NewFileSystem(config)
	if err != nil {
		panic(fmt.Sprintln("出現異常,異常信息為:", err))
	}

	//創建一個shell,可以使用下面shell進行操作
	shell := gowfs.FsShell{FileSystem: client}

	//直接傳字符串即可,不需要傳Path了
	flag, _ := shell.Exists("/whitealbum.txt")
	fmt.Println(flag) // true
	flag, _ = shell.Exists("/whitealbum.txt1")
	fmt.Println(flag) // false
}

改變所有者

flag, _ := shell.Chown([]string{"/file1", "/file2", "/file3"}, "owner")

改變所屬組

flag, _ := shell.Chgrp([]string{"/file1", "/file2", "/file3"}, "groupName")

改變權限

flag, _ := shell.Chmod([]string{"/file1", "/file2", "/file3"}, 0666)

查看文件內容

package main

import (
	"bytes"
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, _ := gowfs.NewFileSystem(config)

	shell := gowfs.FsShell{FileSystem: client}

	buf := bytes.Buffer{}
	if err := shell.Cat([]string{"/whitealbum.txt"}, &buf); err != nil {
		fmt.Println("err =", err)
	} else {
		fmt.Println(buf.String())
		/*
			白色相簿什么的,已經無所謂了。
			因為已經不再有歌,值得去唱了。
			傳達不了的戀情,已經不需要了。
			因為已經不再有人,值得去愛了。

			讓人討厭的冬天又來了
		*/
	}
}

追加文件內容

package main

import (
	"bytes"
	"fmt"
	"github.com/vladimirvivien/gowfs"
	"io/ioutil"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, _ := gowfs.NewFileSystem(config)

	shell := gowfs.FsShell{FileSystem: client}

	_ = ioutil.WriteFile("aaa.txt", []byte("\n冬馬小三\n"), 0666)
	_ = ioutil.WriteFile("bbb.txt", []byte("雪菜碧池\n"), 0666)
	_ = ioutil.WriteFile("ccc.txt", []byte("打死春哥\n"), 0666)
	_ = ioutil.WriteFile("ddd.txt", []byte("抱走冬馬雪菜"), 0666)

	_, _ = shell.AppendToFile([]string{"aaa.txt", "bbb.txt", "ccc.txt", "ddd.txt"}, "/whitealbum.txt")

	buf := bytes.Buffer{}
	_ = shell.Cat([]string{"/whitealbum.txt"}, &buf)
	fmt.Println(buf.String())
	/*
		白色相簿什么的,已經無所謂了。
		因為已經不再有歌,值得去唱了。
		傳達不了的戀情,已經不需要了。
		因為已經不再有人,值得去愛了。

		讓人討厭的冬天又來了
		冬馬小三
		雪菜碧池
		打死春哥
		抱走冬馬雪菜
	*/
}

上傳文件

package main

import (
	"fmt"
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, _ := gowfs.NewFileSystem(config)

	shell := gowfs.FsShell{FileSystem: client}

	//本地路徑,hdfs路徑,是否重寫
	_, _ = shell.Put("aaa.txt", "/aaa.txt", false)

	path := gowfs.Path{Name: "/"}
	fs_arr, _ := client.ListStatus(path)
	for _, fs := range fs_arr {
		fmt.Println(fs.PathSuffix)
		/*
			黑色相簿.txt
			a
			aaa.txt
			tkinter
			whitealbum.txt
		*/
	}
}

下載文件

package main

import (
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, _ := gowfs.NewFileSystem(config)

	shell := gowfs.FsShell{FileSystem: client}

	_, _ = shell.Get("/whitealbum.txt", "白色album.txt")
}

刪除文件

package main

import (
	"github.com/vladimirvivien/gowfs"
)

func main() {
	config := gowfs.Configuration{Addr: "xx.xx.xx.xx:50070", User: "satori"}
	client, _ := gowfs.NewFileSystem(config)

	shell := gowfs.FsShell{FileSystem: client}

	_, _ = shell.Rm("/whitealbum.txt")
}

MapReduce定義

MapReduce是一個分布式運算程序的編程框架,是用戶開發"基於hadoop的數據分析應用"的核心框架

MapReduce的核心功能是將用戶編寫的業務邏輯代碼自帶默認組件組合成一個完整的分布式運算程序,並發運行在一個hadoop集群上。

MapReduce優缺點

優點

  • MapReduce易於編程

    它簡單地實現一些接口,就可以完成一個分布式應用程序,這個分布式應用程序可以分布到大量廉價的pc機器上運行。也就是說,你寫一個分布式應用程序,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點,使得MapReduce編程非常流行

  • 良好的擴展性

    當你的計算資源不足時,你可以通過簡單的增加機器來擴展計算能力

  • 高容錯性

    MapReduce設計的初衷就是使程序能夠運行在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一台機器掛了,它可以把上面的計算任務轉移到另一個節點上運行,不至於這個任務完全失敗。而且這個過程不需要人工參與,是由hadoop內部完成的。

  • 適合PB級以上海量數據的離線處理

    可以實現上千台服務器集群並發工作,提高數據處理能力

缺點

  • 不擅長實時計算

    MapReduce無法像mysql一樣,可以在毫秒級或者秒級內返回結果

  • 不擅長流式計算

    流式計算輸入的數據是動態的,而MapReduce的數據數據必須是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的

  • 不支持DAG(有向無環圖)計算

    多個程序之間存在依賴,后一個應用程序的輸入依賴於上一個程序的輸出。在這種情況,MapReduce不是不能做,而是使用后,每個MapReduce作業的輸出結果都會寫入到磁盤,然后再從磁盤中讀取,進行下一個操作,這樣做會造成大量的磁盤IO,導致性能非常的低下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM