HIve總結:
首先要學習Hive,第一步是了解Hive,Hive是基於Hadoop的一個數據倉庫,可以將結構化的數據文件映射為一張表,並提供類sql查詢功能,Hive底層將sql語句轉化為mapreduce任務運行。相對於用java代碼編寫mapreduce來說,Hive的優勢明顯:快速開發,人員成本低,可擴展性(自由擴展集群規模),延展性(支持自定義函數)。
Hive的構架:
Hive提供了三種用戶接口:CLI、HWI和客戶端。客戶端是使用JDBC驅動通過thrift,遠程操作Hive。HWI即提供Web界面遠程訪問Hive。但是最常見的使用方式還是使用CLI方式。(在linux終端操作Hive)
Hive有三種安裝方式:
1、內嵌模式(元數據保村在內嵌的derby種,允許一個會話鏈接,嘗試多個會話鏈接時會報錯,不適合開發環境)
2、本地模式(本地安裝mysql 替代derby存儲元數據)
3、遠程模式(遠程安裝mysql 替代derby存儲元數據)
安裝Hive:(本地模式)
首先Hive的安裝是在Hadoop集群正常安裝的基礎上,並且集群啟動
安裝Hive之前我們要先安裝mysql,
查看是否安裝過mysql:rpm -qa|grep mysql*
查看有沒有安裝包:yum list mysql*
安裝mysql客戶端:yum install -y mysql
安裝服務器端:yum install -y mysql-server
yum install -y mysql-devel
啟動數據庫 service mysqld start或者/etc/init.d/mysqld start
創建hadoop用戶並賦予權限:
mysql>grant all on *.* to hadoop@'%' identified by 'hadoop';
mysql>grant all on *.* to hadoop@'localhost' identified by 'hadoop';
mysql>grant all on *.* to hadoop@'master' identified by 'hadoop';
mysql>flush privileges;
解壓:tar -zxvf apache-hive-1.2.1-bin.tar.gz
配置:cd /apache-hive-1.2.1-bin/conf/ vim hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.local</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master:3306/hive?characterEncoding=UTF-8</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hadoop</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hadoop</value>
</property>
</configuration>
復制依賴包:cp mysql-connector-java-5.1.43-bin.jar apache-hive-1.2.1-bin/lib/
配置環境變量:
export HIVE_HOME=$PWD/apache-hive-1.2.1-bin
export PATH=$PATH:$HIVE_HOME/bin
啟動hive:hive
hive中可以運行shell命令:! shell命令
hive中可以運行hadoop命令:
hive中的數據類型:
原子數據類型:TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING
復雜數據類型:STRUCT MAP ARRAY
hive的使用:
建表語句:
DDL:
創建內部表:
create table mytable(
id int,
name string)
row format delimited fields terminated by '\t' stored as textfile;
常見外部表:關鍵字 external
create external table mytable2(
id int,
name string)
row format delimited fields terminated by '\t' location '/user/hive/warehouse/mytable2';
創建分區表:分區字段要寫在partiton by()
create table mytable3(
id int,
name string)
partitioned by(sex string) row format delimited fields terminated by '\t'stored as textfile;
靜態分區插入數據
load data local inpath '/root/hivedata/boy.txt' overwrite into table mytable3 partition(sex='boy');
增加分區:
alter table mytable3 add partition (sex='unknown') location '/user/hive/warehouse/mytable3/sex=unknown';
刪除分區:alter table mytable3 drop if exists partition(sex='unknown');
分區表默認為靜態分區,可轉換為自動套分區
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
給分區表灌入數據:
insert into table mytable3 partition (sex) select id,name,'boy' from student_mdf;
查詢表分區:show partitions mytable3;
查詢分區表數據:select * from mytable3;
查詢表結構:desc mytable3;
DML:
重命名表:alter table student rename to student_mdf
增加列:alter table student_mdf add columns (sex string);
修改列名:alter table student_mdf change sex gender string;
替換列結構:alter table student_mdf replace columns (id string, name string);
裝載數據:(本地數據)load data local inpath '/home/lym/zs.txt' overwrite into student_mdf;
(HDFS數據)load data inpath '/zs.txt' into table student_mdf;
插入一條數據:insert into table student_mdf values('1','zhangsan');
創建表接收查詢結果:create table mytable5 as select id, name from mytable3;
導出數據:(導出到本地)insert overwrite local directory '/root/hivedata/mytable5.txt' select * from mytable5;
(導出到HDFS)
insert overwrite directory 'hdfs://master:9000/user/hive/warehouse/mytable5_load' select * from mytable5;
數據查詢:
select * from mytable3; 查詢全表
select uid,uname from student; 查詢學生表中的學生姓名與學號字段
select uname,count(*) from student group by uname; 統計學生表中每個名字的個數
常用的功能還有 having、order by、sort by、distribute by、cluster by;等等
關聯查詢中有
內連接:將符合兩邊連接條件的數據查詢出來
select * from t_a a inner join t_b b on a.id=b.id;
左外連接:以左表數據為匹配標准,右邊若匹配不上則數據顯示null
select * from t_a a left join t_b b on a.id=b.id;
右外連接:與左外連接相反
select * from t_a a right join t_b b on a.id=b.id;
左半連接:左半連接會返回左邊表的記錄,前提是其記錄對於右邊表滿足on語句中的判定條件。
select * from t_a a left semi join t_b b on a.id=b.id;
全連接(full outer join):
select * from t_a a full join t_b b on a.id=b.id;
in/exists關鍵字(1.2.1之后新特性):效果等同於left semi join
select * from t_a a where a.id in (select id from t_b);
select * from t_a a where exists (select * from t_b b where a.id = b.id);
shell操作Hive指令:
-e:從命令行執行指定的HQL:
-f:執行HQL腳本
-v:輸出執行的HQL語句到控制台
內置函數
查看內置函數:show functions;
顯示函數的詳細信息:DESC FUNCTION abs;
重要常用內置函數:sum()--求和 count()--求數據量 avg()--求平均值
distinct--去重
min--求最小值
max--求最大值
自定義函數:
1.先開發一個簡單的Java類,org.apache.hadoop.hive.ql.exec.UDF,重載evaluate方法
import org.apache.hadoop.hive.ql.exec.UDF;
public final class AddUdf extends UDF {
public Integer evaluate(Integer a, Integer b) {
if (null == a || null == b) {
return null;
} return a + b;
}
public Double evaluate(Double a, Double b) {
if (a == null || b == null)
return null;
return a + b;
}
}
2.打成jar包上傳到服務器
3.將jar包添加到hive add jar /home/lan/jar/addudf.jar;
4.創建臨時函數與開發好的class關聯起來
CREATE TEMPORARY FUNCTION add_example AS 'org.day0914.AddUdf';
5.使用自定義函數 SELECT add_example(scores.math, scores.art) FROM scores;
銷毀臨時函數:DROP TEMPORARY FUNCTION add_example;
Hive相關工具:Sqoop Azkaban Flume
Sqoop
Sqoop是一個開源工具,它允許用戶將數據從關系型數據庫抽取到Hadoop中,用於進一步的處理。抽取出的數據可以被MapReduce程序使用,也可以被其他類似於Hive的工具使用。一旦形成分析結果,Sqoop便可以將這些結果導回數據庫,供其他客戶端使用
用戶向 Sqoop 發起一個命令之后,這個命令會轉換為一個基於 Map Task 的 MapReduce 作業。Map Task 會訪問數據庫的元數據信息,通過並行的 Map Task 將數據庫的數據讀取出來,然后導入 Hadoop 中。 將 Hadoop 中的數據,導入傳統的關系型數據庫中。它的核心思想就是通過基於 Map Task (只有 map)的 MapReduce 作業,實現數據的並發拷貝和傳輸,這樣可以大大提高效率
數據導入:首先用戶輸入一個 Sqoop import 命令,Sqoop 會從關系型數據庫中獲取元數據信息,比如要操作數據庫表的 schema是什么樣子,這個表有哪些字段,這些字段都是什么數據類型等。它獲取這些信息之后,會將輸入命令轉化為基於 Map 的 MapReduce作業。這樣 MapReduce作業中有很多 Map 任務,每個 Map 任務從數據庫中讀取一片數據,這樣多個 Map 任務實現並發的拷貝,把整個數據快速的拷貝到 HDFS 上
數據導出:首先用戶輸入一個 Sqoop export 命令,它會獲取關系型數據庫的 schema,建立 Hadoop 字段與數據庫表字段的映射關系。 然后會將輸入命令轉化為基於 Map 的 MapReduce作業,這樣 MapReduce作業中有很多 Map 任務,它們並行的從 HDFS 讀取數據,並將整個數據拷貝到數據庫中
sqoop查詢語句
進入sqoop安裝主目錄:cd /home/lanou/sqoop-1.4.5.bin__hadoop-2.0.4-alpha
將HDFS上的數據導入到MySQL中:bin/sqoop export --connect jdbc:mysql://192.168.2.136:3306/test --username hadoop --password hadoop --table name_cnt --export-dir '/user/hive/warehouse/mydb.db/name_cnt' --fields-terminated-by '\t'
將MySQL中的數據導入到HDFS上:
bin/sqoop import --connect jdbc:mysql://192.168.2.136:3306/test --username hadoop --password hadoop --table name_cnt -m 1
sqoop:表示sqoop命令
export:表示導出
--connect jdbc:mysql://192.168.2.136:3306/test :表示告訴jdbc,連接mysql的url。
--username hadoop: 連接mysql的用戶名
--password hadoop: 連接mysql的密碼
--table name_cnt: 從mysql要導出數據的表名稱
--export-dir '/user/hive/warehouse/mydb.db/name_cnt' hive中被導出的文件
--fields-terminated-by '\t': 指定輸出文件中的行的字段分隔符
Azkaban
Azkaban是由Linkedin公司推出的一個批量工作流任務調度器,用於在一個工作流內以一個特定的順序運行一組工作和流程。Azkaban使用job配置文件建立任務之間的依賴關系,並提供一個易於使用的web用戶界面維護和跟蹤你的工作流。
Azkaban功能特點:1.兼容所有Hadoop版本
2.可以通過WebUI來進行管理配置,操作方便
3.更容易設置job的依賴關系
4.可以配置定時任務調度
5.郵件警告失敗或成功 等等
Azkaban架構:
Azkaban使用MySQL去做一些狀態的存儲
Azkaban Web服務:Azkaban使用Jetty作為Web服務器,用作控制器以及提供Web界面
AzkabanWebServer對數據庫的使用:項目管理:對項目權限和上傳文件的管理;執行流程狀態:對正在執行的程序進行跟蹤;查看任務執行結果以及歷史日志;調度程序:保持預定的工作狀態。
Azkaban執行服務器,執行提交的工作流
AzkabanExecutorServer 對數據庫的使用:獲取項目:從數據庫中檢索項目文件;執行工作流或Jobs:從數據庫獲取要執行的任務流;Logs:存儲作業的輸出日志,並將其流入數據庫;不同的依賴進行交流:如果一個流在不同的執行器上運行,它將從數據庫中獲取狀態
安裝Azkaban:
1.要先配置mysql。
1.1修改mysql的編碼,vim /etc/my.cnf

1.2重啟mysql,service mysqld restart 然后進入mysql,創建azkaban數據庫並授權,刷新權限。與創建hive數據庫相同。
2.配置Azkaban Web Server
2.1解壓Azkaban壓縮包 unzip azkaban-web-2.5.0.zip
2.2上傳mysql驅動包
cp mysql-connector-java-5.1.43/mysql-connector-java-5.1.43-bin.jar azkaban-web-2.5.0/extlib/
2.3修改配置文件
修改azkaban.properties文件 cd azkaban-web-2.5.0/conf/ vim azkaban.properties
#默認時區改為亞洲/上海 默認為美國
default.timezone.id=Asia/Shanghai
#數據庫連接IP
mysql.host=master
修改文件權限: chmod 755 /home/lanou/azkaban/azkaban-web-2.5.0/bin/*
配置jetty ssl:keytool -keystore keystore -alias jetty -genkey -keyalg RSA
Enter keystore password: password
What is your first and last name? 您的名字與姓氏是什么?
[Unknown]: jetty.mortbay.org
What is the name of your organizational unit?您的組織單位名稱是什么?
[Unknown]: Jetty
What is the name of your organization?您的組織名稱是什么?
[Unknown]: Mort Bay Consulting Pty. Ltd.
What is the name of your City or Locality?您所在的城市或區域名稱是什么?
[Unknown]:
What is the name of your State or Province?您所在的州或省份名稱是什么?
[Unknown]:
What is the two-letter country code for this unit?該單位的兩字母國家代碼是什么
[Unknown]:
Is CN=jetty.mortbay.org, OU=Jetty, O=Mort Bay Consulting Pty. Ltd.,
L=Unknown, ST=Unknown, C=Unknown correct?正確嗎?
[no]: yes
Enter key password for <jetty>
(RETURN if same as keystore password): password
這里的密碼要與 azkaban-web-2.5.0/conf/azkaban.properties 中設置的密碼相同,否則會報錯Keystore was tampered with, or password was incorrect。
1.job:
type=command
command=hadoop fs -mkdir /test
command.1=hadoop fs -put /home/lym /hadoop-2.7.1/README.txt /test
2.job:
type=command
command=hadoop jar /home/lym/hadoop-2.7.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount /test /test-out
dependencies=1
實現上傳/README.txt並且用wordcount計算
把需要運行的job放在同一個文件下面打成.zip格式的包,注意 Azkaban目前只支持.zip格式
頁面操作:首先是登陸azkaban;創建一個工程;上傳job;執行job;查看job執行情況
綠色代表成功,藍色是運行,紅色是失敗。可以查看job運行時間,依賴和日志,點擊details可以查看各個job運行情況
Flume
Flume是Cloudera提供的日志收集系統,具有分布式、高可靠、高可用性等特點,對海量日志采集、聚合和傳輸,Flume支持在日志系統中制定各類數據發送,同時,Flume提供對數據進行簡單處理,並寫到各種數接受方的能力。其設計的原理也是基於將數據流,如日志數據從各種網站服務器上匯集起來存儲到HDFS,HBase等集中存儲器中。
Flume的核心是把數據從數據源收集過來,在送到目的地,為了保證輸送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地后,刪除自己緩存的數據
Flume傳輸的數據基本單位是Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位。Event從Source,流向Channel,再到Sink,本身為一個byte數組,並可攜帶headers信息。Event代表着一個數據流的最小完整單元,從外部數據源來,向外部的目的地去
Flume運行的核心是Agent。它是一個完整的數據收集工具,含有三個核心組件,分別是source、channel、sink。通過這些組件,Event可以從一個地方流向另外一個地方
Flume允許多個agent連在一起,形成前后相連的多級跳:
核心組件:source channel sink
source:source負責接收event或通過特殊機制產生event,並將events批量的放到一個或多個channel;source必須至少和一個channel關聯
channel:channel位於source和sink之間,用於緩存進來的event;當Sink成功的將event發送到下一跳的channel或最終目的時候,event從Channel移除
sink:Sink負責將event傳輸到下一跳或最終目的;sink在設置存儲數據時,可以向文件系統、數據庫、Hadoop存數據,在日志數據較少時,可以將數據存儲在文件系中,並且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到hadoop中,便於日后進行相應的數據分析;必須作用於一個確切的channel
安裝部署:
1.下載 http://mirror.bit.edu.cn/apache/flume/1.6.0/
2.解壓 tar -xvf apache-flume-1.6.0-bin.tar.gz tar -xvf apache-flume-1.6.0-src.tar.gz
3.將源碼合並至安裝目錄apache-flume-1.6.0-bin下:
cp -r apache-flume-1.6.0-src apache-flume-1.6.0-bin/
3.配置環境變量 vim ~/.bash_profile
export FLUME_HOME=/home/lan/apache-flume-1.6.0-bin/
export PATH=$PATH:$FLUME_HOME/bin
4.測試flume-ng是否安裝成功:flume-ng version
測試flume-ng功能:將收集到的日志輸出到hdfs上為
新建一個flume代理agent1的配置文件example.conf:
cd apache-flume-1.6.0-bin/conf/ vim example.conf
#agent1
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = c1
#source1
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/lan/agent1log
agent1.sources.source1.channels = c1
agent1.sources.source1.fileHeader = false
#sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://master:9000/agentlog
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = TEXT
agent1.sinks.sink1.hdfs.rollInteval = 4
agent1.sinks.sink1.channel = c1
#channel1
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/lan/agent1_tmp1
agent1.channels.c1.dataDirs = /home/lan/agent1_tmpdata
#agent1.channels.channel1.capacity = 10000
#agent1.channels.channel.transactionCapacity = 1000
新建agent1log :mkdir agent1log
啟動flume-ng:
cd apache-flume-1.6.0-bin
flume-ng agent -n agent1 -c conf -f /home/lan/apache-flume-1.6.0-bin/conf/example.conf -Dflume.root.logger=DEBUG,console
另啟一個terminal,在監測目錄下創建新的文件test2.txt
cd ~/agent1log
vim test2.txt
查看sink1的輸出,發現目標路徑下有一個以FlumeData開始,產生文件的時間戳為后綴的文件,說明flume能監測到目標目錄變化,將產生變化的部分實時地收集到sink的輸出中。