Kafka-生產者、消費者、自定義分區器


記錄下和kafka相關的生產者和消費者,文中很多理解參考文末博文、書籍還有某前輩。

生產者

包含原理和代碼兩部分。

原理相關

如果要寫生產者代碼,需要先了解其發送的大致過程,即消息經歷了什么才存儲到對應topic的對應分區,大致如下。

(1)執行生產者代碼后,會在JVM中啟動一個producer,它會將數據發送到指定的topic。

(2)message不會直接就發送出去,會首先封裝成ProducerRecord,構造ProducerRecord實例對象時,可以傳入topic、key、value等。當需要指定消息發送到哪個分區,就需要傳入key。value里是消息內容,一般是json格式。

(3)消息還需要序列化,因為涉及到數據的磁盤落地,然后又重新從磁盤讀取數據,因此需要使用序列化(生產者)和反序列化(消費者)。

(4)序列化后的數據,還會經過分區器,這里可以指定自定義分區器,如果不指定就是默認分區器。分區器決定數據將存在topic哪個分區,那如何知道這個topic有幾個分區?知道了又如何確定哪個分區就是leader分區,就算知道leader分區,又如何判斷屬於哪個broker呢?這一切都需要通過獲取broker上的元數據來得到答案。

在0.8版本,這些元數據是存在zookeeper中的,這樣設計是有弊端的,zookeeper本來不是為高並發設計的,如果大量訪問涌入zookeeper獲取元數據,可能會出問題。在0.10.x之后,這些原數據通過存在某個broker的controller,將從zookeeper獲取的元數據都分發到各個broker一份,因此從其中一個broker獲取到的數據就是元數據,這樣各個broker分攤了zookeeper的壓力,將以前從zookeeper獲取元數據,分到多個broker去提供了。

(5)接下來數據還不會直接發送出去,會先存入到一個默認是32M大小的內存緩沖區。

(6)緩沖區的數據,會先填入一個又一個的batch,默認一個batch是16K,這個也是可以設置batch.size修改的,需要根據實際情況來配置。batch大小達到指定大小就會發送出去,如果大小沒達到16K,還有一個時間限定,可以通過linger.ms來設置,當達到指定的時間不管batch有沒有達到指定大小都會發送出去。

producer會有一個專門的sender線程,將滿足條件的batch一起發送過去,這樣可以將多條消息批量的發送,比一條條的發送更加的節省資源,不用頻繁的創建和銷毀連接,在0.8版本,是沒有batch這個東西的,來一條就發送一條(有改進的空間,仿造批量發送可以提高性能,來自某前輩的經驗)。

(7)消息通過sender發送給leader分區,需要經過三層網絡架構,然后先寫入到broker的os cache里,然后再落地到本地磁盤,落地到磁盤是采用順序寫的方式,一般不會直接寫入到磁盤,這樣會影響性能(datanode寫入數據是直接寫入到磁盤的,如果也先寫入到os cache,會提高整體性能)。

代碼相關

有了上面的原理,生產者的代碼部分相對就好理解了,涉及到性能的優化,也會在代碼中實現,具體參考代碼注釋。

package com.boe.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 定義一個生產者,將消息發送出去
 */
public class MyProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        //step1 配置參數,這些跟優化kafka性能有關系
        Properties props=new Properties();

        //1 連接broker
        props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

        //2 key和value序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //3 acks
        // -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功
        // 0 代表消息只要發送出去就行,其他不管
        // 1 代表發送消息到leader partition寫入成功就可以
        props.put("acks","-1");

        //4 重試次數
        props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次

        // 5 隔多久重試一次
        props.put("retry.backoff.ms",2000);

        //6 如果要提升kafka的吞吐量,可以指定壓縮類型,如lz4
        props.put("compression.type","none");

        //7 緩沖區大小,默認是32M
        props.put("buffer.size",33554432);

        //8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整
        props.put("batch.size",323840);//設置為32k

        //9 如果一個batch沒滿,達到如下的時間也會發送出去
        props.put("linger.ms",200);

        //10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
        props.put("max.request.size",1048576);

        //11 一條消息發送出去后,多久還沒收到響應,就認為是超時
        props.put("request.timeout.ms",5000);

        //step2 創建生產者對象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);

        //step3 使用消息的封裝形式,注意value一般是json格式
        ProducerRecord<String,String> record=new ProducerRecord<String,String>("topicA","{'name':'clyang','age':'34','salary':'8848'}");
        //ProducerRecord<String,String> record=new ProducerRecord<String,String>("topicA","I am sorry");

        //step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇

        //1 異步發送,一般使用異步,發送后會執行一個回調函數
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                //判斷是否有異常
                if(exception==null){
                    System.out.println("消息發送到分區"+metadata.partition()+"成功");
                }else{
                    System.out.println("消息發送失敗");
                    //TODO 可以寫入到redis,或mysql
                }
            }
        });

        Thread.sleep(10*1000);

        //2 同步發送,需要等待一條消息發送完成,才能發送下一條消息
        //RecordMetadata recordMetadata = producer.send(record).get();
        //System.out.println("發送到的分區是:"+recordMetadata.partition());

        //step5 關閉連接
        producer.close();
    }

}

執行后,控制台顯示發送消息成功,並打印出發送到了哪個分區。

消費者

包含原理和代碼兩部分。

原理相關

消費者消費數據,需要反序列化數據,且采用了零拷貝的技術,由於消費者和broker都在同一個操作系統下,一般都是linux,不涉及到linux到windows這種跨平台的數據讀取,因此數據反序列化后讀取到了os cache,然后發送到網關就直接被消費者消費,如下圖。如果數據反序列化到os cache(理解為數據的內核態),再拷貝一次到用戶態(這個狀態的數據可以跨系統平台)再消費,在同一平台下這會是一次多余的拷貝,kafka中省略了這個動作,這大大提高了消費者讀取數據的速度。

消費者消費某個leader分區的數據,會從消費者offset的下一個位置開始消費,如圖所示上一次消費到了offset 7的位置,下一次消費就從offset 8的位置開始消費。在zookeeper 0.8版本前,消費者的offset都保存在zookeeper中的,后面考慮到多個消費者要和zookeeper通信獲取offset會增加zookeeper的壓力,從1.0.x開始,這些消費者的offset改保存到了__consumer_offset這個主題里,而它分布在多個broker,將壓力就分攤了。

注意消費者能消費到的數據offset,需要小於這個分區的HW(高水印值),比如下圖這個分區的HW是9,則offset 10開始的數據就不可以消費,后面將整理HW和LEO相關的知識。

代碼相關

有了上面的原理,消費者的代碼部分相對就好理解了,涉及到性能的優化,也會在代碼中實現,具體參考代碼注釋。但是一般消費者是storm、spark streaming或者flink,又是另外的寫法了。

package com.boe.consumer;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * 自定義一個消費者,從指定的topic消費數據
 */
public class MyConsumer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //step1 配置消費者參數,也跟kafka性能有關
        Properties props=new Properties();

        //1 連接broker
        props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

        //2 指定key和value的反序列化
        //還需要指定消費組id,否則報錯
        props.put("group.id","clyang");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //3 消費者給coordinator發送心跳的時間間隔
        props.put("heartbeat.interval.ms",1000);

        //4 coordinator認為多久沒接受到心跳,就認為超時
        props.put("session.timout.ms",10*1000);

        //5 隔多久執行一次poll
        props.put("max.poll.interval.ms",10*1000);

        //6 一次poll返回多少條record,默認是500條
        props.put("max.poll.records",1000);

        //7 不要回收socket連接
        //consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,
        //但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收連接
        props.put("connection.max.idle.ms",-1);

        //8 設置自動提交offset
        props.put("enable.auto.commit","true");//注意kafka版本,1.0.x是這么寫

        //9 多久自動提交offset
        props.put("auto.commit.interval.ms",1000);

        //10 設置consumer重啟后,從分區最新的offset讀取
        //latest:如果分區下有提交的offset,從這個offset開始讀取,否則從最新的數據開始讀取
        //earliest:如果分區下有提交的offset,從這個offset開始讀取,否則從頭開始讀取
        //none:如果分區下有提交的offset,從這個offset開始讀取,只要有一個分區沒有提交的offset,就報錯
        props.put("auto.offset.reset","latest");

        //step2 創建一個消費者對象
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);


        //step3 訂閱主題
        consumer.subscribe(Arrays.asList("topicA"));

        //創建線程池,小池子大隊列,只有核心線程,沒有臨時線程,工作隊列是個阻塞式隊列
        ExecutorService threadPool= Executors.newFixedThreadPool(5);


        //step4 不斷消費數據,並對數據進行處理

        try {
            while(true){
                //超時時間是3s
                //新版本的kafka,這個poll方法將干很多事情
                //如監聽這個消費者跟多個topic的分區所在broker的通信,如有新的數據就會拉取過來,緩存數據、內存里更新offset
                ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);
                for(ConsumerRecord<String, String> record:consumerRecords){

                    //1 寫法1
                    //如果value是json格式,將其轉換成JSON對象
                    //JSONObject json=JSONObject.parseObject(record.value());
                    //System.out.println("消費的消息是"+json.toJSONString()+",name為:"+json.getString("name"));

                    //2 寫法2 可以放到線程池去消費
                    //實現Runnable接口
                    threadPool.submit(new ConsumerTask(record));
                }
            }
        }catch (Exception e) {
            e.printStackTrace();
            System.out.println("消費消息失敗");
            consumer.close();
        }

    }

}

/**
 * 如果實現Runnable接口,出現異常,需要在run方法進行捕獲
 */
class ConsumerTask implements Runnable{

    private ConsumerRecord<String, String> record;

    public ConsumerTask(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() {
        JSONObject json=JSONObject.parseObject(record.value());
        System.out.println("消費的消息是"+json.toJSONString()+",消息的分區為:"+record.partition()+",消息的offse為:"+record.offset());
    }
}

執行后,控制台顯示消費成功,並且從消息的offset變化可以看出,每生產一條數據,同一個分區的消息,其offset都會加1。

分區器

kafka中也可以自定義分區器,根據key的不同,實現數據寫入到指定分區的效果,下面簡單的實現一個,實現以下效果。

  • key為"china",發給0號分區

  • key為"usa",發給1號分區

  • key為"korea",發給2號分區

以下是代碼部分,類似MapReduce的自定義分區器,它需要實現一個kafka提供的接口Partitioner,實現里面的partition方法 。

package com.boe.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
 * 自定義分區器
 */
public class MyPartitioner implements Partitioner {

    //初始化值
    int partitionNum;

    /**
     * 主要重寫這個方法,假設有topic country三個分區,producer將key為china、usa和korea的消息分開存儲到不同的分區,否則都放到0號分區
     * @param topic 要使用自定義分區的topic
     * @param key 消息key
     * @param keyBytes 消息key序列化字節數組
     * @param value 消息value
     * @param valueBytes 消息value序列化字節數組
     * @param cluster 集群元信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String keyStr=(String) key;
        //獲取分區信息
        List<PartitionInfo> partitionInfoList=cluster.availablePartitionsForTopic("country");
        int partitionInfoListSize=partitionInfoList.size();
        //判斷是否有三個分區
        if(partitionInfoListSize==3){
            switch (keyStr){
                case "china":
                    partitionNum=0;
                    break;
                case "usa":
                    partitionNum=1;
                    break;
                case "korea":
                    partitionNum=2;
                    break;
                default:
                    partitionNum=0;
                    break;
            }
        }

        //返回分區序號
        return partitionNum;
    }

    @Override
    public void close() {
        //資源的清理工作在這里執行
        System.out.println("-----分區結束-----");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        //資源的初始化工作在這里執行
        partitionNum=0;
    }
}

實現了自定義分區器,需要在上面生產者producer的代碼中,添加分區器到props文件中,才能生效!

package com.boe.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 定義一個生產者,將消息發送出去
 */
public class MyProducer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        //step1 配置參數,這些跟優化kafka性能有關系
        Properties props=new Properties();

        //1 連接broker
        props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

        //2 key和value序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //3 acks
        // -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功
        // 0 代表消息只要發送出去就行,其他不管
        // 1 代表發送消息到leader partition寫入成功就可以
        props.put("acks","-1");

        //4 重試次數
        props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次

        // 5 隔多久重試一次
        props.put("retry.backoff.ms",2000);

        //6 如果要提升kafka的吞吐量,可以指定壓縮類型
        props.put("compression.type","none");

        //7 緩沖區大小,默認是32M
        props.put("buffer.size",33554432);

        //8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整
        props.put("batch.size",323840);//設置為32k

        //9 如果一個batch沒滿,達到如下的時間也會發送出去
        props.put("linger.ms",200);

        //10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯
        props.put("max.request.size",1048576);

        //11 一條消息發送出去后,多久還沒收到響應,就認為是超時
        props.put("request.timeout.ms",5000);

        //12 使用自定義分區器
        props.put("partitioner.class","com.boe.partitioner.MyPartitioner");


        //step2 創建生產者對象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);

        //step3 使用消息的封裝形式
        //自定義分區測試用的,可以看到自定了key,以下每條消息發送兩次
        //ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","china","{'name':'china','population','14'}");
        //ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","usa","{'name':'usa','population','3'}");
        //ProducerRecord<String,String> record=new ProducerRecord<String,String>("country","korea","{'name':'korea','population','1'}");


        //step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇

        //1 異步發送,一般使用異步
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception==null){
                    System.out.println("消息發送到分區"+metadata.partition()+"成功");
                }else{
                    System.out.println("消息發送失敗");
                    //TODO 寫入到redis
                }
            }
        });

        Thread.sleep(10*1000);

        //2 同步發送,需要等待一條消息發送完成,才能發送下一條消息
        //RecordMetadata recordMetadata = producer.send(record).get();
        //System.out.println("發送到的分區是:"+recordMetadata.partition());

        //step5 關閉連接
        producer.close();
    }

}

為了驗證分區器的效果,先創建一個測試的topic。

# 三個分區,三個replica,topic名為country
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 3 --topic country
Created topic "country".

然后上面生產者代碼執行發送消息,每發送一條使用kafka shell查看一次結果,發現數據都發送到了指定的分區。最后每個分區,都是2條消息,實現分區的效果。

# key="china"->分區0 key="usa"->分區1 key="korea"->分區2
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092 --topic country
country:2:2
country:1:2
country:0:2

以上,理解不一定正確,但學習就是一個不斷了解和糾錯的過程。

參考博文:

(1)《Apache Kafka實戰》

(2)http://kafka.apache.org/documentation.html#producerconfigs 生產者配置說明

(3)http://kafka.apache.org/documentation.html#consumerconfigs 消費者配置說明

(4)https://www.cnblogs.com/youngchaolin/p/12535704.html controller獲取元數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM