Kafka client operations: the Consumer API


I: Consumer API

1. Auto-commit

  This approach is not recommended in production: offsets are committed on a timer, regardless of whether the records were actually processed successfully.

    /**
     * Simple Kafka message consumption with auto-commit.
     * Once a record's offset has been auto-committed, that record will not be delivered again.
     */
    public static void helloConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }
    }

  

2. Manually committing offsets

  If processing fails and the offset is never committed, the same records can still be fetched on the next poll.

    /**
     * Manual commit
     */
    public static void commitedOffset() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000"); // ignored once auto-commit is disabled
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
            // Manual commit: only reached if the loop above completed without throwing;
            // otherwise nothing is committed and the records are fetched again next time
            consumer.commitAsync();
        }
    }
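
  commitAsync() returns immediately, so a failed commit in the loop above would pass silently. A minimal sketch, using the callback overload of commitAsync (part of the real KafkaConsumer API), that at least logs failures:

        // Asynchronous commit with a callback, so commit failures become visible
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                // Commit failed: these records may be re-delivered after a restart or rebalance
                System.err.printf("commit failed for %s: %s%n", offsets, exception.getMessage());
            }
        });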

  

3. ConsumerGroup

  Within a ConsumerGroup, a single partition is consumed by only one Consumer at a time.

  A Consumer reads each partition in offset order; where a brand-new group starts reading is governed by auto.offset.reset.

  A ConsumerGroup as a whole consumes the messages of all partitions of the topic. A quick way to see the partition split is to run two consumers under the same group.id, as in the sketch below.
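
  A minimal, self-contained sketch (assuming the two-partition topic caojun-topic and the broker address used throughout these samples; the group.id "group-demo" is made up) that starts two consumers in one group. With two partitions, each thread should end up owning one:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GroupDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) { // two members of the same group
            final String name = "member-" + i;
            new Thread(() -> {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", "192.168.19.129:9092");
                props.setProperty("group.id", "group-demo"); // same group.id, so partitions are split between the threads
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("caojun-topic"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                        for (ConsumerRecord<String, String> record : records)
                            System.out.printf("%s got partition = %d, offset = %d%n", name, record.partition(), record.offset());
                    }
                }
            }, name).start();
        }
    }
}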

 

4. Characteristics


5. Processing at the partition level

    /**
     * Process records partition by partition
     */
    public static void commitedOffsetWithPartition() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // The unit of work is now the partition: each one is handled separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition; +1 because the committed offset is the next record to read
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }

  

6. Consuming only specific partitions

    /**
     * Consume directly from chosen partitions of a topic
     */
    public static void commitedOffsetWithTopicPartition() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        // Assign the partition directly (no subscribe, so no group rebalancing); only p1 is consumed here
        consumer.assign(Arrays.asList(p1));
        while (true) {
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually, per partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }

  

II: Multithreaded processing with the Consumer API

1. First approach

  The classic pattern: each thread creates its own KafkaConsumer, keeping the non-thread-safe consumer confined to a single thread.

2. The program


package com.jun.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerThreadSample {
    private final static String TOPIC_NAME="caojun-topic";

    /*
        The classic pattern: each thread creates its own KafkaConsumer, which guarantees thread safety
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.19.129:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0,p1));
        }


        public void run() {
            try {
                while(!closed.get()) {
                    // Process incoming records
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // Handle each partition's records separately
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // Report the new offset back to Kafka
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // Note the +1: the committed offset is the next record to be read
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }

                }
            }catch(WakeupException e) {
                if(!closed.get()) {
                    throw e;
                }
            }finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

}

  

 

3. Second approach

  With this approach there is no per-record control over offset commits (auto-commit is enabled); it is intended purely for consuming records quickly and handing them to worker threads.

4. The program

package com.jun.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerRecordThreadSample {
    private final static String TOPIC_NAME = "caojun-topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.19.129:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        // Run the poll loop on its own thread so that main can trigger shutdown afterwards
        new Thread(() -> consumers.execute(workerNum)).start();

        Thread.sleep(1000000);

        consumers.shutdown();

    }

    // One consumer polls; a thread pool does the processing
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

            try {
                while (!stopped.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    for (final ConsumerRecord<String, String> record : records) {
                        executors.submit(new ConsumerRecordWorker(record));
                    }
                }
            } catch (WakeupException e) {
                // Expected when shutdown() wakes the consumer; rethrow otherwise
                if (!stopped.get()) {
                    throw e;
                }
            } finally {
                // close() must run on the polling thread: KafkaConsumer is not thread-safe
                consumer.close();
            }
        }

        public void shutdown() {
            stopped.set(true);
            // wakeup() is the one KafkaConsumer method that is safe to call from
            // another thread; it makes a blocking poll() throw WakeupException
            consumer.wakeup();
            if (executors != null) {
                executors.shutdown();
                try {
                    if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout.... Ignore for this case");
                    }
                } catch (InterruptedException ignored) {
                    System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                    Thread.currentThread().interrupt();
                }
            }
        }


    }

    // Processes a single record
    public static class ConsumerRecordWorker implements Runnable {

        private final ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // This is where real work (e.g. writing the record to a database) would go
            System.out.println("Thread - " + Thread.currentThread().getName());
            System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }

    }
}

 

III: Other features

1. Manually controlling offsets

    /**
     * Manually set the starting offset and manually commit offsets.
     *
     * Manually setting the starting offset:
     *   1. Puts the starting position fully under your control
     *   2. If the program fails before committing, the batch is simply consumed again
     *
     * Steps:
     *   1. On the first run, start from offset 0 (the usual case)
     *   2. After consuming, say, 100 records, set the offset to 101 and store it in Redis
     *   3. Before each poll, read the latest offset from Redis
     *   4. Start consuming from that position
     *
     * Recommendation:
     *   1. Use Redis to store the offsets
     */

    public static void controllerOffset() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);

        // Assign the partition directly
        consumer.assign(Arrays.asList(p0));
        while (true) {
            // Set the starting offset; hardcoded to 5 here for demonstration
            // (in practice this value would be read from Redis before each poll)
            consumer.seek(p0, 5);
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }
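
  A minimal sketch of the Redis-backed flow described in the comment above, assuming the Jedis client and a Redis instance at localhost:6379 (both assumptions, not part of the original sample); the key name offset:caojun-topic:0 is made up for illustration. It would replace the hardcoded seek inside the loop:

        // Hedged sketch: keep the next offset to read in Redis (Jedis client assumed)
        try (Jedis jedis = new Jedis("localhost", 6379)) {       // assumed Redis address
            String key = "offset:" + TOPIC_NAME + ":0";           // hypothetical key scheme
            String saved = jedis.get(key);                        // null on the very first run
            long start = (saved == null) ? 0L : Long.parseLong(saved);
            consumer.seek(p0, start);                             // resume where we left off

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records.records(p0)) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                // Persist the position of the next record to read
                jedis.set(key, String.valueOf(record.offset() + 1));
            }
        }

  (import redis.clients.jedis.Jedis would be needed at the top of the file.)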

  

2. Rate limiting

    /**
     * Rate limiting via pause/resume
     */
    public static void controllerLimit() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        long totalNum = 100;

        // Assign the partitions directly
        consumer.assign(Arrays.asList(p0, p1));
        while (true) {
            // Poll, waiting up to 1000 ms for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                long num = 0;
                for (ConsumerRecord<String, String> record : pRecords) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                     /*
                        The token-bucket idea:
                        1. After receiving a record, try to take a token from the bucket
                        2. If a token is acquired, continue with the business logic
                        3. If no token is available, pause() the partition and wait for tokens
                        4. Once the bucket has enough tokens again, resume() the partition
                      */
                    num++;
                    // Demo stand-in for the token bucket: pause p0 once 100 records
                    // have been seen on partition 0...
                    if (record.partition() == 0) {
                        if (num >= totalNum) {
                            consumer.pause(Arrays.asList(p0));
                        }
                    }
                    // ...and resume p0 after 40 records have been seen on partition 1
                    if (record.partition() == 1) {
                        if (num == 40) {
                            consumer.resume(Arrays.asList(p0));
                        }
                    }

                }
                long lastOffset = pRecords.get(pRecords.size() - 1).offset();
                // Commit manually
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
            }
        }
    }
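
  The token-bucket steps in the comment above can be sketched with Guava's RateLimiter (an assumed extra dependency, not used in the original sample); tryAcquire() covers steps 1-3 and resume() covers step 4:

        // Hedged sketch: pause/resume driven by a token bucket (Guava RateLimiter assumed);
        // the limiter should be created once, outside the poll loop
        RateLimiter limiter = RateLimiter.create(100.0);  // roughly 100 records per second

        for (ConsumerRecord<String, String> record : pRecords) {
            if (!limiter.tryAcquire()) {
                // No token available: stop fetching this partition for now
                consumer.pause(Arrays.asList(partition));
                // Block until a token is handed out, then let fetching continue
                limiter.acquire();
                consumer.resume(Arrays.asList(partition));
            }
            // ... business processing of record ...
        }

  (import com.google.common.util.concurrent.RateLimiter would be needed at the top of the file.)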

  

 

