flume_kafka_hdfs_hive數據的處理


     使用flume收集數據,將數據傳遞給kafka和hdfs,kafka上的數據可以使用storm構建實時計算,而hdfs上的數據,經過MR處理之后可以導入hive中進行處理。

     環境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。

     說明:所有服務都架設在一台機器上。

     1:安裝hadoop:這篇文章寫得比較完整,可以看看:Ubuntu 12.10 安裝JDK、Hadoop全過程

我在安裝過程中出現:Does not contain a valid host:port authority: file:/// ,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml沒有發現錯誤,還特地看了些hosts配置,最后網上找到,fs.default.name中default寫錯了,啟動hadoop。

hadoopnew

     2:安裝hive:下載解壓之后,設置HIVE_HOME,將HIVE_HOME/bin加入到PATH變量中,直接輸入hive即可啟動。默認hive是使用嵌入模式的Derby數據庫,它的特點是小巧,而且老爹也是apache,但存在單session,無法多用戶共享,這里參考網上的資料將元數據存儲到mysql中去:Hive集成Mysql作為元數據,這里我出現了點問題,只能使用localhost進行連接,無法使用root@myggg,試着按照文章中查找my.conf,但沒有找到相關配置,在實驗環境下這樣也可以用:

hive

     3:安裝flume:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Flume是由Cloudera推出的一個高效收集,處理,移動大量日志數據的分布式,可靠地,高可用的服務。它有簡單並且靈活的架構基於數據流之上。它使用可靠地協調性,容錯轉移,恢復機制使它強健性並且容錯。它使用簡單可伸縮的數據模型能實現在線數據分析。

Flume安agent划分,一個agent包括Source,Channel,Sink三個部分,Source從Web Server中取數據,push交給Channel,Sink將pull Channel得到數據,一個agent可以有多個Channel, Sink。

     配置FLUME_HOME,將PATH中加入Flume下的執行路徑,將conf下的flume-conf.properties.template重命名為flume-conf.properties,然后進行配置,在單機情況下:

agent.sources = r1                //agent中添加source,命名為r1
agent.sinks = s1                  //agent中添加sink,命名為s1
agent.channels = c1               //agent中添加channel,命名為c1
agent.sinks.s1.channel = c1       //s1從c1中取數據
agent.sources.r1.channels = c1    //r1將數據交給c1

#describe the source
agent.sources.r1.type = exec      //定義r1的類型為exec
agent.sources.r1.command = tail -F /root/input/loginfo    //r1執行的命令

#use a channel which buffers events in memory
agent.channels.c1.type = memory    //定義c1的類型memory
agent.channels.c1.capacity = 1000  //c1的容量
agent.channels.c1.transactionCapacity = 100  //channel獲取或者sink獲得一次最大的數據量 

#sinks type
agent.sinks.s1.type = logger       //定義s1的類型為logger

完成后啟動flume,測試:

bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent

flume

4:安裝kafka:關於kafka的介紹:Kafka快速入門,簡單來說,kafka集群中的一台服務器就是一個broker,消息按名字分類,叫做topic,消息的產生是producer,消息的獲取方為customer。kafka的安裝方法同上。由於使用默認配置,kafka中的config下不需要配置了,直接啟動即可進行模擬:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test 
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

相關可以看apache-kafka

5:進行整合:數據的處理過程包括數據收集,數據清理,數據存儲,數據分析,數據展現。在這里數據的收集由flume負責,定期從web server中收集log相關信息,對於實時數據的處理,將數據直接發送到kafka,然后交給后面的storm處理(這個沒有做),對於離線部分,經過簡單的mr處理后存儲到hdfs上,然后使用hive操作。

總的架構圖:

arch

Flume的設計:

flume_ng

在搭建之前,先安裝maven:安裝步驟同上Flume與Kafka

安裝完后echo $PATH:

/usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin

Flume與Kafka之間整合需要一個插件:這里介紹個flume-kafka插件,flume1.4,kafka0.7.2的基礎上,將代碼下載下來,進入目錄,使用maven打包成jar文件,將生成的jar包放到flume的lib或相關目錄下,依次將hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar導入,mvn package過程中可能會報錯,找不到kafka0.7.2.jar,你需要將額外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再進行package。

對於myggg開啟6個終端:

console

對於發送到kafka中的數據以后在處理,現在主要是針對hadoop中的數據,首先使用MR處理,格式化文本。

6:后續:解壓eclipse,將之前准備的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目錄下,使用vnc連接到機器,編寫MR程序。

使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:

1,b
2,c
3,d
4,e
5,f
6,g
7,z
0,o

編寫MR,將一行記錄拆分為key,value:

public static class MyMapper
         extends Mapper<Object, Text, IntWritable, Text>{
      private IntWritable hello = new IntWritable();
      private Text world = new Text();
  
      public void map(Object key, Text value, Context context
                      ) throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length == 2) {
          hello.set(Integer.parseInt(array[0]));
          world.set(array[1]);
  
          context.write(hello, world);
        }
      }
    }

查看結果:

[root@myggg eclipse]# hadoop fs -cat /myoutput/part-r-00000
0    o
1    b
2    c
3    d
4    e
5    f
6    g
7    z

使用hive建立外部表查看數據:

create external table employee(id int, name string)
row format delimited fields terminated by '\t'
lines terminated by '\n'
stored as textfile
location '/myoutput';

然后就可以進行相關查詢與處理了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM