Spark Streaming reads JSON data from a Kafka topic and stores it as Parquet files, with the consumer offsets kept in Redis. Because the sink is a plain file store and the offset commit cannot be wrapped in the same transaction as the data write, this post does not achieve exactly-once semantics. From an idempotence angle, you could give each record a unique key and deduplicate on merge to get an effectively exactly-once result, as sketched right below.
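As a rough illustration of that idempotent-dedup idea (not part of the streaming job below), one option is to read the accumulated Parquet output back and keep a single row per unique key before downstream use. A minimal sketch, assuming a hypothetical unique column named uid and a separate output directory:

import org.apache.spark.sql.SparkSession

object DedupParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedupParquet").master("local[2]").getOrCreate()
    // read everything the streaming job has appended so far
    val df = spark.read.parquet("sparkStreamingKafka2HdfsData")
    // keep one row per uid (assumed unique key) and write the result to a separate directory
    df.dropDuplicates("uid")
      .write.mode("overwrite").parquet("sparkStreamingKafka2HdfsDataDedup")
    spark.stop()
  }
}

The complete streaming job is listed below.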
package com.abc.etl.spark
import java.util.{HashSet => JavaHashSet, Set => JavaSet}
import cn.hutool.core.util.StrUtil
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.parser.Feature
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{Jedis, JedisSentinelPool}
object MoreTopic {
/**
 * Data model of the topic offsets stored in redis: topic_partition <- offset
 *
 * @param topics topic names whose offsets should be looked up
 * @param jedis  redis connection used for the lookup
 * @return the stored offset of every TopicPartition found
 */
def getRedisOffset(topics: collection.Set[String], jedis: Jedis): collection.Map[TopicPartition, Long] = {
val res: collection.mutable.HashMap[TopicPartition, Long] = collection.mutable.HashMap.empty
for (topic <- topics) {
val topicPartitionKeys: JavaSet[String] = jedis.keys(topic + StrUtil.UNDERLINE + "*")
val iterator = topicPartitionKeys.iterator()
while (iterator.hasNext) {
val topicPartitionKey = iterator.next()
val offset = jedis.get(topicPartitionKey)
// the key format is "<topic>_<partition>"; take everything after the topic name and the
// separator as the partition id, so topic names that contain underscores are handled correctly
val partition = topicPartitionKey.substring(topic.length + 1)
res.put(new TopicPartition(topic, partition.toInt), offset.toLong)
}
}
res
}
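/**
 * Write-side counterpart of getRedisOffset, shown only as a sketch for symmetry (main below
 * writes the offsets inline and does not call this): each range's untilOffset is stored under
 * the same topic_partition key convention.
 */
def saveRedisOffset(offsetRanges: Array[OffsetRange], jedis: Jedis): Unit = {
  offsetRanges.foreach { offsetRange =>
    val key = offsetRange.topic + StrUtil.UNDERLINE + offsetRange.partition
    jedis.set(key, offsetRange.untilOffset.toString)
  }
}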
def main(args: Array[String]): Unit = {
val duration = 20
val appName = "sparkstreamingkafka2"
val conf = SparkConfSingleton.getInstance().setAppName(appName).setMaster("local[2]")
val ssc = new StreamingContext(SparkContextSingleton.getInstance(conf), Seconds(duration))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "zk1:9092,zk2:9092,zk3:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "hellokitty",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
// redis sentinel nodes and the name of the monitored master
val sentinels = new JavaHashSet[String]()
sentinels.add("zk2:26379")
sentinels.add("zk3:26379")
val master = "mymaster"
val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource
val topics = Set("Kafka2Hdfs")
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, getRedisOffset(topics, jedis))
)
jedis.close()
stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // the cast only works on the original KafkaRDD of ConsumerRecord, before any transformation,
    // ref https://blog.csdn.net/xianpanjia4616/article/details/85871063
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach(o =>
      println("reach position .................: " + s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
    // parse every JSON message into a Row on the executors; OrderedField keeps the field order of the JSON text
    val rowRdd = rdd.map(consumerRecord => {
      val json = consumerRecord.value()
      println("json is : " + json)
      val jsonObject = JSON.parseObject(json, Feature.OrderedField)
      Row.apply(jsonObject.values().toArray(): _*)
    })
    val schema = StructType(
      List(
        StructField("key1", StringType, true),
        StructField("value1", StringType, true),
        StructField("key1.type", StringType, true)
      )
    )
    // build and write the DataFrame on the driver; SparkContext/SQLContext must not be used inside a task
    val sqlContext = SQLContextSingleton.getInstance(SparkContextSingleton.getInstance(conf))
    val df = sqlContext.createDataFrame(rowRdd, schema)
    df.write.format("parquet").mode("append").save("sparkStreamingKafka2HdfsData")
    // commit the offsets to redis only after the parquet write has succeeded (at-least-once)
    val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource
    offsetRanges.foreach { offsetRange =>
      println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + StrUtil.UNDERLINE + offsetRange.partition
      jedis.set(topicPartitionKey, offsetRange.untilOffset.toString)
    }
    jedis.close()
  }
})
ssc.start()
ssc.awaitTermination()
}
object JedisSentinelPoolSingleton {
@transient private var instance: JedisSentinelPool = _
def getInstance(master: String, sentinels: JavaHashSet[String]): JedisSentinelPool = {
if (instance == null) {
val gPoolConfig = new GenericObjectPoolConfig()
gPoolConfig.setMaxIdle(10)
gPoolConfig.setMaxTotal(10)
gPoolConfig.setMaxWaitMillis(10)
gPoolConfig.setJmxEnabled(true)
instance = new JedisSentinelPool(master, sentinels, gPoolConfig)
}
instance
}
}
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
object SparkContextSingleton {
@transient private var instance: SparkContext = _
def getInstance(sparkConf: SparkConf): SparkContext = {
if (instance == null) {
instance = new SparkContext(sparkConf)
}
instance
}
}
object SparkConfSingleton {
@transient private var instance: SparkConf = _
def getInstance(): SparkConf = {
if (instance == null) {
instance = new SparkConf()
}
instance
}
}
}
Key dependencies:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<!-- fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.37</version>
</dependency>
<!-- redis客户端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<!--hutool -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.5.10</version>
</dependency>
Reference: https://blog.csdn.net/xianpanjia4616/article/details/81709075
