Kafka Consumer: Applications and Advanced Usage



PS: This post is for learning, summarizing, and exchange purposes only. It draws on the following blogs & references:

1.http://kafka.apache.org/intro.html

2.https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

3.http://www.cnblogs.com/luotianshuai/p/5206662.html

4.http://www.cnblogs.com/fxjwind/p/3794255.html

5.https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

1. Background: Why Use Kafka

  When we rely heavily on distributed databases and distributed compute clusters, we often run into problems like these:
  1. We want to analyze user behavior (pageviews) so that we can design better ad placements
  2. We want to aggregate users' search keywords to spot current trends
  3. Some data is wasteful to keep in a database, yet writing it straight to disk is inefficient
   These scenarios share one trait: data is produced by an upstream module, and a downstream module uses that data for computation, aggregation, and analysis. This is exactly where a messaging system, especially a distributed messaging system, comes in.

2. Kafka

  Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity-stream data of a consumer-scale web site. This kind of activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput involved, such data has traditionally been handled by log processing and log aggregation. For log data and offline analytics systems in the style of Hadoop that also need real-time processing, Kafka is a viable solution. Kafka aims to unify online and offline message processing through Hadoop's parallel loading mechanism, and to provide real-time consumption across a cluster.

      2.1 Features

    Kafka is a high-throughput distributed publish-subscribe messaging system with the following features:
  • Message persistence through an O(1) disk data structure, which keeps performance stable over long periods even with terabytes of stored messages
  • High throughput: even on very modest hardware, Kafka can handle hundreds of thousands of messages per second
  • Messages can be partitioned across Kafka servers and consumed by a cluster of consumer machines
  • Support for parallel data loading into Hadoop

       2.2 Kafka Terminology

  • Broker: a Kafka cluster consists of one or more servers; each such server is called a broker
  • Topic: every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, a Topic's messages may live on one or more brokers, but a user only needs to specify the Topic to produce or consume data, without caring where the data is stored - see the small producer sketch after this list.)
  • Partition: a physical concept; each Topic contains one or more Partitions.
  • Producer: publishes messages to the Kafka brokers
  • Consumer: the message consumer; a client that reads messages from the Kafka brokers.
  • Consumer Group: each Consumer belongs to a specific Consumer Group (a group name can be set for each Consumer; if none is given, the Consumer belongs to the default group)
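
The Topic bullet above boils down to: a producer addresses messages only by Topic. Below is a minimal sketch using the 0.8-era kafka.javaapi.producer API (matching the consumer code later in this post); the broker list and topic name are just example values borrowed from elsewhere in the post, not a required configuration.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the producer only needs a bootstrap broker list and a serializer;
        // it never specifies which broker or partition will end up holding the message
        props.put("metadata.broker.list", "cna3:9092,cna4:9092,cna5:9092"); // example hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // the message is addressed purely by Topic name
        producer.send(new KeyedMessage<String, String>("page_visits", "hello kafka"));
        producer.close();
    }
}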

3. Kafka Installation & Configuration: see http://www.cnblogs.com/denghongfu/p/6085685.html

4. Kafka Consumer APIs

       The Kafka consumer API comes in two flavors:

       A. High-level: simpler; you do not manage offsets yourself, as the client automatically reads the Consumer group's last offset from ZooKeeper.

       B. Low-level: the SimpleConsumer Example provided on the official wiki.

       A few notes:

          1. Having more consumers than partitions is wasteful: Kafka does not allow concurrent consumption of a single partition, so the consumer count should not exceed the partition count.
          2. With fewer consumers than partitions, one consumer maps to several partitions. Size the consumer and partition counts sensibly, or the partitions will be read unevenly. Ideally the partition count is an integer multiple of the consumer count, which is why the partition count matters: with 24 partitions, for example, it is easy to pick a matching consumer count (see the small sizing sketch after this list).
          3. A consumer reading from multiple partitions gets no ordering guarantee across them; Kafka only guarantees order within a single partition, so the interleaving depends on the order in which you read.
          4. Adding or removing consumers, brokers, or partitions triggers a rebalance; after a rebalance, the partitions assigned to a consumer may change.
          5. With the high-level API, the consumer blocks when no data is available.
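
As a rough illustration of notes 1 and 2, here is a tiny, hypothetical helper (not part of any Kafka API; the names and counts are made up for the example) that caps the thread count at the partition count and warns about uneven assignment:

public final class ConsumerThreadSizing {
    private ConsumerThreadSizing() {}

    /** Returns a thread count that never exceeds the partition count. */
    public static int effectiveThreads(int requestedThreads, int partitions) {
        if (requestedThreads > partitions) {
            // extra consumers would sit idle: Kafka never lets two consumers of the
            // same group read one partition concurrently
            System.out.println("Warning: " + (requestedThreads - partitions)
                    + " thread(s) will receive no data");
            return partitions;
        }
        if (partitions % requestedThreads != 0) {
            // uneven assignment: some threads will own more partitions than others
            System.out.println("Warning: partitions are not evenly divisible by threads");
        }
        return requestedThreads;
    }

    public static void main(String[] args) {
        // 24 partitions read by 4 threads: an even assignment of 6 partitions per thread
        System.out.println(effectiveThreads(4, 24));
    }
}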

  1. High-level

        If your test flow is to produce some data first and then read it with a consumer, remember to add the first property in the code below. The initial offset is invalid by default, and this setting controls how an invalid offset is corrected. The default is largest (i.e. the latest), so without this setting you will not see the data you produced earlier. Note that adding smallest afterwards no longer helps: by then the offset is already valid and will not be corrected again, so you have to reset it manually or with a tool.

 

package com.tydic.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringEncoder;

/**
 * Description: high-level consumer example
 * Created by hadoop on 2016/11/9.
 * Date 2016/11/9
 */
public class ConsumerKafka {
    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public ConsumerKafka(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties prop = new Properties();
        prop.put("auto.offset.reset", "smallest"); // must be set if you want to read previously produced data
        prop.put("zookeeper.connect", "cna3:2181,cna4:2181,cna5:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        prop.put("metadata.broker.list", "cna3:9092,cna4:9092,cna5:9092");
        prop.put("group.id", "test-consumer-group");
        config = new ConsumerConfig(prop);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            // ignore
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(new String(item.message())); // decodes the message bytes (UTF-8 assumed)
            }
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ConsumerKafka consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new ConsumerKafka("topic1", 3, executor);
            consumer.start();
            Thread.sleep(10000); // let the consumer threads run for a while before shutting down
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

Two handy tools when using the high-level consumer:

1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

This shows the group's current offset state; here the pv group, which has 3 partitions:

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   21              21              0               none 
pv              page_visits                    1   19              19              0               none 
pv              page_visits                    2   20              20              0               none

The key columns are offset, logSize and Lag.
Everything had already been read here, so offset = logSize and Lag = 0.

2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

It takes three arguments:
[earliest | latest]: where to reset the offset to
consumer.properties: the path to the consumer config file
topic: the topic name, page_visits here

After running this against the pv group above, checking the group offsets again gives:

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   0               21              21              none 
pv              page_visits                    1   0               19              19              none 
pv              page_visits                    2   0               20              20              none

The offsets have been reset to 0, and Lag = logSize.

Complete code for a multithreaded consumer:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // create the connector; note how the config is built below
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }
 
    public void run(int a_numThreads) { // launch the concurrent consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads)); // which topic to read, and with how many threads
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // create the streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // each thread handles one KafkaStream
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}
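
The run(int) method above submits one ConsumerTest runnable per stream, but that class is not shown here. The sketch below follows the per-thread worker from the official Consumer Group Example (reference 5): it iterates its KafkaStream and prints each message together with its thread number.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        // hasNext() blocks until a message arrives (see note 5 above about the
        // high-level API blocking when no data is available)
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}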

  

 2. Low-level: the SimpleConsumer Example

package com.tydic.kafka.client;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.tydic.kafka.util.Utils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Description: a Kafka consumer implementation that manages the consumption offset explicitly
 * Created by hadoop on 2016/11/18.
 * Date 2016/11/18
 */
public class KafkaSimpleConsumer {
    public static void main(String arg[]) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        List<String> seeds = new ArrayList<>();
        int maxReads = 0;
        int partition = 0;
        int port = 0;
        String topic = "";
        try {
            System.out.println(JSON.toJSONString(arg, true));
            Map<String, Object> paramMap = Utils.parseParam(arg);
            System.out.println(JSON.toJSONString(paramMap, true));
            if (null == paramMap || paramMap.size() == 0) {
                return;
            }
            maxReads = (int) paramMap.get("consumer.maxReads");
            partition = (int) paramMap.get("consumer.partition");
            List<String> seedStr = (List<String>) paramMap.get("consumer.seedBrokers");
            for(String s:seedStr){
                seeds.add(s);
            }
            port = (int) paramMap.get("consumer.port");
            topic = (String) paramMap.get("consumer.topic");
            System.out.print("maxReads=" + maxReads + "partition=" + partition + "seedStr=" + seedStr + "port=" + port + "topic=" + topic);
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<>();

    public KafkaSimpleConsumer() {
        m_replicaBrokers = new ArrayList<>();
    }

    /**
     * @param a_maxReads    maximum number of messages to read (so we don't loop forever)
     * @param a_topic       topic to read from
     * @param a_partition   partition to read from
     * @param a_seedBrokers seed brokers to use for the metadata lookup
     * @param a_port        port the brokers listen on
     * @throws Exception
     */
    public void run(long a_maxReads, String a_topic, int a_partition,
                    List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata for the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
        System.out.print("readOffset="+readOffset+kafka.api.OffsetRequest.LatestTime());
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                System.out.println("error");
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                // handle an invalid offset by falling back to the latest offset
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // find the new leader broker
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                System.out.println("currentOffset="+currentOffset+"readOffset="+readOffset);
                // 必要判斷,因為對於compressed message,會返回整個block,所以可能包含old的message
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                // advance readOffset to the next offset
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
                                     int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        //build offset fetch request info
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        // fetch the offsets
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        // the returned list of offsets
        long[] offsets = response.offsets(topic, partition);
        // start reading from the first one
        return offsets[0];
    }

    /**
     * Find a new lead broker for a topic and partition after the old leader fails.
     * The approach (see findLeader below): iterate over the known replica brokers, fetch the
     * topic's metadata, then walk each partition's metadata; once the target partition is found,
     * its leader is given by PartitionMetadata.leader().host().
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return
     * @throws Exception
     */
    private String findNewLeader(String a_oldLeader, String a_topic,
                                 int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
                                         int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        // iterate over each seed broker
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                // create a SimpleConsumer for the metadata lookup
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                // send the TopicMetadata request
                TopicMetadataResponse resp = consumer.send(req);
                // get the topic's metadata
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    // walk through each partition's metadata
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        // check whether this is the partition we are looking for
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;  // found it, stop searching
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

}

 That's all.

 

