Spark Streaming: guaranteeing consistency of a real-time Kafka consumer by managing Kafka offsets manually


1. Introduction

For streaming and real-time computation over big data, the two mainstream stacks today are `kafka+spark streaming` and `kafka+flink`. They differ only in the compute engine: Spark is a distributed engine that unifies offline batch and streaming, whereas Flink was built from the start for streaming and real-time workloads, so it has the edge there; with recent releases Flink's batch support has become very good as well. Either stack is a sound choice for production use.
For streaming and real-time computation on top of Kafka (or any other message queue), guaranteeing data consistency is critical: it is the precondition of data quality. Compared with offline batch processing, consistency is harder to guarantee in a streaming job. In batch processing, after a batch is computed the raw data still sits in the distributed file system (HDFS). If the batch was processed correctly by the compute engine, handed to the next stage of the pipeline, and the next stage's data quality checks out, the batch can be deleted from HDFS or retained per the retention policy; if the computation was wrong and downstream quality suffers, you simply make the corresponding fixes and consume the batch again. With Kafka-based streaming and real-time computation, by contrast, the engine's consumption of Kafka must be monitored so that, per record or per batch, it can be decided whether to commit the corresponding offset. If you are unfamiliar with `kafka offset`, see the official Kafka API and documentation at apache.org. This post focuses on how the `kafka+spark streaming` stack preserves data consistency and data quality.

2. Approach and logic flow diagram

2.1. Approach

Point 1: understand Kafka's underlying mechanics. Only a brief summary is given here; see the Kafka website for details. Kafka stores written records distributed by partition and marks every record with an offset; offsets increase monotonically as records are appended. A consumer reads per partition, in offset order. If auto-commit is enabled when consuming (`enable.auto.commit=true`), then as long as the consuming program does not die with an exception, Kafka records the offsets (grouped per partition) in the ZooKeeper it depends on, regardless of whether the records were actually processed correctly. Conversely, with `enable.auto.commit=false`, the offset is never committed automatically, even when processing succeeds and the program keeps running: the program must commit it by hand. Consider a scenario: the consumer did not crash, but its results are wrong. The offset should not be committed, so that after the streaming job is fixed and restarted it is guaranteed to re-consume the data whose results were wrong. With auto-commit the offset has already been recorded and that data can no longer be reached; manual commit solves this cleanly, because the consuming program itself decides whether to commit.
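The difference between the two modes comes down to one consumer property. A minimal sketch of building the parameter map (the `ManualCommitParams` class name is illustrative; the property names `group.id`, `enable.auto.commit`, and `auto.offset.reset` are standard Kafka consumer configuration):

```java
import java.util.HashMap;
import java.util.Map;

public class ManualCommitParams {
    // Consumer parameters with auto-commit disabled: Kafka will not record
    // offsets on its own, so the application decides when an offset is
    // durable (e.g. after the batch was written downstream successfully).
    public static Map<String, String> build(String groupId) {
        Map<String, String> params = new HashMap<String, String>();
        params.put("group.id", groupId);
        params.put("enable.auto.commit", "false");
        // With no stored offset, start from the newest data ("largest" in the 0.9-era naming)
        params.put("auto.offset.reset", "largest");
        return params;
    }
}
```

With `enable.auto.commit=true` instead, offsets would be committed on a timer (`auto.commit.interval.ms`) regardless of whether the batch was actually processed correctly.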
Point 2: understand Spark's underlying mechanics; again only briefly, see the Spark website for details. When Spark computes over distributed data (streaming or offline batch alike), it reads the data as RDDs and then processes them with Spark's built-in operators and its method APIs; the two are essentially the same thing. Taking "compute and write to some downstream component" as the example, the method-API pattern is usually:
1>. foreach over the RDD (of each micro-batch) to reach each partition
2>. iterate over each partition to reach its data through an iterator; each element the iterator yields is one record:
	dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
                    @Override
                    public void call(Iterator<String> iterator) throws Exception {
                        while (iterator.hasNext()) {
                            String message = iterator.next();
                        }
                    }
                });
            }
        });
With points one and two clear, the question is how Spark Streaming should guarantee data consistency when consuming Kafka. It is not hard; there are two approaches:
Approach 1: take all the Kafka offsets behind the RDD Spark built from the Kafka data, treat the RDD as the unit of work, and commit the offsets manually only after the whole `rdd.foreachRDD` completed without error.
Approach 2: inside `foreachPartition` (within `foreachRDD`), look up the Kafka offsets of the data belonging to each partition, and commit a partition's offsets once that single partition has been processed without error.
The two approaches rest on the same principle: the offsets obtained in approach 1 are simply the array of the per-partition offsets used in approach 2. Approach 1 works at a coarser granularity, approach 2 at a finer one.
A reader may ask: why not go finer still and, while iterating over each record inside `foreachPartition`, fetch that record's offset and commit it as soon as the record succeeds? There are two reasons:
1>. Divide and conquer: data quality must be guaranteed without wrecking performance and the other metrics, and committing an offset for every consumed record would clearly hurt throughput. With the per-partition scheme, if every record in a partition is consumed successfully the partition is deemed correct and its offset is committed; if one or more records fail, the partition is deemed failed, its offset is not committed, and after the fix the Kafka data for that partition's offset range is consumed again. This certainly produces duplicates, but it never loses data; in big-data processing duplicates are rarely a problem, whereas data loss is not acceptable.
2>. The official Spark Streaming Kafka API offers no per-record commit anyway; offset information is exposed per partition as a whole (`OffsetRange`).
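The claim above that duplicates are harmless deserves one concrete illustration. This sketch is not part of the original design; the `IdempotentSink` class and its keying scheme are hypothetical. If downstream writes are keyed by (topic, partition, offset), replaying an uncommitted offset range overwrites the same rows instead of duplicating them:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentSink {
    // In-memory stand-in for a downstream store that supports upserts
    private final Map<String, String> store = new HashMap<String, String>();

    // Key every write by (topic, partition, offset): re-consuming the same
    // offset range after a failed batch rewrites the same keys, so the
    // at-least-once replay does not inflate the downstream data.
    public void write(String topic, int partition, long offset, String value) {
        store.put(topic + "-" + partition + "-" + offset, value);
    }

    public int size() {
        return store.size();
    }
}
```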

2.2. Logic flow diagram

3. Implementation

The example below writes the results back to Kafka downstream.
Versions used: Spark 1.6.0 and Kafka 0.9.0.0 (the Scala 2.10 builds, hence the `_2.10` artifact suffixes).
Add the required dependencies to pom.xml:
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.0</version>
			<scope>compile</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.6.0</version>
		</dependency>
		
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.9.0.0</version>
		</dependency>

Concrete implementations

Approach 1: take all the Kafka offsets behind the RDD Spark built from the Kafka data, treat the RDD as the unit of work, and commit the offsets manually only after the whole `rdd.foreachRDD` completed without error.
public class SparkStreamingKafkaToKafka{
    public static void main(String args[]) throws Exception {
    	SparkConf conf = new SparkConf().setAppName("kafka-to-kafka-test");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(Long.parseLong("50")));

    	String topic = "mytopic_test";
    	String saveTopic = "save_test";

    	final Broadcast<Map<String, Object>> kafkaParamsProducerBroadcast = sparkContext.broadcast(generatorKafkaParamsProduce());

    	//pull the data stream from Kafka
		JavaInputDStream<String> dStream = KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
	        StringDecoder.class, StringDecoder.class, String.class,
	        generatorKafkaParams(), generatorTopicOffsets(topic),
	        new Function<MessageAndMetadata<String, String>, String>() {
	            private static final long serialVersionUID = 1L;
	                @Override
	                public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
	                    return msgAndMd.message();
	                }
	        });

		dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
	            @Override
	            public void call(JavaRDD<String> rdd) throws Exception {
	                final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
                	final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                	offsetRanges.set(offsets);
	                rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
	                    @Override
	                    public void call(Iterator<String> iterator) throws Exception {
	                        Producer<String, String> producer = new KafkaProducer<>(kafkaParamsProducerBroadcast.getValue());

	                        while (iterator.hasNext()) {
	                            String message = iterator.next();
	                            if (!StringUtils.isEmpty(message)) {
	                                Map<String, String> resultMap = (Map<String, String>) JSON.parse(message);
	                                try {
	                                    ProducerRecord record = new ProducerRecord<String, String>(saveTopic, null, JSONObject.toJSONString(resultMap));
	                                    producer.send(record);
	                                } catch (Exception e) {
	                                   e.printStackTrace();
	                                }
	                            }
	                        }

	                        producer.flush();
	                        producer.close();
	                    }
	                });

	                saveOffset(offsetRanges);
	            }
	        });

	        streamingContext.start();
	        streamingContext.awaitTermination();
    }

	public static Map<String, String> generatorKafkaParams() {
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        kafkaParams.put("metadata.broker.list", "hadoop10:9092,hadoop11:9092,hadoop12:9092");
        kafkaParams.put("zookeeper.connect", "hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka");
        kafkaParams.put("zookeeper.connection.timeout.ms", "10000");
        kafkaParams.put("zookeeper.session.timeout.ms", "6000");
        kafkaParams.put("zookeeper.sync.time.ms", "2000");
        kafkaParams.put("group.id", "test");
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("fetch.message.max.bytes", "104857600");
        kafkaParams.put("replica.fetch.max.bytes", "104857600");
        return kafkaParams;
    }

    public static Map<TopicAndPartition, Long> generatorTopicOffsets(String topic) {
        Map<TopicAndPartition, Long> topicOffsets = KafkaOffsetUtils.
                getTopicOffsets("hadoop10:9092,hadoop11:9092,hadoop12:9092", topic);
        Map<TopicAndPartition, Long> consumerOffsets = KafkaOffsetUtils.
                getConsumerOffsets("hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka",
                        "test", topic,
                        Integer.parseInt("10000"),
                        Integer.parseInt("6000"));
        if (null != consumerOffsets && consumerOffsets.size() > 0) {
            topicOffsets.putAll(consumerOffsets);
        }
        return topicOffsets;
    }

    public static void saveOffset(final AtomicReference<OffsetRange[]> offsetRanges) throws Exception {
        org.codehaus.jackson.map.ObjectMapper objectMapper = new org.codehaus.jackson.map.ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka")
                .connectionTimeoutMs(Integer.parseInt("10000"))
                .sessionTimeoutMs(Integer.parseInt("6000"))
                .retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        for (OffsetRange offsetRange : offsetRanges.get()) {
            final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
            String nodePath = "/consumers/" + "test"
                    + "/offsets/" + offsetRange.topic() + "/" + offsetRange.partition();
            if (null != curatorFramework.checkExists().forPath(nodePath)) {
                curatorFramework.setData().forPath(nodePath, offsetBytes);
            } else {
                curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
            }
        }
        curatorFramework.close();
    }

    public static Map<String, Object> generatorKafkaParamsProduce() {
        Map<String, Object> kafkaParams = new HashMap<String, Object>();

        kafkaParams.put("bootstrap.servers", "hadoop10:9092,hadoop11:9092,hadoop12:9092");
        // serializer classes for the message key and value
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        return kafkaParams;
    }
}

Approach 2: inside `foreachPartition` (within `foreachRDD`), look up the Kafka offsets of the data belonging to each partition, and commit a partition's offsets once that single partition has been processed without error.
public class SparkStreamingKafkaToKafka{
    public static void main(String args[]) throws Exception {
    	SparkConf conf = new SparkConf().setAppName("kafka-to-kafka-test");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, Seconds.apply(Long.parseLong("50")));

    	String topic = "mytopic_test";
    	String saveTopic = "save_test";

    	final Broadcast<Map<String, Object>> kafkaParamsProducerBroadcast = sparkContext.broadcast(generatorKafkaParamsProduce());

    	//pull the data stream from Kafka
		JavaInputDStream<String> dStream = KafkaUtils.createDirectStream(streamingContext, String.class, String.class,
	        StringDecoder.class, StringDecoder.class, String.class,
	        generatorKafkaParams(), generatorTopicOffsets(topic),
	        new Function<MessageAndMetadata<String, String>, String>() {
	            private static final long serialVersionUID = 1L;
	                @Override
	                public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
	                    return msgAndMd.message();
	                }
	        });

		dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
	            @Override
	            public void call(JavaRDD<String> rdd) throws Exception {
	                final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
	                rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
	                    @Override
	                    public void call(Iterator<String> iterator) throws Exception {
	                        Producer<String, String> producer = new KafkaProducer<>(kafkaParamsProducerBroadcast.getValue());

	                        OffsetRange offset = offsets[TaskContext.get().partitionId()];
	                        long dataCount = offset.count();//total records in this partition's offset range
	                        long successCount = 0;//records written successfully

	                        while (iterator.hasNext()) {
	                            String message = iterator.next();
	                            if (!StringUtils.isEmpty(message)) {
	                                Map<String, String> resultMap = (Map<String, String>) JSON.parse(message);
	                                try {
	                                    ProducerRecord record = new ProducerRecord<String, String>(saveTopic, null, JSONObject.toJSONString(resultMap));
	                                    // send() is asynchronous: call get() so a delivery
	                                    // failure surfaces here and is not counted as a success
	                                    producer.send(record).get();
	                                    successCount++;
	                                } catch (Exception e) {
	                                   e.printStackTrace();
	                                }
	                            }
	                        }

	                        //decide per offset range: if dataCount == 0 this micro-batch carried
	                        //no data for the partition, so there is nothing to record or commit
	                        if (dataCount > 0) {
	                            long failedCount = dataCount - successCount;//records that failed to write
	                            if (failedCount == 0) {
	                                saveOffsetSingle(offset);
	                            }
	                        }
	                        producer.flush();
	                        producer.close();
	                    }
	                });
	            }
	        });

	        streamingContext.start();
	        streamingContext.awaitTermination();
	}

	public static Map<String, String> generatorKafkaParams() {
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        kafkaParams.put("metadata.broker.list", "hadoop10:9092,hadoop11:9092,hadoop12:9092");
        kafkaParams.put("zookeeper.connect", "hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka");
        kafkaParams.put("zookeeper.connection.timeout.ms", "10000");
        kafkaParams.put("zookeeper.session.timeout.ms", "6000");
        kafkaParams.put("zookeeper.sync.time.ms", "2000");
        kafkaParams.put("group.id", "test");
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("fetch.message.max.bytes", "104857600");
        kafkaParams.put("replica.fetch.max.bytes", "104857600");
        return kafkaParams;
    }

    public static Map<TopicAndPartition, Long> generatorTopicOffsets(String topic) {
        Map<TopicAndPartition, Long> topicOffsets = KafkaOffsetUtils.
                getTopicOffsets("hadoop10:9092,hadoop11:9092,hadoop12:9092", topic);
        Map<TopicAndPartition, Long> consumerOffsets = KafkaOffsetUtils.
                getConsumerOffsets("hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka",
                        "test", topic,
                        Integer.parseInt("10000"),
                        Integer.parseInt("6000"));
        if (null != consumerOffsets && consumerOffsets.size() > 0) {
            topicOffsets.putAll(consumerOffsets);
        }
        return topicOffsets;
    }

    public static void saveOffsetSingle(final OffsetRange offsetRange) throws Exception {
        org.codehaus.jackson.map.ObjectMapper objectMapper = new org.codehaus.jackson.map.ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("hadoop10:2181,hadoop11:2181,hadoop12:2181/kafka")
                .connectionTimeoutMs(Integer.parseInt("10000"))
                .sessionTimeoutMs(Integer.parseInt("6000"))
                .retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
        String nodePath = "/consumers/" + "test"
                + "/offsets/" + offsetRange.topic() + "/" + offsetRange.partition();
        if (null != curatorFramework.checkExists().forPath(nodePath)) {
            curatorFramework.setData().forPath(nodePath, offsetBytes);
        } else {
            curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
        }
        curatorFramework.close();
    }

    public static Map<String, Object> generatorKafkaParamsProduce() {
        Map<String, Object> kafkaParams = new HashMap<String, Object>();

        kafkaParams.put("bootstrap.servers", "hadoop10:9092,hadoop11:9092,hadoop12:9092");
        // serializer classes for the message key and value
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        return kafkaParams;
    }
}

A utility class for the Kafka offset operations used above:

package com.surfilter.dp.timer.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.ImmutableMap;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

import kafka.api.PartitionOffsetRequestInfo;
//import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.kafka.OffsetRange;


public class KafkaOffsetUtils {
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    public static TreeMap<Integer, PartitionMetadata> findLeader(
            String brokerHost, int a_port, String a_topic) throws Exception {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(brokerHost, a_port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    map.put(part.partitionId(), part);
                }
            }
        } catch (Exception e) {
            throw new Exception("Error communicating with Broker [" + brokerHost
                    + "] to find Leader for [" + a_topic + "]", e);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return map;
    }

    /**
     * Works around kafka.common.OffsetOutOfRangeException:
     * it is thrown when the offset the streaming job recorded in ZooKeeper
     * is smaller than the earliest offset still retained by Kafka.
     *
     * @param topic            topic name
     * @param bootstrapServers kafka brokers {e.g rzx162:9092,rzx164:9092,rzx166:9092}
     */
    public static Map<Integer, Long> getEarliestOffset(String topic, String bootstrapServers) throws Exception {
        String[] servers = bootstrapServers.split(",");
        List<String> kafkaHosts = new ArrayList<String>();
        List<Integer> kafkaPorts = new ArrayList<Integer>();
        for (int i = 0, size = servers.length; i < size; i++) {
            String[] hostAndPort = servers[i].split(":");
            try {
                String host = hostAndPort[0];
                Integer port = Integer.parseInt(hostAndPort[1]);
                kafkaHosts.add(host);
                kafkaPorts.add(port);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (kafkaHosts.size() < 1) {
            throw new Exception("parse bootstrapServers error!");
        }
        Map<Integer, Long> partionAndOffset = getOffset(topic, kafkaHosts, kafkaPorts, false);
        return partionAndOffset;
    }

    /**
     * Initialize to the latest offsets.
     *
     * @param topic            topic name
     * @param bootstrapServers kafka brokers {e.g rzx162:9092,rzx164:9092,rzx166:9092}
     */
    public static Map<Integer, Long> getLastestOffset(String topic, String bootstrapServers) throws Exception {
        String[] servers = bootstrapServers.split(",");
        List<String> kafkaHosts = new ArrayList<String>();
        List<Integer> kafkaPorts = new ArrayList<Integer>();
        for (int i = 0, size = servers.length; i < size; i++) {
            String[] hostAndPort = servers[i].split(":");
            try {
                String host = hostAndPort[0];
                Integer port = Integer.parseInt(hostAndPort[1]);
                kafkaHosts.add(host);
                kafkaPorts.add(port);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (kafkaHosts.size() < 1) {
            throw new Exception("parse bootstrapServers error!");
        }
        Map<Integer, Long> partionAndOffset = getOffset(topic, kafkaHosts, kafkaPorts, true);
        return partionAndOffset;
    }

    public static Map<Integer, Long> getOffset(String topic, String bootstrapServers, boolean isLast) throws Exception {
        String[] servers = bootstrapServers.split(",");
        List<String> kafkaHosts = new ArrayList<String>();
        List<Integer> kafkaPorts = new ArrayList<Integer>();
        for (int i = 0, size = servers.length; i < size; i++) {
            String[] hostAndPort = servers[i].split(":");
            try {
                String host = hostAndPort[0];
                Integer port = Integer.parseInt(hostAndPort[1]);
                kafkaHosts.add(host);
                kafkaPorts.add(port);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (kafkaHosts.size() < 1) {
            throw new Exception("parse bootstrapServers error!");
        }
        Map<Integer, Long> partionAndOffset = getOffset(topic, kafkaHosts, kafkaPorts, isLast);
        return partionAndOffset;
    }

    private static Map<Integer, Long> getOffset(String topic, List<String> kafkaHosts,
                                                List<Integer> kafkaPorts, boolean isLast) throws Exception {
        Map<Integer, Long> partionAndOffset = null;
        for (int i = 0, size = kafkaHosts.size(); i < size; i++) {
            String host = kafkaHosts.get(i);
            int port = kafkaPorts.get(i);
            try {
                partionAndOffset = getOffset(topic, host, port, isLast);
            } catch (Exception e) {
                throw new Exception("topic(" + topic + "),kafkaHost(" + host + "),kafkaPort(" + port + "), Kafka getEarliestOffset error!", e);
            }
            if (partionAndOffset.size() > 0) {
                break;
            } else {
                continue;
            }
        }
        return partionAndOffset;
    }

    private static Map<Integer, Long> getOffset(String topic, String kafkaHost, int kafkaPort, boolean isLast) throws Exception {
        Map<Integer, Long> partionAndOffset = new HashMap<Integer, Long>();
        TreeMap<Integer, PartitionMetadata> metadatas = null;
        try {
            metadatas = KafkaOffsetUtils.findLeader(kafkaHost, kafkaPort, topic);
        } catch (Exception e) {
            throw new Exception("topic(" + topic + "),kafkaHost(" + kafkaHost + "),kafkaPort(" + kafkaPort + "), Kafka findLeader error!", e);
        }
        for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
            int partition = entry.getKey();
            String leadBroker = entry.getValue().leader().host();
            String clientName = "Client_" + topic + "_" + partition;
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(leadBroker, kafkaPort, 100000, 64 * 1024, clientName);
                long offset = -1;
                if (isLast) {
                    // fetch the latest offset
                    offset = KafkaOffsetUtils.getOffset(consumer, topic, partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                } else {
                    // fetch the earliest offset
                    offset = KafkaOffsetUtils.getOffset(consumer, topic, partition,
                            kafka.api.OffsetRequest.EarliestTime(), clientName);
                }
                partionAndOffset.put(partition, offset);
            } catch (Exception e) {
                throw new Exception("topic(" + topic + "),kafkaHost(" + kafkaHost + "),kafkaPort(" + kafkaPort +
                        "), Kafka fetch earliestOffset error!", e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return partionAndOffset;
    }

    /**
     * Reads the already-consumed offsets that ZooKeeper stores for a topic.
     *
     * @param zkServers         ZooKeeper address used by Kafka
     * @param groupID           name of the consumer group
     * @param topic             topic name
     * @param connectionTimeout connection timeout (ms)
     * @param sessionTimeout    session timeout (ms)
     * @return Map<TopicAndPartition, Long>
     */
    public static Map<TopicAndPartition, Long> getConsumerOffsets(String zkServers, String groupID, String topic,
                                                                  int connectionTimeout, int sessionTimeout) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        ObjectMapper objectMapper = new ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(zkServers).connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        try {
            String nodePath = "/consumers/" + groupID + "/offsets/" + topic;
            if (curatorFramework.checkExists().forPath(nodePath) != null) {
                List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                for (String partiton : partitions) {
                    int partitionL = Integer.valueOf(partiton);
                    Long offset = objectMapper.readValue(curatorFramework.getData().forPath(nodePath + "/" + partiton), Long.class);
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionL);
                    retVals.put(topicAndPartition, offset);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        curatorFramework.close();
        return retVals;
    }

    public static Map<TopicAndPartition, Long> getConsumerOffsetsOfTopics(String zkServers, String groupID, List<String> topics,
                                                                          int connectionTimeout, int sessionTimeout) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        ObjectMapper objectMapper = new ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(zkServers).connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        for (int i = 0; i < topics.size(); i++) {
            try {
                String nodePath = "/consumers/" + groupID + "/offsets/" + topics.get(i);
                if (curatorFramework.checkExists().forPath(nodePath) != null) {
                    List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                    for (String partiton : partitions) {
                        int partitionL = Integer.valueOf(partiton);
                        Long offset = objectMapper.readValue(curatorFramework.getData().forPath(nodePath + "/" + partiton), Long.class);
                        TopicAndPartition topicAndPartition = new TopicAndPartition(topics.get(i), partitionL);
                        retVals.put(topicAndPartition, offset);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        curatorFramework.close();
        return retVals;
    }

    /**
     * Fetches the partitions of a topic and their latest offsets from Kafka.
     *
     * @param kafkaBrokers kafka cluster nodes
     * @param topic        kafka topic
     * @return Map<TopicAndPartition, Long>
     */
    public static Map<TopicAndPartition, Long> getTopicOffsets(String kafkaBrokers, String topic) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        for (String kafkaBroker : kafkaBrokers.split(",")) {
            SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaBroker.split(":")[0],
                    Integer.valueOf(kafkaBroker.split(":")[1]), 10000, 1024, "consumer");
            TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
            TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
            for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
                for (PartitionMetadata part : metadata.partitionsMetadata()) {
//            	Broker leader = part.leader();//kafka 0.8.1.1
                    BrokerEndPoint leader = part.leader(); //kafka 0.9.0
                    if (leader != null) {
                        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());
                        PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
                                kafka.api.OffsetRequest.LatestTime(), 10000);
                        OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition,
                                partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
                        OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                        if (!offsetResponse.hasError()) {
                            long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                            retVals.put(topicAndPartition, offsets[0]);
                        }
                    }
                }
            }
            simpleConsumer.close();
        }
        return retVals;
    }

    public static Map<TopicAndPartition, Long> getTopicsOffsets(String kafkaBrokers, List<String> topics) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        for (String kafkaBroker : kafkaBrokers.split(",")) {
            SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaBroker.split(":")[0],
                    Integer.valueOf(kafkaBroker.split(":")[1]), 10000, 1024, "consumer");
            TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
            TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
            for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
                for (PartitionMetadata part : metadata.partitionsMetadata()) {
                    // Broker leader = part.leader(); // kafka 0.8.1.1
                    BrokerEndPoint leader = part.leader(); // kafka 0.9.0
                    if (leader != null) {
                        TopicAndPartition topicAndPartition = new TopicAndPartition(metadata.topic(), part.partitionId());
                        PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
                                kafka.api.OffsetRequest.LatestTime(), 10000);
                        OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition,
                                partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
                        OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                        if (!offsetResponse.hasError()) {
                            long[] offsets = offsetResponse.offsets(metadata.topic(), part.partitionId());
                            retVals.put(topicAndPartition, offsets[0]);
                        }
                    }
                }
            }
            simpleConsumer.close();
        }
        return retVals;
    }

    public static void saveOffset(final Broadcast<String> kafkaZkConnectBroadcast,
                                  final Broadcast<String> zkConnectionTimeoutBroadcast,
                                  final Broadcast<String> zkSessionTimeoutBroadcast, final Broadcast<String> groupIdBroadcast,
                                  final AtomicReference<OffsetRange[]> offsetRanges) throws Exception {
        org.codehaus.jackson.map.ObjectMapper objectMapper = new org.codehaus.jackson.map.ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(kafkaZkConnectBroadcast.getValue())
                .connectionTimeoutMs(Integer.parseInt(zkConnectionTimeoutBroadcast.getValue()))
                .sessionTimeoutMs(Integer.parseInt(zkSessionTimeoutBroadcast.getValue()))
                .retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        for (OffsetRange offsetRange : offsetRanges.get()) {
            final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
            String nodePath = "/consumers/" + groupIdBroadcast.getValue()
                    + "/offsets/" + offsetRange.topic() + "/" + offsetRange.partition();
            if (null != curatorFramework.checkExists().forPath(nodePath)) {
                curatorFramework.setData().forPath(nodePath, offsetBytes);
            } else {
                curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
            }
        }
        curatorFramework.close();
    }

    public static void saveOffsetSingle(final Broadcast<String> kafkaZkConnectBroadcast,
                                        final Broadcast<String> zkConnectionTimeoutBroadcast,
                                        final Broadcast<String> zkSessionTimeoutBroadcast, final Broadcast<String> groupIdBroadcast,
                                        final OffsetRange offsetRange) throws Exception {
        org.codehaus.jackson.map.ObjectMapper objectMapper = new org.codehaus.jackson.map.ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(kafkaZkConnectBroadcast.getValue())
                .connectionTimeoutMs(Integer.parseInt(zkConnectionTimeoutBroadcast.getValue()))
                .sessionTimeoutMs(Integer.parseInt(zkSessionTimeoutBroadcast.getValue()))
                .retryPolicy(new RetryUntilElapsed(1000, 1000)).build();
        curatorFramework.start();
        final byte[] offsetBytes = objectMapper.writeValueAsBytes(offsetRange.untilOffset());
        String nodePath = "/consumers/" + groupIdBroadcast.getValue()
                + "/offsets/" + offsetRange.topic() + "/" + offsetRange.partition();
        if (null != curatorFramework.checkExists().forPath(nodePath)) {
            curatorFramework.setData().forPath(nodePath, offsetBytes);
        } else {
            curatorFramework.create().creatingParentsIfNeeded().forPath(nodePath, offsetBytes);
        }
        curatorFramework.close();
    }
}

Of the two approaches, the second is still the one recommended here: it is more flexible and easier to work with during business processing.
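Both `saveOffset` and `saveOffsetSingle` above persist each partition's `untilOffset` under the same ZooKeeper node-path convention, `/consumers/<groupId>/offsets/<topic>/<partition>`, so that a restarted job can read the last committed offset back from the same path. The following minimal sketch (class and method names are illustrative, not from the original code) isolates just that path-building convention so it can be reasoned about independently of Curator and Spark:

```java
// Minimal sketch of the ZooKeeper node-path convention used by the
// saveOffset/saveOffsetSingle methods above. Offsets are keyed by
// consumer group, topic, and partition; the node's payload is the
// partition's untilOffset. Class/method names here are assumptions
// for illustration, not part of the original utility class.
public class OffsetNodePathDemo {

    // Builds /consumers/<groupId>/offsets/<topic>/<partition>,
    // matching the string concatenation in saveOffset above.
    static String offsetNodePath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        // Example: group "etl-group", topic "events", partition 3
        String path = offsetNodePath("etl-group", "events", 3);
        System.out.println(path); // /consumers/etl-group/offsets/events/3
    }
}
```

Because the path is deterministic per (group, topic, partition), the driver can commit each partition's offset only after that partition's data has been processed successfully, which is exactly the per-partition flexibility the second method provides.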

