The code consists of three parts:
1. ZooKeeper initialization
/**
 * @Author: 唐
 * @Date: 2020/3/25 20:19
 */
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer

object ZKUtil {
  // The caller supplies three parameters: the ZooKeeper ensemble address,
  // the session timeout, and the connection timeout.
  def initZKClient(zkServers: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
    // Data written to ZooKeeper has to be serialized, and bytes read back have to be
    // deserialized; serialization turns an object into bytes that can be transferred.
    new ZkClient(zkServers, sessionTimeout, connectionTimeout, new ZkSerializer {
      override def serialize(data: scala.Any): Array[Byte] = {
        try {
          data.toString.getBytes("UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }

      override def deserialize(bytes: Array[Byte]): AnyRef = {
        try {
          new String(bytes, "UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }
    })
  }
}
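For reference, a minimal usage sketch of the utility (the ZKUtilDemo object and the /zkutil_demo node are illustrative only; the ensemble address and timeouts mirror the values used in part 3):

object ZKUtilDemo {
  def main(args: Array[String]): Unit = {
    // Same ZooKeeper addresses and timeouts as the streaming job below.
    val zkClient = ZKUtil.initZKClient("min01:2181,min02:2181,min03:2181", 120000, 60000)
    val path = "/zkutil_demo" // illustrative node, not used by the job
    if (!zkClient.exists(path)) zkClient.createPersistent(path, "0")
    // Values come back as UTF-8 strings thanks to the custom ZkSerializer.
    val value: String = zkClient.readData[String](path)
    println(s"$path -> $value")
    zkClient.close()
  }
}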
2. Database connection pool
/**
 * @Author: 唐
 * @Date: 2020/3/25 23:28
 */
import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedDeque

object ConnectionPool {
  private var queue: ConcurrentLinkedDeque[Connection] = _

  Class.forName("com.mysql.jdbc.Driver")

  def getConnection(): Connection = {
    if (queue == null) queue = new ConcurrentLinkedDeque[Connection]()
    if (queue.isEmpty) {
      for (_ <- 1 to 10) {
        val conn = DriverManager.getConnection(
          "jdbc:mysql://min01:3306/syllabus?useUnicode=true&characterEncoding=utf8",
          "root", "123456")
        conn.setAutoCommit(false) // commits are issued explicitly by the application
        queue.offer(conn)
      }
    }
    queue.poll()
  }

  def returnConnection(conn: Connection): Unit = {
    queue.offer(conn)
  }
}
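A short usage sketch of the pool (ConnectionPoolDemo and the SELECT 1 query are illustrative placeholders): borrow a connection, use it, and always return it in a finally block so the pool is not drained when a statement throws.

object ConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionPool.getConnection()
    try {
      val stmt = conn.prepareStatement("SELECT 1")
      val rs = stmt.executeQuery()
      while (rs.next()) println(rs.getInt(1))
      rs.close()
      stmt.close()
    } finally {
      // Hand the connection back even if the query throws.
      ConnectionPool.returnConnection(conn)
    }
  }
}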
3. Main program logic
/**
 * @Author: 唐
 * @Date: 2020/3/25 22:32
 */
import java.sql.{DriverManager, PreparedStatement}

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.util.{Success, Try}

object test01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_5")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10))

    val topics = Set("spark_streaming_test")
    val kafkaParams = mutable.Map[String, String]()
    kafkaParams.put("bootstrap.servers", "min01:9092,min02:9092,min03:9092") // Kafka broker list
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key deserializer (Spark Streaming reads the records back out of Kafka)
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    kafkaParams.put("session.timeout.ms", "30000")
    kafkaParams.put("enable.auto.commit", "false")
    kafkaParams.put("max.poll.records", "100")
    kafkaParams.put("kafka.topics", "spark_streaming_test") // Kafka topic
    kafkaParams.put("group.id", "g_spark_test") // consumer group this consumer belongs to

    val zkHost = "min01:2181,min02:2181,min03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout) // initialize the ZooKeeper client with the parameters above

    val zkTopic = "spark_streaming_test"
    val zkConsumerGroupId = "g_spark_test"
    // ZooKeeper keeps one node per Kafka partition; in other words, every partition gets
    // its own ZK node that stores the offset for that partition.
    // The next two lines resolve the directory under which those offsets are stored.
    val zkTopicDir = new ZKGroupTopicDirs(zkConsumerGroupId, zkTopic)
    val zkTopicPath = zkTopicDir.consumerOffsetDir
    val childrenCount = zkClient.countChildren(zkTopicPath) // number of offset nodes under that directory

    var kafkaStream: InputDStream[(String, String)] = null // the Kafka stream is exposed as a DStream
    var fromOffsets: Map[TopicAndPartition, Long] = Map() // determines where this run starts consuming from

    kafkaStream = if (childrenCount > 0) {
      // Offset nodes already exist under the ZK directory, so rebuild the starting offsets
      // from them and use the fromOffsets overload of KafkaUtils.createDirectStream;
      // otherwise the else branch creates the DStream with the topic's default offsets.
      // Sending a TopicMetadataRequest to Kafka returns the state (including the leader)
      // of every partition of the topic.
      val req = new TopicMetadataRequest(topics.toList, 0)
      val leaderConsumer = new SimpleConsumer("min01", 9092, 10000, 10000, "StreamingOffsetObserver")
      val res = leaderConsumer.send(req)
      val topicMetaOption = res.topicsMetadata.headOption
      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None =>
          Map[Int, String]()
      }
      for (partition <- 0 until childrenCount) {
        val partitionOffset = zkClient.readData[String](zkTopicPath + "/" + partition)
        val tp = TopicAndPartition(kafkaParams("kafka.topics"), partition)
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        val consumerMin = new SimpleConsumer(partitions(partition), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        // If the saved offset is older than the earliest offset Kafka still holds
        // (the data has expired), fall back to the earliest available offset.
        if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {
          nextOffset = curOffsets.head
        }
        fromOffsets += (tp -> nextOffset)
      }
      val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams.toMap, fromOffsets, messageHandler)
    } else {
      // No offsets saved yet: start from the topic's default offsets.
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams.toMap, topics)
    }

    var offsetRanges: Array[OffsetRange] = null
    val kafkaInputDStream = kafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    val kafkaValues = kafkaInputDStream.map(_._2)
    val kafkaSplits = kafkaValues.map(_.split(",")).filter(_.length == 4)
    val results = kafkaSplits.map(_.mkString(","))

    results.foreachRDD(rdd => {
      // executed on the driver
      rdd.foreachPartition(p => {
        // executed on the executors
        // if the results are to be saved to a database, the database connection can be obtained here
        p.foreach(result => {
          val car = result.split(",")(0)
          val longitude = result.split(",")(1)
          val latitude = result.split(",")(2)
          val timestamp = result.split(",")(3)
          val conn = ConnectionPool.getConnection()
          // val sql = "INSERT INTO syllabus.t_car_position (plate_num,longitude,latitude,timestamp ) values (?,?,?,? )"
          // val sql = "INSERT INTO syllabus.people (id,name,area,sex ) values (?,?,?,? )"
          // val sql = "INSERT INTO syllabus.keshi(time01,name01,count01,sign01 ) values (?,?,?,? )"
          // val sql = "INSERT INTO syllabus.area(areaid,name,jing,wei) values (?,?,?,? )"
          // The question marks are placeholders: they are not bound when the SQL is written;
          // the values are supplied later through the PreparedStatement interface (its setString methods).
          //
          // PreparedStatement extends Statement and holds a precompiled statement.
          // val statement: PreparedStatement = conn.prepareStatement(sql)
          // statement.setString(1,car)
          // statement.setString(2,longitude)
          // statement.setString(3,latitude)
          // statement.setString(4,timestamp)
          //
          // statement.addBatch()     // add the row to the batch
          // statement.executeBatch() // run the batched update; auto-commit is switched off so that
          //                          // JDBC does not commit on its own, leaving the application to
          //                          // decide whether to commit when a transaction error occurs
          conn.commit()
          ConnectionPool.returnConnection(conn)
          println(result)
        })
      })
      // ZkUtils is not serializable, so the offsets must be written back on the driver
      for (o <- offsetRanges) {
        ZkUtils.updatePersistentPath(zkClient, zkTopicDir.consumerOffsetDir + "/" + o.partition, o.fromOffset.toString)
        println("Offset state after this batch was consumed: " + o)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
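The JDBC write in part 3 is left commented out. Below is a minimal sketch of what an enabled version could look like, reusing the pool from part 2 and assuming the syllabus.t_car_position layout from the first commented-out SQL statement (the CarPositionWriter object and helper name are illustrative only):

object CarPositionWriter {
  // Hypothetical helper: persist one parsed record through the ConnectionPool defined in part 2.
  // Assumes a table like syllabus.t_car_position(plate_num, longitude, latitude, timestamp), as in the commented SQL.
  def saveCarPosition(car: String, longitude: String, latitude: String, timestamp: String): Unit = {
    val conn = ConnectionPool.getConnection()
    try {
      val sql = "INSERT INTO syllabus.t_car_position (plate_num, longitude, latitude, timestamp) VALUES (?, ?, ?, ?)"
      val statement = conn.prepareStatement(sql)
      statement.setString(1, car)       // bind each placeholder with setString
      statement.setString(2, longitude)
      statement.setString(3, latitude)
      statement.setString(4, timestamp)
      statement.addBatch()              // queue the row
      statement.executeBatch()          // send the batch
      conn.commit()                     // auto-commit is off, so commit explicitly
      statement.close()
    } finally {
      ConnectionPool.returnConnection(conn) // always hand the connection back
    }
  }
}

In a real partition loop the statement would typically be prepared once per partition and many rows batched before executeBatch(); the sketch batches a single row only to mirror the commented-out code.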
Note: pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka_spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Additional repository from which the jar dependencies can be downloaded -->
    <repositories>
        <repository>
            <id>1</id>
            <name>MAVEN-CENTRE</name>
            <url>http://central.maven.org/maven2/</url>
        </repository>
    </repositories>

    <!-- Dependencies -->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.29</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.4</version>
            <!-- Exclude the Hadoop and Scala dependencies pulled in by Spark so our own versions can be used -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <!-- Use our own Hadoop version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <!-- Plugin for compiling the Scala code -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Source code
Link: https://pan.baidu.com/s/12OylfnDkfTnN6CLjElBKfg  Extraction code: kxn1
