Flume學習 & Kafka & Storm 等 & Log4J 配置


正在學習這篇文章:

http://blog.csdn.net/ymh198816/article/details/51998085

和工作中接觸的電商、訂單、分析,可以結合起來。

 

開宗明義,這幅圖片:

 

 

Strom是一個非常快的實時計算框架,至於快到什么程度呢?

官網首頁給出的數據是每一個Storm集群上的節點每一秒能處理一百萬條數據。
相比Hadoop的“Mapreduce”計算框架,Storm使用的是"Topology";
Mapreduce程序在計算完成后最終會停下來,而Topology則是會永遠運行下去除非你顯式地使用“kill -9 XXX”命令停掉它。

 

准備實際寫一個實時分析系統。不然紙上得來終覺淺。

首先需要讓Java程序在Linux環境上運行。用leetcode的java程序來做實驗,把leetcode工程的output目錄拷貝到安裝了java的機器(m42n05.gzns)。

$ pwd
/home/work/data/code/out/production/leetcode

$ java com.company.Main
Hello!
ret:3 注意,只能在這個目錄,運行完成的package。如果換到子目錄,就不行:

$ pwd
/home/work/data/code/out/production/leetcode/com

$ java company.Main
Error: Could not find or load main class company.Main

 

用Idea創建了一個Maven項目“LogGenerator”,項目的主要代碼如下:

package com.comany.log.generator;

/**
 * Created by baidu on 16/11/7.
 */

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;


public class LogGenerator {

    public enum paymentWays {
        Wechat,Alipay,Paypal
    }
    public enum merchantNames {
        優衣庫,天貓,淘寶,咕嚕大大,快樂寶貝,守望先峰,哈毒婦,Storm,Oracle,Java,CSDN,跑男,路易斯威登,
        暴雪公司,Apple,Sumsam,Nissan,Benz,BMW,Maserati
    }

    public enum productNames {
        黑色連衣裙, 灰色連衣裙, 棕色襯衫, 性感牛仔褲, 圓腳牛仔褲,塑身牛仔褲, 朋克衛衣,高腰闊腿休閑褲,人字拖鞋,
        沙灘拖鞋
    }

    float[] skuPriceGroup = {299,399,699,899,1000,2000};
    float[] discountGroup = {10,20,50,100};
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    private static final Logger logger = LogManager.getLogger(LogGenerator.class);
    private int logsNumber = 10;

    public void generate() {

        for(int i = 0; i <= logsNumber; i++) {
            logger.info(randomOrderInfo());
        }
    }

    public String randomOrderInfo() {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();

        String orderNumber = randomNumbers(5) + date.getTime();

        String orderDate = sdf.format(date);

        String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);

        String paymentDate = sdf.format(date);

        String merchantName = randomMerchantNames();

        String skuInfo = randomSkus();

        String priceInfo = calculateOrderPrice();

        return "orderNumber: " + orderNumber + " | orderDate: " + orderDate + " | paymentNumber: " +
                paymentNumber + " | paymentDate: " + paymentDate + " | merchantName: " + merchantName +
                " | sku: " + skuInfo + " | price: " + priceInfo;
    }

    private String randomPaymentWays() {

        paymentWays[] paymentWayGroup = paymentWays.values();
        Random random = new Random();
        return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
    }

    private String randomMerchantNames() {

        merchantNames[] merchantNameGroup = merchantNames.values();
        Random random = new Random();
        return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
    }

    private String randomProductNames() {

        productNames[] productNameGroup = productNames.values();
        Random random = new Random();
        return productNameGroup[random.nextInt(productNameGroup.length)].name();
    }


    private String randomSkus() {

        Random random = new Random();
        int skuCategoryNum = random.nextInt(3);

        String skuInfo ="[";

        totalPrice = 0;
        for(int i = 1; i <= 3; i++) {

            int skuNum = random.nextInt(3)+1;
            float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
            float totalSkuPrice = skuPrice * skuNum;
            String skuName = randomProductNames();
            String skuCode = randomCharactersAndNumbers(10);
            skuInfo += " skuName: " + skuName + " skuNum: " + skuNum + " skuCode: " + skuCode
                    + " skuPrice: " + skuPrice + " totalSkuPrice: " + totalSkuPrice + ";";
            totalPrice += totalSkuPrice;
        }


        skuInfo += " ]";

        return skuInfo;
    }

    private String calculateOrderPrice() {

        Random random = new Random();
        discount = discountGroup[random.nextInt(discountGroup.length)];
        paymentPrice = totalPrice - discount;

        String priceInfo = "[ totalPrice: " + totalPrice + " discount: " + discount + " paymentPrice: " + paymentPrice +" ]";

        return priceInfo;
    }

    private String randomCharactersAndNumbers(int length) {

        String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
        String randomCharacters = "";
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            randomCharacters += characters.charAt(random.nextInt(characters.length()));
        }
        return randomCharacters;
    }

    private String randomNumbers(int length) {

        String characters = "0123456789";
        String randomNumbers = "";
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            randomNumbers += characters.charAt(random.nextInt(characters.length()));
        }
        return randomNumbers;
    }

    public static void main(String[] args) {

        LogGenerator generator = new LogGenerator();
        generator.generate();
    }
}

運行的時候報錯:

log4j:WARN No appenders could be found for logger (com.comany.log.generator.LogGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

所以增加一個 log4j.properties

log4j.rootLogger=INFO,Console,File

#控制台日志
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.Target=System.out
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=[%p][%t][%d{yyyy-MM-dd HH\:mm\:ss}][%C] - %m%n

#普通文件日志
log4j.appender.File=org.apache.log4j.RollingFileAppender
log4j.appender.File.File=logs/generator.log
log4j.appender.File.MaxFileSize=10MB
#輸出日志,如果換成DEBUG表示輸出DEBUG以上級別日志
log4j.appender.File.Threshold=ALL
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=[%p][%t][%d{yyyy-MM-dd HH\:mm\:ss}][%C] - %m%n

然后在外層目錄能夠看到有Log目錄生成:

 

直接把target目錄拷貝到Linux機器上,發現找不到依賴包 log4j。

需要用 Intellij 進行打包。在File -> Project Structure里面。

 

然后應該會自動生成Jar包(也可以Build->Build Artifacts) LogGenerator.jar ,拷貝到Linux機器上。

 

但是開始不能運行。提示找不到Manifest.mf。搜索之后,發現,要選第二個選項“copy to ...”,而且必須把Manifest的目錄從java改到Resources才行。

這樣選項之后,生成的目錄里面有兩個jar。目錄拷貝到Linux,然后運行:

$ java -jar LogGenerator.jar 

能夠看到logs目錄下有新的日志生成:
$ ll logs/*
-rw-rw-r--  1 work work 12328 Nov  7 20:12 logs/generator.log

但是Linux的log文件都是亂碼。試了改SecureCRT配置什么的,都沒有用。

查看到文件編碼是 ASCII TEXT

只好用  (printf "\357\273\277";cat generator.log) > File2 來修改文件編碼格式。

還是亂碼。在log4j的配置文件里面加上

log4j.appender.file.encoding=UTF-8

還是亂碼。加上上面哪個 BOM符號,還是亂碼。

 

這時候把mac上面的日志拷貝到Linux上,發現是正常的。那么還是log4j打印的地方出了問題。

再仔細檢查log4j的地方,發現上面log4j配置里面的file應該大寫才行,要與上下文一致:

log4j.appender.File.encoding=UTF-8

修改之后,重新生成artifact,拷貝,運行。能看到中文啦:

[INFO][main][2016-11-07 23:50:51][com.comany.log.generator.LogGenerator] - orderNumber: 494341478533851064 | orderDate: 2016-11-07 23:50:51 | paymentNumber: Alipay-46983228 | paymentDate: 2016-11-07 23:50:51 | merchantName: 守望先峰 | sku: [ skuName: 黑色連衣裙 skuNum: 3 skuCode: 06vteu0ewx skuPrice: 899.0 totalSkuPrice: 2697.0; skuName: 塑身牛仔褲 skuNum: 3 skuCode: oz13bdht0w skuPrice: 2000.0 totalSkuPrice: 6000.0; skuName: 圓腳牛仔褲 skuNum: 3 skuCode: geuum757jk skuPrice: 399.0 totalSkuPrice: 1197.0; ] | price: [ totalPrice: 9894.0 discount: 100.0 paymentPrice: 9794.0 ]
[INFO][main][2016-11-07 23:50:51][com.comany.log.generator.LogGenerator] - orderNumber: 623011478533851071 | orderDate: 2016-11-07 23:50:51 | paymentNumber: Alipay-58335677 | paymentDate: 2016-11-07 23:50:51 | merchantName: 暴雪公司 | sku: [ skuName: 黑色連衣裙 skuNum: 1 skuCode: mp9fbajaj9 skuPrice: 699.0 totalSkuPrice: 699.0; skuName: 黑色連衣裙 skuNum: 1 skuCode: oaiwi0xj3z skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: 黑色連衣裙 skuNum: 1 skuCode: tkbd5a91iq skuPrice: 399.0 totalSkuPrice: 399.0; ] | price: [ totalPrice: 3098.0 discount: 100.0 paymentPrice: 2998.0 ]

 

安裝Flume

參考 http://www.aboutyun.com/thread-8917-1-1.html

http://www.cnblogs.com/smartloli/p/4468708.html

下載了apache-flume-1.7.0-bin.tar.gz , 拷貝到 m42n06:/home/work/data/installed 

解壓后在conf目錄 cp flume-env.sh.template flume-env.sh

把JAVA_HOME改對:

export JAVA_HOME=/home/work/.jumbo/opt/sun-java8/

運行命令,報錯:

$ bin/flume-ng version
bin/flume-ng: line 82: syntax error in conditional expression: unexpected token `('
bin/flume-ng: line 82: syntax error near `^java\.library\.path=(.'
bin/flume-ng: line 82: `      if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then'

懷疑又是shell腳本的原因。。

用jumbo 裝了一個 zsh,但是執行的時候貌似還是有點問題。再用jumbo裝一個coreutils試試吧。

裝好coreutils之后 bash版本沒有變化,試了一下還是不行,只好還是用zsh。

把flume-ng腳本的第一行改成 #!/home/work/.jumbo/bin/zsh-5.0.2

運行報錯:

$ bin/flume-ng version
bin/flume-ng version
bin/flume-ng:cd:409: too many arguments
run_flume:13: no such file or directory: /home/work/data/installed/apache-flume-1.7.0-bin/bin/java

發現原來是要指定配置文件,而且不能指定文件名,只能指定目錄,如下:

$ bin/flume-ng version -c conf
bin/flume-ng version -c conf
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

准備配一個flume,sink輸出到hdfs上面,注意我們能夠通過兩種命令方式訪問hdfs上面的文件:

$ ../hadoop-2.7.3/bin/hdfs dfs -cat /output/part-00000
../hadoop-2.7.3/bin/hdfs dfs -cat /output/part-00000
      5       5      15

$ ../hadoop-2.7.3/bin/hadoop fs -cat /output/part-00000
../hadoop-2.7.3/bin/hadoop fs -cat /output/part-00000
      5       5      15

配置成hdfs的方式:

agent1.sources = origin
agent1.channels = memoryChannel
agent1.sinks = hsink

# For each one of the sources, the type is defined
agent1.sources.origin.type = exec
agent1.sources.origin.command = tail -f /home/work/data/LogGenerator_jar/logs/generator.log
# The channel can be defined as follows.
agent1.sources.origin.channels = memoryChannel

# Each sink's type must be defined
agent1.sinks.hsink.type = hdfs
agent1.sinks.hsink.hdfs.path = /output/logOut
agent1.sinks.hsink.hdfs.fileType = DataStream
agent1.sinks.hsink.hdfs.writeFormat=TEXT
agent1.sinks.hsink.hdfs.rollInterval=1
agent1.sinks.hsink.hdfs.filePrefix=%Y-%m-%d
#Specify the channel the sink should use
agent1.sinks.hsink.channel = memoryChannel

# Each channel's type is defined.
agent1.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent1.channels.memoryChannel.capacity = 100

然后運行命令,還是報錯:

$ bin/flume-ng agent -n agent -f conf/flume-conf.properties  -c conf
Info: Sourcing environment configuration script /home/work/data/installed/apache-flume-1.7.0-bin/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+run_flume:13> /home/work/.jumbo/opt/sun-java8//bin/java -Xmx20m -cp '/home/work/data/installed/apache-flume-1.7.0-bin/conf:/home/work/data/installed/apache-flume-1.7.0-bin/lib/*:/lib/*' '-Djava.library.path=' org.apache.flume.node.Application ' -n agent1 -f flume-conf.properties'

日志 logs/flums.log顯示:

08 Nov 2016 17:17:01,783 ERROR [main] (org.apache.flume.node.Application.main:348)  - A fatal error occurred while running. Exception follows.
org.apache.commons.cli.MissingOptionException: Missing required option: n
        at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
        at org.apache.commons.cli.Parser.parse(Parser.java:231)
        at org.apache.commons.cli.Parser.parse(Parser.java:85)
        at org.apache.flume.node.Application.main(Application.java:263)

無語。。

又安裝了flume 1.5試了下,還是不行。

又仔細檢查了一下,很可能是java版本太高導致的。只能再裝下java1.7試試看。發現還是不行。

 

最后發現是運行的java命令里面貌似有點問題:

/home/work/.jumbo/opt/sun-java8//bin/java -Xmx20m -cp '/home/work/data/installed/apache-flume-1.7.0-bin/conf:/home/work/data/installed/apache-flume-1.5.0-bin/lib/*' '-Djava.library.path=' org.apache.flume.node.Application ' -n agent -f flume-conf.properties'

需要把最后的引號去掉。

然后改成向日志輸出。配置文件flume-conf.properties如下:

agent.sources = origin
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.origin.type =  exec
agent.sources.origin.command = tail -f /home/work/data/LogGenerator_jar/logs/generator.log

# The channel can be defined as follows.
agent.sources.origin.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

Java命令如下:

/home/work/.jumbo/opt/sun-java8//bin/java -Xmx20m -cp '/home/work/data/installed/apache-flume-1.7.0-bin/conf:/home/work/data/installed/apache-flume-1.5.0-bin/lib/*' '-Dflume.root.logger=INFO,console' org.apache.flume.node.Application   -n agent -f conf/flume-conf.properties 

然后貌似能夠監聽到日志了。。。

其他的地方,還要再處理。。。

不知道為什么。登出重新登錄之后,居然就好了。。。

用了HDFS作為sink的新的配置文件內容:

agent.sources = origin
agent.channels = memoryChannel
agent.sinks = hdfsSink

# For each one of the sources, the type is defined
agent.sources.origin.type =  exec
agent.sources.origin.command = tail -f /home/work/data/LogGenerator_jar/logs/generator.log
# The channel can be defined as follows.
agent.sources.origin.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /output/Logger
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormati = TEXT
agent.sinks.hdfsSink.hdfs.rollInterval = 1
agent.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d

# 不加下面這一行會一直報錯:Expected timestamp in the Flume event headers
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp
= true #Specify the channel the sink should use agent.sinks.hdfsSink.channel = memoryChannel # Each channel's type is defined. agent.channels.memoryChannel.type = memory # Other config values specific to each type of channel(sink or source) # can be defined as well # In this case, it specifies the capacity of the memory channel agent.channels.memoryChannel.capacity = 100

然后運行命令:

bin/flume-ng agent -n agent -f conf/flume-conf.properties  -c conf

然后查看日志 logs/flume.log,貌似成功了(因為直接tail -f 是有內容的)

08 Nov 2016 23:19:03,709 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:231)  - Creating /output/Logger/2016-11-08.1478618341857.tmp
08 Nov 2016 23:19:04,818 INFO  [hdfs-hdfsSink-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:357)  - Closing /output/Logger/2016-11-08.1478618341857.tmp
08 Nov 2016 23:19:04,828 INFO  [hdfs-hdfsSink-call-runner-9] (org.apache.flume.sink.hdfs.BucketWriter$8.call:618)  - Renaming /output/Logger/2016-11-08.1478618341857.tmp to /output/Logger/2016-11-08.1478618341857
08 Nov 2016 23:19:04,835 INFO  [hdfs-hdfsSink-roll-timer-0] (org.apache.flume.sink.hdfs.HDFSEventSink$1.run:382)  - Writer callback called.

在另一台機器05上面看下HDFS是否生成。

$ ~/data/installed/hadoop-2.7.3/bin/hadoop fs -ls /output/Logger
Found 5 items
-rw-r--r--   3 work supergroup       1187 2016-11-08 23:19 /output/Logger/2016-11-08.1478618341853
-rw-r--r--   3 work supergroup       1182 2016-11-08 23:19 /output/Logger/2016-11-08.1478618341854
-rw-r--r--   3 work supergroup       1188 2016-11-08 23:19 /output/Logger/2016-11-08.1478618341855
-rw-r--r--   3 work supergroup       1196 2016-11-08 23:19 /output/Logger/2016-11-08.1478618341856
-rw-r--r--   3 work supergroup       1176 2016-11-08 23:19 /output/Logger/2016-11-08.1478618341857

發現生成的是一個目錄,然后里面有按照時間戳生成的文件

然后打開一個文件看看:

$ ~/data/installed/hadoop-2.7.3/bin/hadoop fs -cat /output/Logger/2016-11-08.1478618810929
[INFO][main][2016-11-08 23:26:50][com.comany.log.generator.LogGenerator] - orderNumber: 966811478618810487 | orderDate: 2016-11-08 23:26:50 | paymentNumber: Wechat-92539661 | paymentDate: 2016-11-08 23:26:50 | merchantName: Sumsam | sku: [ skuName: 人字拖鞋 skuNum: 1 skuCode: 1q9vaq3484 skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: 朋克衛衣 skuNum: 3 skuCode: 7vwna4abw2 skuPrice: 2000.0 totalSkuPrice: 6000.0; skuName: 塑身牛仔褲 skuNum: 3 skuCode: 50qem6vlid skuPrice: 699.0 totalSkuPrice: 2097.0; ] | price: [ totalPrice: 10097.0 discount: 100.0 paymentPrice: 9997.0 ]

采用HDFS輸出成功。

 

然后安裝 Kafka。

Kafka安裝新起一篇:http://www.cnblogs.com/charlesblc/p/6046023.html 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM