Spark Streaming 接收 Kafka 数据并写入 MySQL(offset 保存在 ZooKeeper 中)


代码由三部分组成

1.zookeeper 初始化

/**
 * @Author: 唐
 * @Date: 2020/3/25 20:19
 */
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer
object ZKUtil {
  /**
   * Creates a `ZkClient` that stores node payloads as UTF-8 text.
   *
   * Data written through the returned client is converted with `toString` and
   * encoded as UTF-8; data read back is decoded into a `String`.
   *
   * @param zkServers         ZooKeeper connection string (comma-separated `host:port` list)
   * @param sessionTimeout    session timeout handed to `ZkClient`
   * @param connectionTimeout connection timeout handed to `ZkClient`
   * @return a client configured with the UTF-8 string serializer defined below
   */
  def initZKClient(zkServers : String,sessionTimeout : Int,connectionTimeout: Int): ZkClient = {
    new ZkClient(zkServers, sessionTimeout, connectionTimeout, new ZkSerializer {
      // Serialization turns the value into bytes that can be stored in a znode.
      // A null value is passed through as null instead of failing on `toString`.
      override def serialize(data: scala.Any): Array[Byte] =
        if (data == null) null
        else data.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)

      // Deserialization is the inverse: znode bytes back into a String.
      // A null payload is passed through as null instead of failing in the String constructor.
      override def deserialize(bytes: Array[Byte]): AnyRef =
        if (bytes == null) null
        else new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    })
  }

}

 

2.数据库连接池

/**
 * @Author: 唐
 * @Date: 2020/3/25 23:28
 */
import java.sql.{Connection,DriverManager}
import java.util.concurrent.ConcurrentLinkedDeque
/**
 * Minimal JDBC connection pool backed by a concurrent deque.
 *
 * Connections are created in batches when the pool runs dry and are handed out
 * with auto-commit disabled, so the borrower is responsible for calling
 * `commit()` / `rollback()` before returning them.
 */
object ConnectionPool {
  /** Number of connections opened each time the pool is found empty. */
  private val PoolBatchSize = 10

  // Created eagerly so that returnConnection can never observe a null queue.
  private val queue: ConcurrentLinkedDeque[Connection] = new ConcurrentLinkedDeque[Connection]()

  Class.forName("com.mysql.jdbc.Driver")

  /**
   * Borrows a connection, opening a new batch first if none are idle.
   *
   * The whole check-refill-poll sequence runs under one lock: without it two
   * threads can both see an empty pool and both refill it, or one thread can
   * drain the last connection between another thread's `isEmpty` check and its
   * `poll()`, which would hand that thread a null connection.
   *
   * @return a connection with auto-commit turned off
   */
  def getConnection(): Connection = queue.synchronized {
    if (queue.isEmpty) {
      for (_ <- 1 to PoolBatchSize) {
        val conn = DriverManager.getConnection("jdbc:mysql://min01:3306/syllabus?useUnicode=true&characterEncoding=utf8", "root", "123456")
        conn.setAutoCommit(false) // commits are issued explicitly by the caller
        queue.offer(conn)
      }
    }
    queue.poll()
  }

  /**
   * Puts a borrowed connection back into the pool.
   *
   * @param conn the connection to return; null is ignored
   * @return true if the connection was queued, false if `conn` was null
   */
  def returnConnection(conn: Connection): Boolean =
    conn != null && queue.offer(conn)
}

 

3.代码逻辑

/**
 * @Author: 唐
 * @Date: 2020/3/25 22:32
 */
/**
 * @Author: 唐
 * @Date: 2020/3/25 20:12
 */



import java.sql.{DriverManager, PreparedStatement}

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.util.{Success, Try}


/**
 * Spark Streaming job that reads a Kafka topic through the direct (receiver-less)
 * API, keeps only comma-separated records with exactly four fields, and tracks the
 * consumed offsets in ZooKeeper so a restart resumes where the last batch ended.
 */
object test01 {
  import scala.util.control.NonFatal

  /**
   * Asks the bootstrap broker for topic metadata and returns, for the first topic
   * in the response, the leader host of every partition that currently has one.
   * Partitions without a leader are left out of the map.
   */
  private def fetchPartitionLeaders(topics: Set[String]): Map[Int, String] = {
    val consumer = new SimpleConsumer("min01", 9092, 10000, 10000, "StreamingOffsetObserver")
    try {
      val request = new TopicMetadataRequest(topics.toList, 0)
      consumer.send(request).topicsMetadata.headOption match {
        case Some(topicMeta) =>
          topicMeta.partitionsMetadata
            .flatMap(pm => pm.leader.map(leader => pm.partitionId -> leader.host))
            .toMap
        case None =>
          Map.empty[Int, String]
      }
    } finally {
      consumer.close() // the consumer holds a socket to the broker
    }
  }

  /**
   * Queries the given leader for the earliest offset of `tp`.
   * Returns None when the broker reports no offsets for that partition.
   */
  private def earliestOffset(leaderHost: String, tp: TopicAndPartition): Option[Long] = {
    val consumer = new SimpleConsumer(leaderHost, 9092, 10000, 10000, "getMinOffset")
    try {
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.headOption
    } finally {
      consumer.close()
    }
  }

  /**
   * Job entry point. Builds the streaming context, creates the Kafka direct stream
   * (resuming from ZooKeeper-stored offsets when present), processes each batch and
   * then writes the batch's end offsets back to ZooKeeper.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_5")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10))

    val topics = Set("spark_streaming_test")
    val kafkaParams = mutable.Map[String, String]()
    kafkaParams.put("bootstrap.servers", "min01:9092,min02:9092,min03:9092") // Kafka cluster brokers
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key deserializer
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    kafkaParams.put("session.timeout.ms", "30000")
    kafkaParams.put("enable.auto.commit", "false")
    kafkaParams.put("max.poll.records", "100")
    kafkaParams.put("kafka.topics", "spark_streaming_test") // Kafka topic
    kafkaParams.put("group.id", "g_spark_test") // consumer group of this job

    // Parameters needed to initialise the ZooKeeper client.
    val zkHost = "min01:2181,min02:2181,min03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout)

    val zkTopic = "spark_streaming_test"
    // One ZooKeeper node per Kafka partition is used to hold that partition's offset.
    val zkConsumerGroupId = "g_spark_test"

    // Directory under which the per-partition offset nodes are stored.
    val zkTopicDir = new ZKGroupTopicDirs(zkConsumerGroupId, zkTopic)
    val zkTopicPath = zkTopicDir.consumerOffsetDir

    // Number of offset nodes that already exist under the offset directory.
    val childrenCount = zkClient.countChildren(zkTopicPath)

    val kafkaStream: InputDStream[(String, String)] =
      if (childrenCount > 0) {
        // Offsets were saved by a previous run: resume from them.
        val leaders = fetchPartitionLeaders(topics)

        val fromOffsets: Map[TopicAndPartition, Long] =
          (0 until childrenCount).map { partition =>
            val tp = TopicAndPartition(kafkaParams("kafka.topics"), partition)
            val stored = zkClient.readData[String](zkTopicPath + "/" + partition).toLong
            // If the stored offset is older than the earliest offset the broker still
            // has, start from the earliest one instead. When the leader is unknown or
            // reports nothing, the stored offset is used unchanged.
            val earliest = leaders.get(partition).flatMap(host => earliestOffset(host, tp))
            tp -> earliest.filter(_ > stored).getOrElse(stored)
          }.toMap

        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams.toMap, fromOffsets, messageHandler)
      } else {
        // No saved offsets: let the direct stream pick the starting position itself.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams.toMap, topics)
      }

    // Offset ranges of the batch currently being processed; set in transform below,
    // read in foreachRDD.
    var offsetRanges: Array[OffsetRange] = Array.empty

    // transform must be the first operation on the Kafka stream so the RDD still
    // exposes HasOffsetRanges.
    val kafkaInputDStream = kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    // Keep the message value, drop records that do not have exactly four fields.
    val results = kafkaInputDStream
      .map(_._2)
      .map(_.split(","))
      .filter(_.length == 4)
      .map(_.mkString(","))

    results.foreachRDD { rdd =>
      // This closure body runs on the driver.
      rdd.foreachPartition { records =>
        // This closure body runs where the partition is processed.
        if (records.nonEmpty) {
          // One connection and one commit per partition instead of per record.
          val conn = ConnectionPool.getConnection()
          try {
            // The database write is disabled in the original post. To enable it,
            // prepare the statement once here, e.g.
            //   val sql = "INSERT INTO syllabus.area(areaid,name,jing,wei) values (?,?,?,? )"
            //   val statement: PreparedStatement = conn.prepareStatement(sql)
            // then inside the loop split the record and bind the four placeholders:
            //   val Array(car, longitude, latitude, timestamp) = result.split(",")
            //   statement.setString(1, car)
            //   statement.setString(2, longitude)
            //   statement.setString(3, latitude)
            //   statement.setString(4, timestamp)
            //   statement.addBatch()
            // and call statement.executeBatch() after the loop, before the commit.
            records.foreach { result =>
              println(result)
            }
            conn.commit()
          } catch {
            case NonFatal(e) =>
              // Do not hand a connection with a half-finished transaction back to the pool.
              conn.rollback()
              throw e
          } finally {
            ConnectionPool.returnConnection(conn)
          }
        }
      }

      // ZkUtils is not serializable, so offsets are written from the driver.
      // untilOffset is the exclusive end of the batch, i.e. the next offset to read;
      // persisting fromOffset would replay the last batch after every restart.
      for (o <- offsetRanges) {
        ZkUtils.updatePersistentPath(zkClient, zkTopicPath + "/" + o.partition, o.untilOffset.toString)
        println("本次消息消费成功后,偏移量状态:" + o)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
注: pom信息
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka_spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Extra repository from which dependency jars may be downloaded.
         The old http://central.maven.org host has been retired; Maven Central is served over HTTPS. -->
    <repositories>
        <repository>
            <id>1</id>
            <name>MAVEN-CENTRE</name>
            <url>https://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>

    <!-- Dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <!-- Declared once; the original listed this dependency twice. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.29</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>

        <!-- Kafka 0.8 direct-stream connector matching the Spark 2.4.4 artifacts below.
             The Spark 1.6.x connector (spark-streaming-kafka_2.11) relies on
             org.apache.spark.Logging, which Spark 2.x no longer provides. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.4.4</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.4</version>
            <!-- Exclude Spark's own Hadoop and Scala dependencies so the versions declared in this pom are used -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>

                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        <!-- Project-chosen Hadoop version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <!-- Plugin that compiles the Scala sources.
         NOTE(review): org.scala-tools:maven-scala-plugin has been superseded by
         net.alchim31.maven:scala-maven-plugin; consider migrating. -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
View Code

 源码地址

链接:https://pan.baidu.com/s/12OylfnDkfTnN6CLjElBKfg 
提取码:kxn1

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM