中國移動實時數據分析-基於spark+kafka+flume


  這兩天主要是做了中國移動的實時數據分析一個小項目(可以說是demo了),這里記錄下來整個過程里面遇到的坑,首先安裝好flume,kafka,spark(基於代碼本地運行可以不安裝),redis,zookeeper 主要是為了熟悉一下整個的一個spark-streaming的一個整個流程,還有就是了解調優的地方。

  上述假設已經安裝好了相應的組件,然后就開始正式的踩坑之路:

  1.編寫一個java程序去讀取原始數據文件,模擬1s進行文件的插入一行,原始的數據文件格式如下:

    

     坑a

    .整個的數據格式是json,但是是一整行的。。。。

    解決a1:於是就想這去把這樣的數據轉化為json格式的,就去搗鼓了一下notepad++轉json格式的方法:notepad++上面的菜單欄中,插件-> plugins Admin..->search中直接查找就好了,然后找找有個install的按鈕點擊一下就ok了,然后各種確定,之后notepad++會自動重啟,重啟之后上面的菜單欄中,插件->就會多出一個JSON Viewer,然后就可以了。但是我操作的時候遇到了notepad++重啟之后沒有出現JSON Viewer(但是后來又出現了),

    解決a2:於是又去找了idea實現json格式的方法:setting->keymap->main enum->code->reformat code 這個功能是將文本格式化,該功能的快捷鍵默認是ctrl+shift+l,但是這個快捷鍵組合是有沖突的,所以將其轉化為ctrl+shift+s,修改后進行保存,然后創建一個xxx.json的文件,復制一行json數據到該文件中,然后全選,按下ctrl+shift+s即可轉化為標准的json文件格式

    

    相應的java實現代碼如下:

 
         
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class WriteCMCC {
public static void main(String[] args) {
List<String> allLines = getCmcc(args[0]);
System.out.println(allLines.size());
writeCmcc(allLines, args[1]);
}

/**
* 一次性讀取cmcc中的數據
* @return 存放在list中
*/
private static List<String> getCmcc(String path) {
BufferedReader br = null;
List<String> allLines = new ArrayList<String>();
try {
br = new BufferedReader(new FileReader(new File(path)));
String line = "";
while ((line = br.readLine()) != null) {
allLines.add(line);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (br != null) br.close();
} catch (IOException e) {
e.printStackTrace();
}
}

return allLines;
}

/**
* 寫入cmcc中的數據,一次寫入一個list的數據集
*/
private static void writeCmcc(List<String> cmcc, String path) {

BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(new File(path)));
for(String line : cmcc) {
bw.write(line);
bw.flush();
Thread.sleep(1000);
bw.newLine();
}

} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (bw != null) bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
 

    代碼寫好,然后測試完,然后打成jar包,丟到Linux准備運行。

    java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.log

  2.flume編寫相應的conf去把數據抽取到kafka中(cmcc.conf)

    先啟動zookeeper,啟動kafka並創建topic(cmcc):

      zookeeper啟動命令:

        /home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每個節點都需要啟動)
      kafka啟動命令:
        /home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
      kafka創建topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor

      kafka查看所有的topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list

    然后編寫conf測試(cmcc.conf):

      

a1.sources = s1
a1.channels = c1

#這里先不使用該種方式去讀取文件,因為該方式flume會出如下的錯誤
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#原因:出現這個問題的原因是,當我們拷貝一個文件的時候,一些對文件進行了修改
#解決:最好的方法就是,確保大文件完全拷貝后,再讓flume來讀取,思路是將拷貝中的文件加上一個多余的后綴,flume一開始不會讀取文件,當文件拷貝完成后去掉多余的后綴,這個時候flume就會針對新文件進行讀取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true

a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#創建好相應的topic
a1.channels.c1.kafka.topic = cmcc
#這個是自己定義的沒啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#這個一定要設置,否則就是個坑,寫入到kafka中的數據會被追加進一些數據,而且還是亂碼的
a1.channels.c1.parseAsFlumeEvent = false

#拼接source和channel
a1.sources.s1.channels=c1

     flume啟動命令:下面的a1就對應着上面的a1(控制台打印信息)
      bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console

  3.spark程序去讀取kafka的中的數據並將結果存放至redis中

    啟動redis:/usr/local/redis/bin/redis-server  /usr/local/redis/etc/redis.conf

    程序相應的配置:resources -> application.conf 

#kafka的相關參數
kafka.topic = "cmcc"
kafka.broker.list="os1:9092,os2:9092,os3:9092"
kafka.group.id="cmcc"
redis.host="xxx.xxx.xxx.xxx"
redis.db.index="0"

    主程序代碼:scala -> BootStarpApp

package app

import java.text.SimpleDateFormat

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{AppParams, Jpools}

object BootStarpApp {
  def main(args: Array[String]): Unit = {

    /**
      * 錯誤集:
      * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
      *   錯誤解釋,kafka在進行序列化實例對象的時候出錯
      *   查找原因:
      *   org.codehaus.jackson.map.deser.std.StringDeserializer是我們AppParas中導入的類型,可能是導錯了,
      *   查看后發現應該導入:import org.apache.kafka.common.serialization.StringDeserializer
      * 2. 程序出現INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序不再執行下去
      *   原因:因為kafka-clent程序默認讀取到kafka上的信息之后將host:os3返回作為主機節點去獲取數據,但是在本機中沒有配置相應的host與ip的映射,所有這里就無法直接進行訪問os3
      *   解決辦法;在windows中配置相應的ip與hostname的映射(kafka中的broker節點)
      * 3.json解析出錯:error parse false
      *   原因json格式錯誤
      *
      * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false  1.7以后默認為true
      * 如果設置此項為 true,Kafka Sink 則會把數據按照標准的 Flume Event 格式(即Headers域和body域結合的數據結構)發送。Flume Event 中的 Headers 域通常是一些附加字段,可以是時間戳(比如時間戳攔截器指定的時間戳)、文件名(比如 spooldir Source 開啟的 fileHeader = true)等信息。但是 1.7.0 版本的 Flume 一旦開啟此配置,會導致 Headers 域里面的信息亂碼
      *
      * 5.flume異常崩潰 File has been modified since being read
      *   原因:出現這個問題的原因是,當我們拷貝一個文件的時候,一些對文件進行了修改,就會出現這個錯誤
      *   解決:最好的方法就是,確保大文件完全拷貝后,再讓flume來讀取,思路是將拷貝中的文件加上一個多余的后綴,flume一開始不會讀取文件,當文件拷貝完成后去掉多余的后綴,這個時候flume就會針對新文件進行讀取。
      *   另外針對大文件,flume的解決方案可以設置一個文件完成后綴:
      */

    val sparkConf = new SparkConf()

    sparkConf.setAppName("中國移動運營實時監控平台")
    sparkConf.setMaster("local[*]")

    /**
      *將rdd以kryo的序列化保存,以減少內存的使用
      */
    sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    /**
      * 對rdd進行壓縮,使用內存空間換去處理時間的方式,減少內存的使用
     */
    sparkConf.set("spark.rdd.compress", "true")

    /**
      *
      */
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")

    /**
      * 進行優雅的停止程序
      */
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    /**
      * 每兩秒執行一個批次
      */
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /**
      * 獲取kafka的數據
      * LocationStrategies:位置策略,如果kafka的broker節點與Excutor在同一台機器上給一種策略,不再一台機器上給另一種策略
      * 設定策略之后會以最有的策略進行獲取數據
      * 一般在企業中kafka節點與Excutor不會放到一台機器的,原因是kafka是消息存儲的,Executor是用來做消息計算的
      * 因此計算與存儲需要分開,存儲對磁盤要求高,計算對內存和cpu的要求更高
      * 如果Executor節點跟Broker的節點在一起的話就使用PreferBrokers策略,不再一起的話就使用preferConsisent策略
      * 使用preferConsisent策略的話,將來在kafka中拉去數據以后盡量將數據分散到所有的Executor上
      */
    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent
      , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams))


    stream.foreachRDD(rdd => {

      /**
        * {
        * "bussinessRst": "0000",
        * "channelCode": "0705",
        * "chargefee": "10000",
        * "clientIp": "125.82.117.133",
        * "endReqTime": "20170412080609613",
        * "idType": "01",
        * "interFacRst": "0000",
        * "logOutTime": "20170412080609613",
        * "orderId": "384681890175026754",
        * "prodCnt": "1",
        * "provinceCode": "280",
        * "requestId": "20170412080450886738519397327610",
        * "retMsg": "成功",
        * "serverIp": "172.16.59.241",
        * "serverPort": "8088",
        * "serviceName": "sendRechargeReq",
        * "shouldfee": "9950",
        * "startReqTime": "20170412080609503",
        * "sysId": "15"
        * }
        */

      /**
        *  業務邏輯:
        *   serviceName:reChargeNotifyReq,則為充值通知的記錄
        *   requestId:包含充值的日期(訂單開始時間)
        *   bussinessRst:是否成功 0000 為成功,其他為不成功
        *   chargefee:充值的金額
        *   receiveNotifyTime:訂單結束時間
        *
        */

      /**
        * 我們可以通過serviceName字段來確定,如果該字段是reChargeNotifyReq則代表該條數據是充值通知部分的數據。
        * 獲取所有的充值通知日志
        */
      val baseData = rdd.map(cr => {
        print(cr.value())
        JSON.parseObject(cr.value())
      }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()

      /**
        * 獲取每天充值成功的訂單筆數
        * 回憶:
        *   wordcount flatMap-》map-》reduceByKey
        */
      val totalSucc = baseData.map(obj=> {
        //獲取日期
        val reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)
        //取出該條充值是否成功的標志
        val result = obj.getString("bussinessRst")
        val flag = if(result.equals("0000")) 1 else 0
        (day, flag)
      }).reduceByKey(_+_)

      /**
        * 獲取充值成功的訂單金額
        */
      val totalMoney = baseData.map(obj=> {
        val reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)
        //去除該條充值是否成功的標記
        val result = obj.getString("bussinessRst")
        val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0
        (day, fee)
      }).reduceByKey(_+_)

      //總訂單數
      val total = baseData.count()

      /**
        * 獲取充值成功的充值時長
        */
      val totalTime = baseData.map(obj=> {
        var reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)

        //取出該條充值是否成功的標示
        val result = obj.getString("bussinessRst")
        //時間格式為:yyyyMMddHHmissSSS
        val endTime = obj.getString("receiveNotifyTime")
        val startTime = reqId.substring(0, 17)

        val format = new SimpleDateFormat("yyyyMMddHHmissSSS")

        val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0
        (day, cost)
      }).reduceByKey(_+_)

      /**
        * 將數據存儲到redis中:
        * (CMCC-20170412,35)
        */
      totalSucc.foreachPartition(itr=> {
       val jedis = Jpools.getJedis
        itr.foreach(tp => {
         // print("CMCC-"+tp._1, tp._2)
          jedis.incrBy("CMCC-"+tp._1, tp._2)
        })
      })
    })


    ssc.start()
    ssc.awaitTermination()
  }
}

  兩個工具類:

package utils

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {
  /**Scala中使用關鍵字lazy來定義惰性變量,實現延遲加載(懶加載)。
  惰性變量只能是不可變變量,並且只有在調用惰性變量時,才會去實例化這個變量。
    load中可以指定相應的配置文件,但是不指定的情況下默認去讀取resources下的application.conf文件
      默認規則:application.conf->application.json->application.properties
    **/
  private lazy val config = ConfigFactory.load()

  val redisHost = config.getString("redis.host")
  val selectDBIndex = config.getInt("redis.db.index")
  /**
    * 返回訂閱的主題
    */
  val topic = config.getString("kafka.topic").split(",")

  /**
    * kafka集群所在的主機和端口
    */
  val brokers:String = config.getString("kafka.broker.list")

  /**
    * 消費者的id
    */
  val groupId = config.getString("kafka.group.id")

  /**
    * 將kafka的相關參數進行分裝到map中
    */
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer"-> classOf[StringDeserializer],
    "group.id"-> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )
}
package utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * 創建一個redis的線程池
  */
object Jpools {
  private val poolConfig = new GenericObjectPoolConfig
  poolConfig.setMaxIdle(5) //最大的空閑連接數為5,連接池中最大的空閑連接數,默認是8
  poolConfig.setMaxTotal(2000) //最大支持的連接數量,默認也是8

  //連接池是私有的,不能對外進行公開訪問
  private lazy val  jedisPool = new JedisPool(poolConfig, AppParams.redisHost)


  def getJedis = {
    val jedis = jedisPool.getResource
    jedis.select(AppParams.selectDBIndex)
    jedis
  }
}

  pom文件

<dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <!-- 導入kafka的依賴-->
       <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>-->
        <!-- 指定kafka-client API的版本-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <!-- 導入spark streaming 與kafka的依賴包-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

    </dependencies>

 

 

問題總結:

  1.json格式的轉換 (已解決)

  2.flume讀取數據到kafka后數據亂碼增多問題(已解決)

  3.flume  spooldir  讀取文件的同時對文件更改造成的java.lang.IllegalStateException:File has been modified since being read:問題 (待解決)

  4.上述spark主程序代碼優化問題 (待解決)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM