[Kafka] - Kafka Java Consumer實現(二)


Kafka提供了兩種Consumer API,分別是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API)

High Level Consumer API:高度抽象的Kafka消費者API;將底層具體獲取數據、更新offset、設置偏移量等操作屏蔽掉,直接將操作數據流的處理工作提供給編寫程序的人員。優點是:操作簡單;缺點:可操作性太差,無法按照自己的業務場景選擇處理方式。(入口類:ConsumerConnector)

Lower Level Consumer API:通過直接操作底層API獲取數據的方式獲取Kafka中的數據,需要自行給定分區、偏移量等屬性。優點:可操作性強;缺點:代碼相對而言比較復雜。(入口類:SimpleConsumer) 

這里主要使用Java代碼實現並測試High Level Consumer API:

Lower Level Consumer API詳見博客:[Kafka] - Kafka Java Consumer實現(一)

========================================================================

一、JavaKafkaConsumerHighAPI:使用Kafka High Level Consumer API多線程讀取數據的相關API實現,具體代碼如下:

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Simple Kafka consumer built on the high-level consumer API.
 *
 * <p>{@link #run()} asks the connector for {@code numThreads} streams of the
 * configured topic and hands each stream to its own worker in a fixed-size
 * thread pool; every worker prints the messages it receives to standard output.
 * {@link #shutdown()} closes the connector and then stops the pool.
 *
 * <p>{@code run()} and {@code shutdown()} are expected to be called from
 * different threads (see the accompanying test class), which is why the pool
 * reference is {@code volatile}.
 *
 * Created by gerry on 12/21.
 */
public class JavaKafkaConsumerHighAPI implements Runnable {
    /**
     * Kafka consumer connector used to create the message streams.
     */
    private final ConsumerConnector consumer;

    /**
     * Name of the Kafka topic to read.
     */
    private final String topic;

    /**
     * Number of streams/worker threads; usually the partition count of the topic.
     */
    private final int numThreads;

    /**
     * Worker pool; created in {@link #run()} and read in {@link #shutdown()},
     * possibly from another thread, hence volatile.
     */
    private volatile ExecutorService executorPool;

    /**
     * Creates the consumer and opens the connector.
     *
     * @param topic      Kafka topic to consume; must not be null
     * @param numThreads number of worker threads (typically the topic's partition count); must be positive
     * @param zookeeper  ZooKeeper connection string used by Kafka
     * @param groupId    consumer group id this consumer belongs to
     * @throws IllegalArgumentException if {@code topic} is null or {@code numThreads} is not positive
     */
    public JavaKafkaConsumerHighAPI(String topic, int numThreads, String zookeeper, String groupId) {
        // Validate before opening the connector so a bad argument does not leave a
        // connector behind; a non-positive thread count would otherwise only fail
        // later in run(), when the fixed thread pool is created.
        if (topic == null) {
            throw new IllegalArgumentException("topic must not be null");
        }
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive, got " + numThreads);
        }
        this.topic = topic;
        this.numThreads = numThreads;
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
    }

    /**
     * Creates the message streams for the topic and submits one
     * {@link ConsumerKafkaStreamProcesser} per stream to a new thread pool.
     * Returns as soon as the workers have been submitted.
     */
    @Override
    public void run() {
        // 1. Request numThreads streams for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.topic, this.numThreads);

        // 2. Decode both keys and values as strings
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        // 3. Create the streams. Key: topic name; value: the streams for that
        //    topic, as many as were requested in topicCountMap.
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        // 4. Pick the streams belonging to our topic
        List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

        // 5. Create the pool and publish it before submitting work, so that a
        //    concurrent shutdown() can see and stop it
        ExecutorService pool = Executors.newFixedThreadPool(this.numThreads);
        this.executorPool = pool;

        // 6. One worker per stream, numbered from 0
        int threadNumber = 0;
        for (final KafkaStream<String, String> stream : streams) {
            pool.submit(new ConsumerKafkaStreamProcesser(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Closes the Kafka connector, then shuts the worker pool down and waits up
     * to five seconds for the workers to finish. If the calling thread is
     * interrupted while waiting, its interrupt status is restored.
     */
    public void shutdown() {
        // 1. Close the connection to Kafka; this is intended to make the stream
        //    iterators' hasNext() return false so the workers leave their loops
        if (this.consumer != null) {
            this.consumer.shutdown();
        }

        // 2. Stop the pool and give the workers a chance to finish
        ExecutorService pool = this.executorPool;
        if (pool != null) {
            // 2.1 No new tasks; already submitted workers keep running
            pool.shutdown();

            // 2.2 Wait at most five seconds for termination
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly!!");
                // Do not swallow the interrupt: let the caller observe it
                Thread.currentThread().interrupt();
            }
        }

    }

    /**
     * Builds the {@link ConsumerConfig} from the ZooKeeper connection string and the group id.
     *
     * @param zookeeper ZooKeeper connection string, e.g.<br/>
     *                  hadoop-senior01.ibeifeng.com:2181,hadoop-senior02.ibeifeng.com:2181/kafka
     * @param groupId   consumer group id; consumers sharing a group id share the load of a topic
     * @return the Kafka consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        // 1. Property holder
        Properties prop = new Properties();
        // 2. Consumer settings
        prop.put("group.id", groupId); // consumer group id
        prop.put("zookeeper.connect", zookeeper); // ZooKeeper connection URL
        // NOTE(review): 400 ms is far below the documented Kafka default (6000 ms)
        // and may cause ZooKeeper timeouts on slow networks - confirm before reuse.
        prop.put("zookeeper.session.timeout.ms", "400");
        prop.put("zookeeper.sync.time.ms", "200");
        prop.put("auto.commit.interval.ms", "1000");
        // 3. Build the ConsumerConfig
        return new ConsumerConfig(prop);
    }


    /**
     * Worker that drains one Kafka stream and prints every message.
     */
    public static class ConsumerKafkaStreamProcesser implements Runnable {
        // Kafka stream handled by this worker
        private final KafkaStream<String, String> stream;
        // Sequence number of this worker, used as a prefix in the output
        private final int threadNumber;

        /**
         * @param stream       the Kafka stream to read from
         * @param threadNumber number identifying this worker in the output
         */
        public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, int threadNumber) {
            this.stream = stream;
            this.threadNumber = threadNumber;
        }

        /**
         * Prints each message as {@code threadNumber:offset:key:message} until
         * the stream's iterator reports no more data.
         */
        @Override
        public void run() {
            // 1. Iterator over the stream
            ConsumerIterator<String, String> iter = this.stream.iterator();
            // 2. Print every message
            while (iter.hasNext()) {
                // 2.1 Next message with its metadata (typed, not raw)
                MessageAndMetadata<String, String> value = iter.next();

                // 2.2 Output; each field is separated by ':' (the original ran
                //     offset and key together without a separator)
                System.out.println(this.threadNumber + ":" + value.offset() + ":" + value.key() + ":" + value.message());
            }
            // 3. The loop ended, so this worker is done
            System.out.println("Shutdown Thread:" + this.threadNumber);
        }
    }
}

 

二、JavaKafkaConsumerHighAPITest:測試類

/**
 * Manual smoke test: starts a {@link JavaKafkaConsumerHighAPI} on its own
 * thread, lets it run for ten minutes and then shuts it down.
 *
 * Created by ibf on 12/21.
 */
public class JavaKafkaConsumerHighAPITest {
    public static void main(String[] args) {
        String zookeeper = "192.168.187.146:2181";
        String groupId = "group1";
        String topic = "test2";
        int threads = 1;

        JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);
        new Thread(example).start();

        // Let the consumer run for 10 minutes (600000 ms) before stopping it;
        // the original comment said 10 seconds, which did not match the value.
        long runMillis = 10L * 60L * 1000L;
        boolean interrupted = false;
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            // An interrupt just means "stop early"; remember it and still shut down cleanly
            System.out.println("Interrupted while waiting, shutting down early");
            interrupted = true;
        }
        // Stop the consumer and its worker threads
        example.shutdown();
        if (interrupted) {
            // Restore the interrupt status only after shutdown, so that the
            // wait inside shutdown() is not cut short by it
            Thread.currentThread().interrupt();
        }
    }
}

 

三、運行測試截圖

Kafka相關命令可以參考博客[Kafka] - Kafka基本操作命令, 測試截圖如下:

 

至此,開發基本完成

========================================================

四、Kafka Pom文件依賴

<!-- Kafka version shared by the dependency declaration below -->
<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>

<!-- Kafka artifact that provides the kafka.consumer.* / kafka.javaapi.* classes
     imported by the consumer above; the _2.10 suffix presumably denotes the
     Scala 2.10 build - confirm against the artifact listing -->
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM