Kafka High-Level and Low-Level APIs


Analysis of the Kafka consumption process

Kafka provides two consumer APIs: the high-level Consumer API and the low-level API.

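1 High-level API

1) Advantages of the high-level API

The high-level API is simple to code against.

You do not manage offsets yourself; the system manages them through ZooKeeper.

You do not manage partitions, replicas, and so on; the system handles them automatically.

After a consumer disconnects, it automatically resumes from the offset last recorded in ZooKeeper (by default, the offset stored in ZooKeeper is updated once per minute).

Consumer groups keep different programs that read the same topic separate: each group records its own offset, so programs reading the same topic do not affect one another through offsets.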
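The per-group offset idea can be pictured with a small toy model. This is only a sketch in plain Java with no Kafka dependency: the class `GroupOffsetSketch`, its methods, and the group names `billing` and `audit` are made up for illustration and are not part of any Kafka API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: one in-memory "partition" plus one committed offset per group. */
class GroupOffsetSketch {
    private final List<String> log;                                  // messages of a single partition
    private final Map<String, Integer> committed = new HashMap<>();  // group -> next offset to read

    GroupOffsetSketch(List<String> log) {
        this.log = log;
    }

    /** Returns the next message for the group and advances only that group's offset. */
    String poll(String group) {
        int offset = committed.getOrDefault(group, 0);
        if (offset >= log.size()) {
            return null; // nothing new for this group
        }
        committed.put(group, offset + 1);
        return log.get(offset);
    }

    /** Next offset the given group would read. */
    int offsetOf(String group) {
        return committed.getOrDefault(group, 0);
    }

    public static void main(String[] args) {
        GroupOffsetSketch topic = new GroupOffsetSketch(Arrays.asList("m0", "m1", "m2"));
        topic.poll("billing");
        topic.poll("billing");
        topic.poll("audit");
        // Each group has moved independently of the other
        System.out.println("billing=" + topic.offsetOf("billing") + " audit=" + topic.offsetOf("audit"));
    }
}
```

Running `main` prints `billing=2 audit=1`: the two groups read the same messages, but each one advances its own position.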

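2) Disadvantages of the high-level API

You cannot control the offset yourself, which some special requirements need.

You have no fine-grained control over partitions, replicas, ZooKeeper, and so on.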

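2 Low-level API

1) Advantages of the low-level API

Developers control the offset themselves and can start reading from any position.

You choose which partitions to connect to and can implement custom load balancing across partitions.

Dependence on ZooKeeper is reduced: for example, offsets need not be stored in ZooKeeper; you can store them yourself, such as in a file or in memory.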
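As one way to picture storing offsets yourself, here is a minimal sketch of a file-backed offset store. The class `FileOffsetStore` and the file name `test1-0.offset` are hypothetical; this is plain Java, not a Kafka class, and a real deployment would need to think about durability and concurrent writers.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: keeps the next offset to read in a small text file. */
class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    /** Returns the saved offset, or defaultOffset when nothing has been saved yet. */
    long load(long defaultOffset) throws IOException {
        if (!Files.exists(file)) {
            return defaultOffset;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? defaultOffset : Long.parseLong(text);
    }

    /** Writes to a temporary file and then renames it, to reduce the chance of a half-written offset. */
    void save(long nextOffset) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("offsets");
        FileOffsetStore store = new FileOffsetStore(dir.resolve("test1-0.offset"));
        System.out.println(store.load(0L)); // nothing saved yet, so the default is returned
        store.save(42L);
        System.out.println(store.load(0L)); // now returns the saved value
    }
}
```

A low-level consumer could call `load` once at startup to decide where to begin fetching and `save` after each processed batch. Saving after processing means a crash may replay some messages rather than skip them.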

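2) Disadvantages of the low-level API

It is considerably more complex: you must manage offsets yourself, decide which partition to connect to, and locate each partition's leader.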
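To give a feel for what deciding which partition to connect to can involve, here is a hedged sketch of one simple scheme: dealing partitions out round-robin among a fixed number of workers. The class and method names are invented for illustration. Kafka does not prescribe this scheme, and a real system would also have to handle workers joining and leaving.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: static round-robin assignment of partitions to workers. */
class ManualPartitionAssignment {

    /** Partitions that worker number workerIndex (0-based) out of workerCount should read. */
    static List<Integer> partitionsFor(int workerIndex, int workerCount, int partitionCount) {
        if (workerCount <= 0 || workerIndex < 0 || workerIndex >= workerCount) {
            throw new IllegalArgumentException("invalid worker index or worker count");
        }
        List<Integer> mine = new ArrayList<>();
        for (int p = workerIndex; p < partitionCount; p += workerCount) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        // 8 partitions shared by 3 workers
        for (int w = 0; w < 3; w++) {
            System.out.println("worker " + w + " -> " + partitionsFor(w, 3, 8));
        }
    }
}
```

With 8 partitions and 3 workers this prints `worker 0 -> [0, 3, 6]`, `worker 1 -> [1, 4, 7]` and `worker 2 -> [2, 5]`. Each worker would then look up the leader of its own partitions and fetch from it.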

High-level producer

package com.sinoiov.kafka.test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Created by caoyu on 16/4/21.
 * By Sinoiov (中交興路), Big Data Center, Basic Platform Department
 */
public class Kafka_produce extends Thread{
    private String topic;
    // HH is the 24-hour clock; the original "hh" gave ambiguous 12-hour times without AM/PM
    private SimpleDateFormat sdf = new SimpleDateFormat("MM-dd HH:mm:ss");

    public Kafka_produce(String topic){
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        long i = 0;
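        while(true){
            i++;
            long now = System.currentTimeMillis();
            // Message value: "<timestamp>_<sequence number>"
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, sdf.format(new Date(now)) + "_" + i);
            producer.send(message);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Release the producer's connections once the loop ends
        producer.close();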
    }

    private Producer<String,String> createProducer(){
        Properties properties = new Properties();
        properties.put("metadata.broker.list","192.168.110.81:9092,192.168.110.82:9092,192.168.110.83:9092");
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("zookeeper.connect", "nnn1:2181,nnn2:2181,nslave1:2181");
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}

High-level consumer

package com.sinoiov.kafka.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by caoyu on 16/4/21.
 * By Sinoiov (中交興路), Big Data Center, Basic Platform Department
 */
public class Kafka_consumer extends Thread {
    private String topic;
    private ConsumerConnector consumer;

    public Kafka_consumer(String topic){
        super();
        this.topic = topic;
        consumer = createConsumer();
    }

    public void shutDown(){
        if(consumer != null)
            consumer.shutdown();
    }

    @Override
    public void run() {
        // Request one stream (one consuming thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        // With default settings, hasNext() blocks until a message arrives or the connector is shut down
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println(message);
        }
    }

    private ConsumerConnector createConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","nnn1:2181,nnn2:2181,nslave1:2181");
        properties.put("group.id", "testsecond");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
}

Likewise, the code is not complicated and should be easy to follow.
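The one-minute offset update mentioned earlier is governed by consumer configuration. The sketch below spells out a few related settings for the old ZooKeeper-based consumer: `auto.commit.enable`, `auto.commit.interval.ms` and `auto.offset.reset`. The helper class `HighLevelConsumerProps` is invented for illustration, and the values are examples rather than recommendations.

```java
import java.util.Properties;

/** Illustrative helper that builds settings for the old ZooKeeper-based high-level consumer. */
class HighLevelConsumerProps {

    static Properties build(String zkConnect, String groupId, long commitIntervalMs) {
        Properties p = new Properties();
        p.put("zookeeper.connect", zkConnect);
        p.put("group.id", groupId);
        // Commit consumed offsets to ZooKeeper automatically
        p.put("auto.commit.enable", "true");
        // How often offsets are committed; 60000 ms matches the one-minute default
        p.put("auto.commit.interval.ms", Long.toString(commitIntervalMs));
        // Where to start when the group has no stored offset: "smallest" or "largest"
        p.put("auto.offset.reset", "smallest");
        return p;
    }

    public static void main(String[] args) {
        Properties props = build("nnn1:2181,nnn2:2181,nslave1:2181", "testsecond", 60000L);
        System.out.println(props.getProperty("auto.commit.interval.ms"));
    }
}
```

The resulting `Properties` object could be passed to `new ConsumerConfig(...)` in place of the one built in `createConsumer()`. A shorter commit interval narrows the window of messages that may be re-read after a crash, at the cost of more writes to ZooKeeper.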


Low-level API sample code

 

package com.sinoiov.kafka.test;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Created by caoyu on 16/4/26.
 * By Sinoiov (中交興路), Big Data Center, Basic Platform Department
 */
public class SimpleExample {
    // Replica brokers of the partition, refreshed by every successful findLeader call
    private List<String> m_replicaBrokers = new ArrayList<String>();

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // Maximum number of messages to read
        long maxReads = 3L;
        // Topic to read from
        String topic = "test1";
        // Partition to read
        int partition = 0;
        // IP addresses of the seed brokers
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.110.81");
        seeds.add("192.168.110.82");
        seeds.add("192.168.110.83");
        // Broker port
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Fetch the metadata for the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop reading
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

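    /**
     * Finds the new leader broker for a partition after a failure.
     *
     * @param a_oldLeader host of the previous leader
     * @param a_topic topic name
     * @param a_partition partition id
     * @param a_port broker port
     * @return host of the new leader
     * @throws Exception if no new leader is found after three attempts
     */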
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt for the caller
                    Thread.currentThread().interrupt();
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
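One detail of the fetch loop in `run` is easy to miss: messages whose offset is lower than `readOffset` are skipped. With the old consumer this matters when messages are compressed, because a fetch can return a whole compressed block that starts before the requested offset. The bookkeeping can be sketched on its own in plain Java. `OffsetTrackingSketch` and `Msg` are stand-ins invented for illustration, not Kafka classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy version of the readOffset bookkeeping used in the fetch loop. */
class OffsetTrackingSketch {

    /** Stand-in for MessageAndOffset: a payload together with its offset. */
    static final class Msg {
        final long offset;
        final String payload;

        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    private long readOffset; // next offset we still want to see

    OffsetTrackingSketch(long startOffset) {
        this.readOffset = startOffset;
    }

    /** Returns the payloads not seen before and advances readOffset past them. */
    List<String> accept(List<Msg> batch) {
        List<String> delivered = new ArrayList<>();
        for (Msg m : batch) {
            if (m.offset < readOffset) {
                continue; // older than what was asked for: skip it
            }
            delivered.add(m.payload);
            readOffset = m.offset + 1; // plays the role of nextOffset()
        }
        return delivered;
    }

    long nextOffset() {
        return readOffset;
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracker = new OffsetTrackingSketch(2);
        List<Msg> batch = Arrays.asList(
                new Msg(0, "a"), new Msg(1, "b"), new Msg(2, "c"), new Msg(3, "d"));
        System.out.println(tracker.accept(batch) + " next=" + tracker.nextOffset());
    }
}
```

Running `main` prints `[c, d] next=4`: the two messages before offset 2 are dropped, and the next fetch would ask for offset 4.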
