Spark Streaming: consuming Kafka data from a specified offset    2017-06-13 15:19
Original article: http://blog.csdn.net/high2011/article/details/53706446
First of all, many thanks to the original author; this article saved me a lot of detours. I am reposting it to keep a copy for later review. Please support the original author and read the article at the link above. Thank you.
I. Scenario: When a Spark Streaming program exits unexpectedly, data keeps being pushed into Kafka, and because Kafka reads from the latest offset by default, that data is lost. To avoid losing data, we need to record the offset of each consumption so that the next run can check it and start reading from that specified offset.
II. Environment: Kafka 0.9.0, Spark 1.6.0, JDK 1.7, Scala 2.10.5, IDEA 16
III. Implementation:
1. Add the Spark and Kafka dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ngaa</groupId>
    <artifactId>test-my</artifactId>
    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2008</inceptionYear>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--add maven release-->
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- Scala version -->
        <scala.version>2.10.5</scala.version>
        <!-- Scala version on the test machine -->
        <test.scala.version>2.11.7</test.scala.version>
        <jackson.version>2.3.0</jackson.version>
        <!-- slf4j version -->
        <slf4j-version>1.7.20</slf4j-version>
        <!-- CDH Spark -->
        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
        <!-- CDH Hadoop -->
        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
        <!-- httpclient must be compatible with the Hadoop version shipped with CDH (cd /opt/cloudera/parcels/CDH/lib/hadoop/lib) -->
        <httpclient.version>4.2.5</httpclient.version>
        <!-- httpcore -->
        <httpcore.version>4.2.5</httpcore.version>
        <!-- fastjson -->
        <fastjson.version>1.1.39</fastjson.version>
    </properties>
    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <!-- Repository for the CDH dependency jars -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>
    <dependencies>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <!-- httpcore -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>${httpcore.version}</version>
        </dependency>
        <!-- slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-version}</version>
        </dependency>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spark / Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!-- Spark Streaming and Kafka dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.streaming.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${kafka.spark.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- Spark assembly from a local Windows path -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-assembly_2.10</artifactId>
            <version>${spark.cdh.version}</version>
            <scope>system</scope>
            <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
        </dependency>
        <!-- Spark assembly from the local path on the Linux test environment -->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--<scope>system</scope>-->
        <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
        <!--</systemPath>-->
        <!--</dependency>-->
        <!-- Spark assembly from the central repository -->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--</dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
            <version>2.6.0-cdh5.8.0</version>
        </dependency>
    </dependencies>
    <!-- Maven packaging -->
    <build>
        <finalName>test-my</finalName>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.7</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>

2. Create the test class

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
 * Created by yangjf on 2016/12/18
 * Update date:
 * Time: 11:10
 * Describe: read Kafka data starting from a specified offset
 * Result of Test:
 * Command:
 * Email: jifei.yang@ngaa.com.cn
 */
object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

  def main(args: Array[String]) {
    // Set log levels
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Main program for consuming Kafka from a specified offset starts")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      System.exit(1)
      logger.info("Main program exited unexpectedly")
    }
    // e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
    val Array(checkpointDirectory) = args
    logger.info("Checkpoint directory: " + checkpointDirectory)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("Starting streaming")
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    // Configuration
    val brokers = "hadoop3:9092,hadoop4:9092"
    val topics = "20161218a"
    // Batch interval in seconds (the default elsewhere is 5 seconds)
    val split_rdd_time = 8
    // Create the context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
    ssc.checkpoint(checkpointDirectory)
    // Create a direct Kafka stream from the brokers and topics
    val topicsSet: Set[String] = topics.split(",").toSet
    // Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "apple_sample",
      "serializer.class" -> "kafka.serializer.StringEncoder"
      // "auto.offset.reset" -> "largest"  // reset to the latest offset (default)
      // "auto.offset.reset" -> "earliest" // reset to the earliest offset
      // "auto.offset.reset" -> "none"     // throw an exception if no previous offset exists for the consumer group
    )
    /**
     * Read Kafka data starting from the specified positions.
     * Note: because of the exactly-once mechanism, the data is consumed only once in any case!
     * Once the starting offsets are specified, reading resumes from where the previous streaming run stopped.
     */
    val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L)) // specify topic, partition number, offset
    val fromOffsets = setFromOffsets(offsetList) // build the offset map
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // build the MessageAndMetadata handler
    // Consume from the specified offsets with the direct API; for details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    // Process the data
    messages.foreachRDD(mess => {
      // Get the offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        lines.foreach(line => {
          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.info("++++++++++++++++++++++++++++++ record the offset here +++++++++++++++++++++++++++++++++++++++")
          logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          logger.info("+++++++++++++++++++++++++++++++ process the data here ++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka line is " + line)
        })
      })
    })
    ssc
  }

  // Build the offset map
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    for (offset <- list) {
      val tp = TopicAndPartition(offset._1, offset._2) // topic and partition number
      fromOffsets += (tp -> offset._3) // offset position
    }
    fromOffsets
  }
}

IV. References:
1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
2. Kafka configuration documentation: http://kafka.apache.org/documentation.html#configuration
3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
4. Spark Streaming + Kafka integration (iterating over offsets): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
5. Kafka API documentation: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Note: the code above has been tested and can be adapted as needed. If you have questions, please leave a comment!
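A small optional variant of the messageHandler above (not part of the original article): because the handler receives a MessageAndMetadata, it can also carry each record's own offset into the stream, so per-record positions are available without indexing into offsetRanges. A minimal sketch, assuming the same imports, ssc, kafkaParams and fromOffsets as in createContext above:

    // Sketch only: each element becomes (topic, partition, offset, message),
    // so the offset travels together with the record it belongs to.
    val handlerWithOffset = (mam: MessageAndMetadata[String, String]) =>
      (mam.topic, mam.partition, mam.offset, mam.message())

    val messagesWithOffset: InputDStream[(String, Int, Long, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
        ssc, kafkaParams, fromOffsets, handlerWithOffset)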
Notes for reproducing this experiment
1. First, know your topic name, the number of partitions, and the checkpoint directory.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test
Then set the starting position of each partition in the code below:
val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L))
After every run, the consumption progress is recorded in the checkpoint. For example, if the first run consumes from offset 0, then after the program stops, the next run reads the last position from the checkpoint and continues consuming from there.
Note that the checkpoint must correspond one-to-one with the topic, otherwise an error is thrown. Also, if the topic has three partitions, the list must contain three entries, with partition numbers starting from 0.
If you switch to a different topic, remember to switch to a different checkpoint directory as well.
The code as it stands guarantees that every restart resumes consuming from where the previous run stopped.
To consume from a specified position instead, switch to a fresh checkpoint directory and specify, in
val offsetList = List((topics, 0, 120L),(topics, 1, 0L),(topics, 2, 0L))
which partition starts from which offset; here partition 0 starts consuming from offset 120.
Because the offsets are managed by the application itself, they are not stored in ZooKeeper and cannot be queried there.
val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L))
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(checkpointDirectory)
})
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Why the breakpoint is not hit:
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
recreated from the checkpoint data. If the data does not exist, then the StreamingContext
will be created by calling the provided `creatingFunc`.
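In other words, as long as valid checkpoint data already exists under checkpointDirectory, getOrCreate rebuilds the context from it and never calls createContext, so a breakpoint placed inside createContext is not reached. Pointing at a fresh, empty checkpoint directory (or wiping the old one) forces createContext to run again. A minimal sketch of wiping it, assuming the checkpoint lives on a Hadoop-accessible filesystem; the helper name is mine and not part of the original code, and deleting the checkpoint also discards the stored positions:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical helper: recursively deletes the checkpoint directory so that the
    // next StreamingContext.getOrCreate call has to invoke createContext again.
    def clearCheckpoint(checkpointDirectory: String): Unit = {
      val path = new Path(checkpointDirectory)
      val fs: FileSystem = path.getFileSystem(new Configuration())
      fs.delete(path, true) // true = delete recursively
    }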
Storing Offsets Outside Kafka
The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own choosing. The primary use case for this is allowing the application to store both the offset and the results of the consumption in the same system in a way that both the results and offsets are stored atomically. This is not always possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
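Applied to the direct stream above, that idea looks roughly like the sketch below. saveOffsetsAndResults is a hypothetical placeholder for a transactional write to a store of your own choosing (JDBC, HBase, Redis, ...); the point is only that each partition's records and its offset range are committed together. The sketch assumes the messages DStream and the imports (HasOffsetRanges, OffsetRange, TaskContext) from the code above:

    // Hypothetical placeholder: write the partition's records and upsert
    // (range.topic, range.partition, range.untilOffset) in one transaction.
    def saveOffsetsAndResults(records: Iterator[(String, String)], range: OffsetRange): Unit = {
      // replace with a transactional write to your own store
    }

    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { partition =>
        val range = offsetRanges(TaskContext.get.partitionId)
        saveOffsetsAndResults(partition, range)
      }
    }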
The reposter's (lz's) own implementation:
package my.bigdata.studyKafka

/**
 * Created by lq on 2017/8/30.
 */
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}
import org.slf4j.LoggerFactory

object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

  def main(args: Array[String]) {
    // Set log levels
    System.setProperty("HADOOP_USER_NAME", "root")
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Main program for consuming Kafka from a specified offset starts")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      System.exit(1)
      logger.info("Main program exited unexpectedly")
    }
    // e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
    val Array(checkpointDirectory) = args
    logger.info("Checkpoint directory: " + checkpointDirectory)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("Starting streaming")
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    // Configuration
    val brokers = "slave1:9092,slave2:9092"
    val topics = "maats1"
    // Batch interval in seconds (the default elsewhere is 5 seconds)
    val split_rdd_time = 8
    // Create the context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
    ssc.checkpoint(checkpointDirectory)
    // Create a direct Kafka stream from the brokers and topics
    val topicsSet: Set[String] = topics.split(",").toSet
    // Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "first3",
      "serializer.class" -> "kafka.serializer.StringEncoder"
      // "auto.offset.reset" -> "largest"  // reset to the latest offset (default)
      // "auto.offset.reset" -> "earliest" // reset to the earliest offset
      // "auto.offset.reset" -> "none"     // throw an exception if no previous offset exists for the consumer group
    )
    /**
     * Read Kafka data starting from the specified positions.
     * Note: because of the exactly-once mechanism, the data is consumed only once in any case!
     * Once the starting offsets are specified, reading resumes from where the previous streaming run stopped.
     */
    // val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L)) // specify topic, partition number, offset
    val offsetList = List((topics, 0, 230L), (topics, 1, 0L), (topics, 2, 0L))
    val fromOffsets = setFromOffsets(offsetList) // build the offset map
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // build the MessageAndMetadata handler
    // Consume from the specified offsets with the direct API; for details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    // Process the data
    messages.foreachRDD(mess => {
      // Get the offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        lines.foreach(line => {
          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.info("++++++++++++++++++++++++++++++ record the offset here +++++++++++++++++++++++++++++++++++++++")
          logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          logger.info("+++++++++++++++++++++++++++++++ process the data here ++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka line is " + line)
        })
      })
    })
    ssc
  }

  // Build the offset map
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    for (offset <- list) {
      val tp = TopicAndPartition(offset._1, offset._2) // topic and partition number
      fromOffsets += (tp -> offset._3) // offset position
    }
    fromOffsets
  }
}