使用flume收集數據,將數據傳遞給kafka和hdfs,kafka上的數據可以使用storm構建實時計算,而hdfs上的數據,經過MR處理之后可以導入hive中進行處理。
環境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。
說明:所有服務都架設在一台機器上。
1:安裝hadoop:這篇文章寫得比較完整,可以看看:Ubuntu 12.10 安裝JDK、Hadoop全過程
我在安裝過程中出現:Does not contain a valid host:port authority: file:/// ,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml沒有發現錯誤,還特地看了些hosts配置,最后網上找到,fs.default.name中default寫錯了,啟動hadoop。
2:安裝hive:下載解壓之后,設置HIVE_HOME,將HIVE_HOME/bin加入到PATH變量中,直接輸入hive即可啟動。默認hive是使用嵌入模式的Derby數據庫,它的特點是小巧,而且老爹也是apache,但存在單session,無法多用戶共享,這里參考網上的資料將元數據存儲到mysql中去:Hive集成Mysql作為元數據,這里我出現了點問題,只能使用localhost進行連接,無法使用root@myggg,試着按照文章中查找my.conf,但沒有找到相關配置,在實驗環境下這樣也可以用:
3:安裝flume:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Flume是由Cloudera推出的一個高效收集,處理,移動大量日志數據的分布式,可靠地,高可用的服務。它有簡單並且靈活的架構基於數據流之上。它使用可靠地協調性,容錯轉移,恢復機制使它強健性並且容錯。它使用簡單可伸縮的數據模型能實現在線數據分析。
Flume安agent划分,一個agent包括Source,Channel,Sink三個部分,Source從Web Server中取數據,push交給Channel,Sink將pull Channel得到數據,一個agent可以有多個Channel, Sink。
配置FLUME_HOME,將PATH中加入Flume下的執行路徑,將conf下的flume-conf.properties.template重命名為flume-conf.properties,然后進行配置,在單機情況下:
agent.sources = r1 //agent中添加source,命名為r1 agent.sinks = s1 //agent中添加sink,命名為s1 agent.channels = c1 //agent中添加channel,命名為c1 agent.sinks.s1.channel = c1 //s1從c1中取數據 agent.sources.r1.channels = c1 //r1將數據交給c1 #describe the source agent.sources.r1.type = exec //定義r1的類型為exec agent.sources.r1.command = tail -F /root/input/loginfo //r1執行的命令 #use a channel which buffers events in memory agent.channels.c1.type = memory //定義c1的類型memory agent.channels.c1.capacity = 1000 //c1的容量 agent.channels.c1.transactionCapacity = 100 //channel獲取或者sink獲得一次最大的數據量 #sinks type agent.sinks.s1.type = logger //定義s1的類型為logger
完成后啟動flume,測試:
bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent
4:安裝kafka:關於kafka的介紹:Kafka快速入門,簡單來說,kafka集群中的一台服務器就是一個broker,消息按名字分類,叫做topic,消息的產生是producer,消息的獲取方為customer。kafka的安裝方法同上。由於使用默認配置,kafka中的config下不需要配置了,直接啟動即可進行模擬:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
相關可以看apache-kafka。
5:進行整合:數據的處理過程包括數據收集,數據清理,數據存儲,數據分析,數據展現。在這里數據的收集由flume負責,定期從web server中收集log相關信息,對於實時數據的處理,將數據直接發送到kafka,然后交給后面的storm處理(這個沒有做),對於離線部分,經過簡單的mr處理后存儲到hdfs上,然后使用hive操作。
總的架構圖:
Flume的設計:
在搭建之前,先安裝maven:安裝步驟同上Flume與Kafka
安裝完后echo $PATH:
/usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin
Flume與Kafka之間整合需要一個插件:這里介紹個flume-kafka插件,flume1.4,kafka0.7.2的基礎上,將代碼下載下來,進入目錄,使用maven打包成jar文件,將生成的jar包放到flume的lib或相關目錄下,依次將hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar導入,mvn package過程中可能會報錯,找不到kafka0.7.2.jar,你需要將額外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再進行package。
對於myggg開啟6個終端:
對於發送到kafka中的數據以后在處理,現在主要是針對hadoop中的數據,首先使用MR處理,格式化文本。
6:后續:解壓eclipse,將之前准備的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目錄下,使用vnc連接到機器,編寫MR程序。
使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:
1,b 2,c 3,d 4,e 5,f 6,g 7,z 0,o
編寫MR,將一行記錄拆分為key,value:
public static class MyMapper extends Mapper<Object, Text, IntWritable, Text>{ private IntWritable hello = new IntWritable(); private Text world = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String[] array = value.toString().split(","); if (array.length == 2) { hello.set(Integer.parseInt(array[0])); world.set(array[1]); context.write(hello, world); } } }
查看結果:
[root@myggg eclipse]# hadoop fs -cat /myoutput/part-r-00000 0 o 1 b 2 c 3 d 4 e 5 f 6 g 7 z
使用hive建立外部表查看數據:
create external table employee(id int, name string) row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile location '/myoutput';
然后就可以進行相關查詢與處理了。