Basic usage of the Kafka API


Kafka API

Kafka provides two Java Consumer APIs: the high-level Consumer API and the low-level Consumer API.

Advantages of the high-level Consumer API:

  • The high-level API is simple to write and easy to use.
    You do not need to manage offsets yourself; the API encapsulates offset handling and maintains offsets automatically through ZooKeeper.
    You do not need to manage partitions, replicas, and so on; the system handles them automatically.
    After a consumer disconnects, it automatically resumes consuming from the last offset recorded in ZooKeeper.

Disadvantages of the high-level Consumer API:

  • You cannot control offsets yourself.
  • You cannot manage partitions, replicas, ZooKeeper metadata, and other details yourself.

Advantages of the low-level API:

  • Developers maintain offsets themselves and can consume from whatever position they choose.
  • You control which partitions to connect to and can implement custom load balancing across partitions.
  • The dependency on ZooKeeper is reduced (for example, offsets need not be stored in ZooKeeper; they can be kept in a cache or in memory).

Disadvantages of the low-level API:
It is considerably more complex: you must track offsets yourself, decide which partition to connect to, find the partition leader, and so on.

Getting started

  1. Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
  2. Simple Producer usage
package com.sonly.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * <b>package: com.sonly.kafka</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: demo</b>
 * <b>create date: 2019-05-03 12:17</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"k8s-n1:9092");
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("mytest", Integer.toString(i), Integer.toString(i)));
        producer.close();

    }
}
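
send() is asynchronous: it returns a Future<RecordMetadata> and delivers the record in the background. If you need to block until the broker has acknowledged a record, a minimal sketch (dropped into the loop above, with main declared as throws Exception and org.apache.kafka.clients.producer.RecordMetadata imported) looks like this:

            //block until the broker acknowledges the record, then inspect its metadata
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<String, String>("mytest", Integer.toString(i), Integer.toString(i))).get();
            System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());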

Producer with a callback

package com.sonly.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * <b>package: com.sonly.kafka</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: demo1</b>
 * <b>create date: 2019-05-03 12:58</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class demo1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //set the Kafka cluster address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"k8s-n1:9092");
        //set the broker ACK acknowledgement mode
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        //set the key serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //set the value serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //set the batch size
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"6238");
        //set the linger time (send delay)
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        //set the producer buffer memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,Long.MAX_VALUE);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for ( int i = 0; i < 12; i++) {
            final int finalI = i;
            producer.send(new ProducerRecord<String, String>("mytest", Integer.toString(i), Integer.toString(i)), new Callback() {

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("send succeeded: " + finalI +","+metadata.partition()+","+ metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

Result:

send succeeded: 0,0,170
send succeeded: 2,0,171
send succeeded: 11,0,172
send succeeded: 4,1,101
send succeeded: 5,2,116
send succeeded: 6,2,117
send succeeded: 10,2,118
send succeeded: 1,3,175
send succeeded: 3,3,176
send succeeded: 7,3,177
send succeeded: 8,3,178
send succeeded: 9,3,179

The records are distributed unevenly across partitions 0-3; the sketch below shows why.
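
This spread comes from the default partitioner: for a record with a non-null key it hashes the serialized key and takes the result modulo the partition count. A minimal sketch that reproduces the mapping, assuming mytest has 4 partitions (as the callback output above suggests); Utils.murmur2 and Utils.toPositive are the helpers kafka-clients itself uses:

package com.sonly.kafka;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionPreview {
    public static void main(String[] args) {
        int numPartitions = 4; //assumption: "mytest" has 4 partitions
        for (int i = 0; i < 12; i++) {
            byte[] keyBytes = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            //the default partitioner's formula for keyed records
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key " + i + " -> partition " + partition);
        }
    }
}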
3. Sending with a custom partitioner

package com.sonly.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * <b>package: com.sonly.kafka</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: CustomProducer</b>
 * <b>create date: 2019-05-03 13:43</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class CustomProducer implements Partitioner {
    //route every record to partition 0 (for demonstration purposes only)
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {
    }
}

Configuring the partitioner

package com.sonly.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * <b>package: com.sonly.kafka</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: demo2</b>
 * <b>create date: 2019-05-03 13:46</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class demo2 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //set the Kafka cluster address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"k8s-n1:9092");
        //set the broker ACK acknowledgement mode
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        //set the key serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //set the value serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //set the batch size
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"6238");
        //set the linger time (send delay)
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        //set the producer buffer memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,Long.MAX_VALUE);
        //set the custom partitioner class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.sonly.kafka.CustomProducer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for ( int i = 0; i < 12; i++) {
            final int finalI = i;
            producer.send(new ProducerRecord<String, String>("mytest", Integer.toString(i), Integer.toString(i)), new Callback() {

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("send succeeded: " + finalI +","+metadata.partition()+","+ metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}
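
Because CustomProducer.partition() always returns 0, every record in this run should land in partition 0, so the callback should report partition 0 for all twelve messages.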

High-level Consumer API:

package com.sonly.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * <b>package: com.sonly.kafka.consumer</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: ConsumerDemo</b>
 * <b>create date: 2019-05-03 13:59</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //set the Kafka cluster address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"k8s-n1:9092");
        //set the consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"teste3432");
        //set the key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //set the value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //set the maximum fetch size
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,100*1024*1024);
        //enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //set the auto-commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("mytest","test"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic()+"--"+record.partition()+"--"+record.value());
            }
        }
    }
}
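
If you want the offset control the API comparison above hints at, you can take over offset commits yourself. A minimal sketch (same properties as above but with auto-commit disabled; java.time.Duration is already imported): commit synchronously only after a batch has actually been processed, which gives at-least-once delivery instead of the fire-and-forget auto-commit.

        //disable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("mytest"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic()+"--"+record.partition()+"--"+record.value());
            }
            //commit offsets only after the whole batch has been processed
            consumer.commitSync();
        }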

Low-level API:
1. Main steps for consuming with the low-level API

Step  Task
1     Find the leader of the specified partition from the topic metadata
2     Get the latest consumed offset of the partition
3     Fetch messages from the leader replica
4     Detect changes of the leader replica and retry

2. Method descriptions:

Method           Description
findLeader()     The client sends a topic metadata request to the seed brokers and caches the replicas as backup nodes
getLastOffset()  The consumer client sends an offset request to get the latest offset of the partition
run()            The method in which the low-level consumer fetches messages
findNewLeader()  When the leader replica of a partition fails, the client finds the new leader

Update the pom

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
package com.sonly.kafka.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.BrokerEndPoint;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * <b>package: com.sonly.kafka.consumer</b>
 * <b>project: kafkaAPIdemo</b>
 * <b>class: LowerConsumer</b>
 * <b>create date: 2019-05-03 15:21</b>
 * <b>author: </b>xxydliuyss<br>
 * <b>note: </b>
 * If you want to change the file header, please modify the File and Code Templates.
 */
public class LowerConsumer {
    //the last offset that was consumed
    private long offset;
    //cache of the replicas for each partition
    private Map<Integer,List<BrokerEndPoint>> partitionsMap = new HashMap<Integer, List<BrokerEndPoint>>(1024);
    public static void main(String[] args) throws InterruptedException {
        List<String> brokers = Arrays.asList("k8s-n1", "k8s-n2","k8s-n3");
        int port = 9092;
        int partition = 1;
        long offset=2;
        LowerConsumer lowerConsumer = new LowerConsumer();
        while(true){
//            offset = lowerConsumer.getOffset();
            lowerConsumer.getData(brokers,port,"mytest",partition,offset);
            TimeUnit.SECONDS.sleep(1);
        }

    }

    public long getOffset() {
        return offset;
    }


    private BrokerEndPoint findLeader(Collection<String> brokers,int port,String topic,int partition){
        for (String broker : brokers) {
            //create a consumer object to query each broker
            SimpleConsumer getLeader = new SimpleConsumer(broker, port, 10000, 1024 * 24, "getLeader");
            //build the topic metadata request
            TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
            //send the metadata request
            TopicMetadataResponse response = getLeader.send(topicMetadataRequest);
            //parse the metadata response
            List<TopicMetadata> topicMetadatas = response.topicsMetadata();
            //iterate over the topic metadata
            for (TopicMetadata topicMetadata : topicMetadatas) {
                //get the partition metadata
                List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
                //iterate over the partition metadata
                for (PartitionMetadata partitionMetadata : partitionMetadatas) {
                    if(partition == partitionMetadata.partitionId()){
                        //cache the replicas of this partition; if the leader fails, a new leader can be found by scanning this cache
                        List<BrokerEndPoint> isr = partitionMetadata.isr();
                        this.partitionsMap.put(partition,isr);
                        return partitionMetadata.leader();
                    }
                }
            }
        }
        return null;
    }
    private void getData(Collection<String> brokers,int port,String topic,int partition,long offset){
        //find the leader
        BrokerEndPoint leader = findLeader(brokers, port, topic, partition);
        if(leader==null) return;
        String host = leader.host();
        //consumer object used to fetch the data
        SimpleConsumer getData = new SimpleConsumer(host, port, 10000, 1024 * 10, "getData");
        //build the fetch request; several topics can be fetched at once by chaining addFetch calls
        FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 10).build();
        //send the fetch request
        FetchResponse fetchResponse = getData.fetch(fetchRequest);
        //parse the response; this is a set of messages with their offsets
        ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
        //iterate over the message set
        for (MessageAndOffset messageAndOffset : messageAndOffsets) {
            long offset1 = messageAndOffset.offset();
            //remember the next offset to fetch
            this.setOffset(offset1 + 1);
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] buffer = new byte[payload.limit()];
            payload.get(buffer);
            String message = new String(buffer);
            System.out.println("offset:"+ offset1 +"--message:"+ message);
        }
    }

    private void setOffset(long offset) {
        this.offset = offset;
    }
}
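
The method table above lists getLastOffset(), which this example skips (the starting offset is hard-coded to 2 in main()). A hedged sketch of that step, adapted from the classic SimpleConsumer example; it would live inside LowerConsumer and needs imports for kafka.common.TopicAndPartition and kafka.api.PartitionOffsetRequestInfo (OffsetRequest/OffsetResponse are covered by the kafka.javaapi.* import), and the exact signatures should be treated as an assumption for this Kafka version:

    private long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        //LatestTime() asks for the next offset to be written; EarliestTime() would return the oldest available offset
        requestInfo.put(topicAndPartition,
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("error fetching offset: " + response.errorCode(topic, partition));
            return -1;
        }
        return response.offsets(topic, partition)[0];
    }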

This low-level SimpleConsumer API is no longer shipped in recent Kafka releases (the old Scala clients were removed in Kafka 2.0).
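
With current clients, the same "read one partition from a chosen offset" pattern is expressed with assign() and seek(). A minimal sketch against the modern kafka-clients API, assuming the same cluster and topic as above:

package com.sonly.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"k8s-n1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("mytest", 1);
        //assign() takes the partition directly, so there is no consumer group coordination
        consumer.assign(Collections.singletonList(tp));
        //start reading at offset 2, mirroring the hard-coded offset in LowerConsumer.main()
        consumer.seek(tp, 2L);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("offset:" + record.offset() + "--message:" + record.value());
        }
        consumer.close();
    }
}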

