Spark Stream整合flum和kafka,數據存儲在HBASE上,分析后存入數據庫


開發環境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQL

默認配置好了Hadoop的開發環境,並且已經安裝好HBASE等組件。

下面通過一個簡單的案例進行整合:

這是整個工作的流程圖:

 

第一步:獲取數據源

  由於外部埋點獲取資源較為繁瑣,因此,自己寫了個自動生成類似數據代碼:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Genlog {
    static String[] srcurls={"http://www.baidu.com","http://www.sougou.com",
                            "http://www.360.com","http://www.taobao.com"};
    static String[] oss={"android","ios","mac","win","linux"};
    static String[] sexs={"f","m"};

    public static void main(String[] args) throws InterruptedException {
        //http://xxxxx?refurl=http://www.baidu.com&pid=xx&os=andriod&sex=f/m&wx=abc
        Logger logger=LogManager.getLogger(Genlog.class);
        while(true){
        String srcurl=srcurls[(int) (Math.random()*srcurls.length)];
        String os=oss[(int) (Math.random()*oss.length)];
        String sex=sexs[(int) (Math.random()*sexs.length)];

            String url=String.format("http://xxxxx?refurl=%s&pid=xx&os=%s&wx=abc&sex=%s/m",srcurl,os,sex);
            logger.info(url);
            Thread.sleep(300);
            
        }

    }
}

這部分代碼表示,在啟動程序后,將會不斷生成類似文中注釋類型的數據,這樣flume的source端就可以源源不斷的獲取到數據。

pom.xml文件就是關於log4j的依賴api  core  和flum-ng即可,不再贅述。

  同時,在項目中,要編寫連接虛擬機的配置文件,放在resource下,配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <Flume name ="hi" compress="false" type="avro">
            <agent host ="192.168.110.101" port="44444"></agent>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="hi"></AppenderRef>
        </Root>
    </Loggers>
</Configuration>

這樣,我們的配置數據源的項目就已經完成了,當然,在實際生產中,肯定要比這復雜的多。

第二步:配置flume

配置flume/config/a1.conf,文件可以直接touch創建,配置如下:

# 定義資源  管道 目的地
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 設置源的屬性 
a1.sources.r1.type =avro
a1.sources.r1.bind=192.168.110.101
a1.sources.r1.port=44444

# 設置目的地屬性
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.producer.acks = 0
a1.sinks.k1.kafka.topic = mylog
a1.sinks.k1.kafka.bootstrap.servers = 192.168.110.101:9092

# 管道屬性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 把源通過管道連接到目的地
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意更換自己的IP地址,同時,根據需求更改acks的結果,如1、-1、0,具體介紹看官網即可。此時flume是依賴kafka的。所以啟動順序請先啟動kafka,否則會報錯。

第三步編寫spark stream項目

項目目標主要是將kafka中的數據拉取下來消費,通過內部邏輯,將數據轉變為DataFrame格式,通過Phoenix存儲在HBASE上,以方便對數據進行分析。

項目配置文件pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yzhl</groupId>
    <artifactId>spark-streaming-phoneix-kafkademo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
           </plugin>
        </plugins>
    </build>
</project>

邏輯代碼如下:

 

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object LogSave  extends App {
  //定義brokers, groupId, topics
  /**
    * 關於driver和worker的執行位置的代碼
    */
  val Array(brokers, groupId, topics) = Array("192.168.86.128:9092","mylog","mylog")//driver
  //spark上下文對象相當於connection
  val spark = SparkSession.builder().appName("mylog").getOrCreate()//driver
  //創建spark streaming 上下文
  val ssc = new StreamingContext(spark.sparkContext, Seconds(5))//driver
  val topicsSet = topics.split(",").toSet//driver
  //定義kafka配置屬性
  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
    ConsumerConfig.GROUP_ID_CONFIG -> groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//driver
  //使用KafkaUtils工具來的createDirectStream靜態方法創建DStream對象
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))//driver
  //messages中的每一條數據都是一個(key,value) 其中value指的是log中的一行數據
  val lines = messages.map(_.value)//worker
  import spark.implicits._//driver  worker
  //在driver端編譯成了class,之后上傳到worker中
  case class MyRecord(id:String,time:String,srcUrl:String,os:String,sex:String)
  //為記錄產生ID
  lines.print(5)//driver
  //foreachRDD在driver上執行,
  lines.foreachRDD((rdd,t) =>{
    val props = scala.collection.mutable.Map[String,String]()//driver
    props += "table" -> "tb_mylog"
    props += "zkUrl" -> "jdbc:phoenix:hadoop"
    //從下面到toDF.都會放在worker上執行
    rdd.zipWithUniqueId().map( x =>{
      val p =""".+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).+refurl=(.*)&.+&os=(.+)&.+&sex=(.+)""".r
      x._1 match {
        case p(time,srcUrl,os,sex) => MyRecord(t.toString()+x._2,time,srcUrl,os,sex)
        case _ => MyRecord(null,null,null,null,null)
      }
    }).filter(_.id !=null).toDF().write.format("org.apache.phoenix.spark")
      .mode("overwrite")
      .options(props).save();//todf--save之間都是在worker上執行,save()是在driver上
  })
  ssc.start()//driver
  ssc.awaitTermination()//driver


  /**
    * spark的所有上下文的創建都在driver上執行
    * spark的所有action都在driver上執行
    * spark的所有transformation都在worker上執行
    *
    */

}

 

 

 

這部分代碼可以將拉取的數據進行格式化 的存儲。其中正則表達式是對數據行的拆分,並通過Phoenix存儲到HBASE上。

第四步:項目打包

 我用的idea,打包很簡單,maven-->plugins-->scala:compile(編譯)-->Lifecycle的package   即可打包完成,可在target目錄下查看。

eclipse的打包也很簡單,網上一大堆。

到此,在代碼階段的操作基本完成,接下來就是在集群上的運行過程。

第五步:啟動各個進程

本次的部署是在yarn上的,所以肯定有yarn的啟動。我們按照順序啟動。

1,啟動HDFS:start-dfs.sh

2.啟動yarn:start-yarn.sh

3.啟動zookeeper:如果是自己安裝的zookeeper,可以直接用./zkServer.sh start

   如果是用kafka自帶的zookeeper,啟動命令:bin/zookeeper-server-start.sh config/zookeeper.properties

4.啟動kafka:bin/kafka-server-start.sh config/server.properties

5.啟動flume:bin/flume-ng agent -n a1 -c conf -f conf/a1.conf    此時可以啟動數據源的生成項目運行

6.啟動kafka的消費者consumer:bin/kafka-console-consumer.sh --bootstrap-server 192.168.110.101:9092 --topic mylog

7.啟動HBASE:start-hbase.sh

8.啟動Phoenix: ./sqlline.py localhost

第六步:以上進程都啟動成功后,可以將打包好的jar包上傳到系統路徑

此時有一個問題一定要注意,不然肯定會報錯,列如空指針的異常,但無法查詢錯誤具體信息,根本原因是缺少對於的依賴包。

在下載依賴包的時候,我們還需要將兩個必須的依賴包導入到spark的jars文件中,因為我們打包的瘦包,無法包含所有的依賴包。

這兩包是:spark-streaming-kafka-0-10_2.11和他的依賴包kafka_2.11。根據你自己的版本不同,找到對應的版本依賴包,否則會報出版本依賴的異常信息。

添加方法:cd到spark的jars目錄先,在maven官網,右鍵點擊相應的依賴包的jar,復制路徑,運用命令 ”wget 復制的路徑”,也可以自己下載到本地后上傳。

接着,在啟動的Phoenix中,創建我們自己的表,在編碼中的表名為tb_mylog,所以創建表:

!create table tb_mylog(id varchar(255) primary key,time varchar(255),srcUrl varchar(255),os varchar(255),sex varchar(20));

此時!tables里面就會存在了tb_mylog個表。

第七步:運行上傳的jar包,處理數據

運行命令:spark-submit --master yarn --deploy-mode client --class 包名  jar包

運行后,可以看到數據在不斷的寫入,spark Stream在不斷的獲取,此時,進入Phoenix中,

select *  from tb_mylog,可以看到數據在表中存在,並不斷的增長,如果機器性能不是很好,建議運行一段時間后,可以停掉源數據的生成。

對於關閉HBASE,需要注意,不可直接stop掉HBASE,這樣數據就會丟失或者出發預寫機制,無法將數據完全的保存到HDFS上,所以停掉HBASE的最好方式是:先運行hbase-daemon.sh stop master,然后在運行stop-hbase.sh. 這樣既可。

 由於是基於yarn模式,所以要讀取到yarn-site.xml文件,所以在spark-env.sh中配置HADOOP_CONF-DIR=Hadoop路徑,或者YARN_CONF_DIR=yarn路徑。

注意:

  如果用Phoenix連接spark,那么需要Phoenix里的Phoenix-spark-hbase.jar和Phoenix-HBASE-client.jar。

,worker節點通過Phoenix連接HBASE時,自己有了客戶端,那么HBASE的regionserver端需要Phoenix-HBASE-server.jar和Phoenix-spark-hbase.jar兩個包。

 

flume通信數據源:通過通信協議avro.  給到flume的source處,通過配置channel后,得到下沉的位置,即得到kafka的producer,然后通過worker節點進行消費,消費形式是kafkaDStream。

接下來是數據的分析,然后存儲到MySQL中。

第八步:存儲到數據庫中的編碼

新建項目:

import org.apache.spark.sql.{SaveMode, SparkSession}

object ETLSparkSql extends App {

  val spark = SparkSession.builder().appName("from-hbase-etl-to-mysql using spark+phoenix").getOrCreate()//driver
  val props = scala.collection.mutable.Map[String,String]() //driver
  props += "table" -> "tb_mylog"
  props += "zkUrl" -> "hadoop:2181"
  val df = spark.read.format("org.apache.phoenix.spark").options(props).load();
  df.createOrReplaceTempView("tb_mylog")
  val df2 = spark.sql("select srcUrl,count(1) as count_nums from tb_mylog group by srcUrl");

  df2.createOrReplaceTempView("tb_url_count")
  val sql =
    """
      |select
      |       case when srcUrl = 'http://www.baidu.com' then count_nums
      |                                else 0 end as baidu,
      |       case when srcUrl = 'http://www.souguo.com' then count_nums
      |                                else 0 end as souguo,
      |       case when srcUrl = 'http://www.360.com' then count_nums
      |                                else 0 end as `360`,
      |       case when srcUrl = 'http://www.taobao.com' then count_nums
      |                                else 0 end as `taobao`,
      |       case when srcUrl not in  ('http://www.baidu.com','http://www.souguo.com','http://www.taobao.com','http://www.360.com') then count_nums
      |                                else 0 end as `qita`
      |       from  tb_url_count
    """.stripMargin
  val df3 = spark.sql(sql)

  df3.createOrReplaceTempView("tb_case")
  val jdbcops = scala.collection.mutable.Map[String,String]() //driver
  props += "table" -> "tb_log_count"
  props += "url" -> "jdbc:mysql://192.168.86.1:3306/logdb"
  props += "user" -> "root"
  props += "password" -> "root"
  props += "driver" -> "com.mysql.jdbc.Driver"

  spark.sql("select sum(baidu),sum(souguo),sum(`360`),sum(taobao),sum(qita) from tb_case").write.format("jdbc").mode(SaveMode.Append).options(jdbcops).save()

 println("任務提交,等待結果")



}

 第九步:創建數據庫和表

創建logdb的數據庫,創建表tb_log_count,列名分別為id,baidu,souguo,360,taobao,qita。

然后對項目進行編譯和打包,上傳到客戶端driver上,

啟動HDFS,啟動yarn,啟動HBASE,同時可以執行編譯運行語句:

spark-submit  --master yarn --deploy-mode client ETLSparkSql 包名

  到此為止,我們的數據的獲取,數據的處理,數據的存儲,數據的存庫都已經完成,可以在MySQL數據庫中查看結果了。

第十步:數據庫數據的展示

我們用到的技術是Dubbo,對項目做微服務。本項目的Dubbo框架如下:

 

下面開始建立我們的項目

1.建立entity:

  建立一個maven項目,創建一個實體類對象,並實現序列化接口,以便讀取數據庫對象。設置對應數據庫的屬性,並添加set和get方法,以方便后面的過程調用。

  同時,在pom文件中,添加<packaging>jar</packaging>用來打包,此時可以通過install進行打包,可以在本地磁盤的.m2相應的目錄中找到對用的jar文件。

2.創建dao-interface項目

此時,創建的項目pom文件中同樣加入jar,另外,將上一個entity項目中pom文件中的信息作為本項目的依賴,這樣兩個項目就可以關聯到一起了。接口類寫到了一個裝載實體的列表list方法。然后同樣,通過install進行打包。

3.創建dao-impl類,即dao的實現類:

此時創建的項目是spring-boot項目,這個項目要用到mybatis進行整合。

創建后,首先導入依賴問題,在pom文件中加入依賴:

       <dependency>
                <groupId>com.yzhl</groupId>
                <artifactId>dao-api</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.boot</groupId>
                <artifactId>dubbo-spring-boot-starter</artifactId>
                <version>0.2.0</version>
            </dependency>

證明此時依賴的時上一個項目dao接口,同時還依賴了Dubbo.

接下來,創建一個接口類,同樣具有的時實體類的集合方法。有了接口,需要做映射文件,創建映射文件mapper.xml,文件內容大致為

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org//dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yzhl.dao.LogMapper">
    <select id="list" resultType="com.yzhl.commen.Logentity">
        select * from tb_log_count
    </select>

</mapper>

 映射完成,需要通過App做掃描,添加掃描注解:@MapperScan(basePackages = "com.yzhl.dao")

接下來編寫實現類:

@Service
@Component
public class LogServiceImpl implements  LogService {
    @Autowired
    private LogMapper logMapper;
    
    @Override
    public List<Logentity> list() {
        return logMapper.list();
    }
}

 同時配置properties.yml文件:

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/logdb
    username: root
    password: XXoo0321
    driver-class-name: com.mysql.jdbc.Driver
mybatis:
  mapper-locations: classpath:mapping/*xml
dubbo:
  application:
    id: dao-impl
    name: dao-impl
  protocol:
    id: dubbo
    name: dubbo
    port: 9999
  registry:
    id: my-1
    address: zookeeper://192.168.110.101:2181
  scan:
    basePackages: com.yzhl.dao

 到此,dao的實現類也已經完成了。

4.創建web項目:

同樣是spring-boot項目,pom文件依然需要dao接口項目和Dubbo的依賴,導入即可。

配置properties.yml文件:

server:
  port: 8888
dubbo:
  application:
    id: web
    name: web
  protocol:
    id: dubbo
    name: dubbo

  registry:
    id: my-2
    address: zookeeper://192.168.110.101:2181
  scan:
    basePackages: com.yzhl.webs

 如果是非本地操作,需要在protocol中添加port端口號,且不能與前面實現類的相同,本地操作可不用添加。

創建Controller對象:

@RestController
@RequestMapping("/log")
public class LogController {
    @Reference//因為是外部的對象,這個注入只能用阿里的
private LogService logService;

    @GetMapping("list")
    @ResponseBody
    public List<Logentity> list(){
        return logService.list();
    }
}

 到此,我們對數據庫的資源獲取已經完成,接下來就是利用Angular進行展示效果的編寫。

第十一步:Angular展示效果圖

 

 

新手上路,有不對的地方還請指正。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM