Environment
Virtual machine: VMware 10
Linux distribution: CentOS-6.5-x86_64
Terminal client: Xshell 4
FTP client: Xftp 4
JDK 1.8
Scala 2.10.4 (requires JDK 1.8)
Spark 1.6
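The original post does not list the project's build dependencies. A minimal Maven sketch matching the Spark 1.6 / Scala 2.10 environment above might look like the following; the artifact versions are assumptions, so adjust them to your actual cluster versions:

<dependencies>
    <!-- Spark Streaming core and the direct-approach Kafka connector for Scala 2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- Curator client used to read/write offsets in ZooKeeper -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- Jackson is used to (de)serialize the offset values stored in ZooKeeper -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.4.4</version>
    </dependency>
    <!-- Guava supplies ImmutableMap used when building the OffsetRequest -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>14.0.1</version>
    </dependency>
</dependencies>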
To manage Kafka consumer offsets actively, the application stores the offsets of consumed messages in ZooKeeper, MySQL, or HBase itself, instead of relying on Kafka's automatic offset commits.
The following example uses ZooKeeper for offset storage and management:
package manageoffset;

import java.util.Map;

import kafka.common.TopicAndPartition;
import manageoffset.getOffset.GetTopicOffsetFromKafkaBroker;
import manageoffset.getOffset.GetTopicOffsetFromZookeeper;

import org.apache.log4j.Logger;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UseZookeeperManageOffset {

    /**
     * log4j logger; UseZookeeperManageOffset.class identifies the class producing the log output.
     */
    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);

    public static void main(String[] args) {
        // Load the log4j configuration file so log output is available.
        ProjectUtil.LoadLogConfig();
        logger.info("project is starting...");

        /**
         * Get the latest (maximum) produced offset of each partition of the topic from the Kafka cluster.
         */
        Map<TopicAndPartition, Long> topicOffsets = GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");

        /**
         * Get the offset each partition of the topic has been consumed to, as recorded in ZooKeeper.
         */
        Map<TopicAndPartition, Long> consumerOffsets = GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");

        /**
         * Merge the two offset maps. The idea is:
         * if consumer offsets can be read from ZooKeeper, the ZooKeeper offsets win;
         * otherwise this consumer group is consuming the topic for the first time,
         * so the offsets are set to the latest positions of the topic.
         */
        if (null != consumerOffsets && consumerOffsets.size() > 0) {
            topicOffsets.putAll(consumerOffsets);
        }

        /**
         * Uncommenting the block below resets every partition offset in topicOffsets to 0,
         * i.e. consumption starts from the beginning of the topic.
         */
//        for (Map.Entry<TopicAndPartition, Long> item : topicOffsets.entrySet()) {
//            item.setValue(0L);
//        }

        /**
         * Build the Spark Streaming application and start consuming from the current offsets.
         */
        JavaStreamingContext jsc = SparkStreamingDirect.getStreamingContext(topicOffsets, "zhy");
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
package manageoffset.getOffset;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.collect.ImmutableMap;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

/**
 * Kafka must be running before this test is executed.
 * @author root
 */
public class GetTopicOffsetFromKafkaBroker {

    public static void main(String[] args) {
        Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");
        Set<Entry<TopicAndPartition, Long>> entrySet = topicOffsets.entrySet();
        for (Entry<TopicAndPartition, Long> entry : entrySet) {
            TopicAndPartition topicAndPartition = entry.getKey();
            Long offset = entry.getValue();
            String topic = topicAndPartition.topic();
            int partition = topicAndPartition.partition();
            System.out.println("topic = " + topic + ",partition = " + partition + ",offset = " + offset);
        }
    }

    /**
     * Get, for every partition of the given topic, the latest offset produced to the Kafka cluster.
     * @param KafkaBrokerServer comma-separated broker list
     * @param topic
     * @return
     */
    public static Map<TopicAndPartition, Long> getTopicOffsets(String KafkaBrokerServer, String topic) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        // Iterate over every broker
        for (String broker : KafkaBrokerServer.split(",")) {
            // Query the broker with a SimpleConsumer
            SimpleConsumer simpleConsumer = new SimpleConsumer(broker.split(":")[0], Integer.valueOf(broker.split(":")[1]), 64 * 10000, 1024, "consumer");
            TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
            TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
            // Iterate over the returned topic metadata
            for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
                // Iterate over the metadata of every partition
                for (PartitionMetadata part : metadata.partitionsMetadata()) {
                    Broker leader = part.leader();
                    if (leader != null) {
                        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());
                        PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 10000);
                        OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
                        OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                        if (!offsetResponse.hasError()) {
                            long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                            retVals.put(topicAndPartition, offsets[0]);
                        }
                    }
                }
            }
            simpleConsumer.close();
        }
        return retVals;
    }
}
package manageoffset.getOffset;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.common.TopicAndPartition;

public class GetTopicOffsetFromZookeeper {

    /**
     * Get, from ZooKeeper, the offset each partition of the topic has been consumed to by the consumer group.
     * @param zkServers
     * @param groupID
     * @param topic
     * @return
     */
    public static Map<TopicAndPartition, Long> getConsumerOffsets(String zkServers, String groupID, String topic) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        ObjectMapper objectMapper = new ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectString(zkServers)
                .connectionTimeoutMs(1000)
                .sessionTimeoutMs(10000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
        try {
            String nodePath = "/consumers/" + groupID + "/offsets/" + topic;
            if (curatorFramework.checkExists().forPath(nodePath) != null) {
                // One child znode per partition; its data is the stored offset
                List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                for (String partition : partitions) {
                    int partitionId = Integer.valueOf(partition);
                    Long offset = objectMapper.readValue(curatorFramework.getData().forPath(nodePath + "/" + partition), Long.class);
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
                    retVals.put(topicAndPartition, offset);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();
        return retVals;
    }

    public static void main(String[] args) {
        Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");
        Set<Entry<TopicAndPartition, Long>> entrySet = consumerOffsets.entrySet();
        for (Entry<TopicAndPartition, Long> entry : entrySet) {
            TopicAndPartition topicAndPartition = entry.getKey();
            String topic = topicAndPartition.topic();
            int partition = topicAndPartition.partition();
            Long offset = entry.getValue();
            System.out.println("topic = " + topic + ",partition = " + partition + ",offset = " + offset);
        }
    }
}
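The znode layout read here is /consumers/<groupID>/offsets/<topic>/<partition>, which is the same path the streaming job below writes to. Assuming the node, group, and topic names used in this example, the stored values can be inspected with ZooKeeper's standard zkCli.sh shell:

zkCli.sh -server node3:2181
ls /consumers/zhy/offsets/mytopic        # one child znode per partition, e.g. [0, 1, 2]
get /consumers/zhy/offsets/mytopic/0     # the value is the untilOffset written by the job (a JSON number)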
package manageoffset;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SparkStreamingDirect {

    /**
     * Read Kafka messages starting from the offsets passed in.
     * @param topicOffsets
     * @param groupID
     * @return
     */
    public static JavaStreamingContext getStreamingContext(Map<TopicAndPartition, Long> topicOffsets, final String groupID) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingOnKafkaDirect");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        jsc.checkpoint("/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        kafkaParams.put("group.id", "MyFirstConsumerGroup");

        // Print the starting offset of every partition
        for (Map.Entry<TopicAndPartition, Long> entry : topicOffsets.entrySet()) {
            System.out.println(entry.getKey().topic() + "\t" + entry.getKey().partition() + "\t" + entry.getValue());
        }

        JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                jsc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                topicOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    private static final long serialVersionUID = 1L;

                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        return v1.message();
                    }
                }
        );

        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

        // Capture the offset ranges of each batch before any downstream transformation
        JavaDStream<String> lines = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        });

        message.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> t) throws Exception {
                ObjectMapper objectMapper = new ObjectMapper();
                CuratorFramework curatorFramework = CuratorFrameworkFactory
                        .builder()
                        .connectString("node3:2181,node4:2181,node5:2181")
                        .connectionTimeoutMs(1000)
                        .sessionTimeoutMs(10000)
                        .retryPolicy(new RetryUntilElapsed(1000, 1000))
                        .build();
                curatorFramework.start();

                for (OffsetRange offsetRange : offsetRanges.get()) {
                    long fromOffset = offsetRange.fromOffset();   // starting offset of this batch
                    long untilOffset = offsetRange.untilOffset(); // ending offset of this batch
                    final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
                    String nodePath = "/consumers/" + groupID + "/offsets/" + offsetRange.topic() + "/" + offsetRange.partition();
                    System.out.println("nodePath = " + nodePath);
                    System.out.println("fromOffset = " + fromOffset + ",untilOffset=" + untilOffset);
                    // Record in ZooKeeper the offset this topic partition has been consumed to
                    if (curatorFramework.checkExists().forPath(nodePath) != null) {
                        curatorFramework.setData().forPath(nodePath, offsetBytes);
                    } else {
                        curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
                    }
                }
                curatorFramework.close();
            }
        });

        lines.print();
        return jsc;
    }
}
package manageoffset;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class ProjectUtil {

    /**
     * log4j logger used by this utility class.
     */
    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);

    /**
     * Load the configured log4j.properties. By default log4j reads it from the classpath (src);
     * if log4j.properties is placed somewhere else, it has to be loaded manually, as here.
     */
    public static void LoadLogConfig() {
        PropertyConfigurator.configure("d:/eclipse4.7WS/SparkStreaming_Kafka_Manage/resource/log4j.properties");
    }

    /**
     * Load the application configuration file.
     * The directory containing config.properties must be configured as a resource (source) folder.
     * @return
     */
    public static Properties loadProperties() {
        Properties props = new Properties();
        InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.properties");
        if (null != inputStream) {
            try {
                props.load(inputStream);
            } catch (IOException e) {
                logger.error(String.format("Config.properties file not found in the classpath"));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = loadProperties();
        String value = props.getProperty("hello");
        System.out.println(value);
    }
}
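ProjectUtil expects a log4j.properties at the hard-coded path above and a config.properties on the classpath. Neither file is shown in the original project, so the following is only a minimal sketch of what they might contain (the "hello" key is the one read by main()):

# log4j.properties - minimal console logging
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} - %m%n

# config.properties - read by loadProperties()
hello=world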