kafka 冪等生產者及事務(kafka0.11之后版本新特性)


1. 冪等性設計
1.1 引入目的
生產者重復生產消息。生產者進行retry會產生重試時,會重復產生消息。有了冪等性之后,在進行retry重試時,只會生成一個消息。

1.2 冪等性實現
1.2.1 PID 和 Sequence Number
為了實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。

PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。
Sequence Numbler。(對於每個PID,該Producer發送數據的每個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Number
Broker端在緩存中保存了這seq number,對於接收的每條消息,如果其序號比Broker緩存中序號大於1則接受它,否則將其丟棄。

這樣就可以實現了消息重復提交了。但是,只能保證單個Producer對於同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等。

標准實現

 

 

發生重試時

 

 

實現冪等之后

 

 

 發生重試時

 

 

1.2.2  生成PID的流程

//在執行創建事務時
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//會創建一個Sender,並啟動線程,執行如下run方法
Sender{
    void run(long now) {
        if (transactionManager != null) {
            try {
                 ........
                if (!transactionManager.isTransactional()) {
                    // 為idempotent producer生成一個producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {

1.3.演示實例
enable.idempotence,需要設置為ture,此時就會默認把acks設置為all,所以不需要再設置acks屬性了。

private Producer buildIdempotProducer(){
         // create instance for properties to access producer configs
        Properties props = new Properties(); 
        // bootstrap.servers是Kafka集群的IP地址。多個時,使用逗號隔開
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("enable.idempotence",true); 
        //If the request fails, the producer can automatically retry,
        props.put("retries", 3); 
        //Reduce the no of requests less than 0
        props.put("linger.ms", 1); 
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432); 
        // Kafka消息是以鍵值對的形式發送,需要設置key和value類型序列化器
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props);        
        return producer;
}
//發送消息    
public void produceIdempotMessage(String topic, String message) {
        // 創建Producer
        Producer producer = buildIdempotProducer();
        // 發送消息
        producer.send(new ProducerRecord<String, String>(topic, message));
        producer.flush();
}

此時,因為我們並沒有配置transaction.id屬性,所以不能使用事務相關API,如:producer.initTransactions();

否則會出現如下錯誤:

Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.

    at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)

    at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)

    at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)

2.事務
2.1 事務屬性
事務屬性是2017年Kafka 0.11.0.0引入的新特性。類似於數據庫事務,只是這里的數據源是Kafka,kafka事務屬性是指一系列的生產者生產消息和消費者提交偏移量的操作在一個事務,或者說是是一個原子操作),同時成功或者失敗。

注意:在理解消息的事務時,一直處於一個錯誤理解就是如下代碼中,把操作db的業務邏輯跟操作消息當成是一個事務。

其實這個是有問題的,操作DB數據庫的數據源是DB,消息數據源是kfaka,這是完全不同兩個數據,一種數據源(如mysql,kafka)對應一個事務。

所以它們是兩個獨立的事務:kafka事務指kafka一系列 生產、消費消息等操作組成一個原子操作;db事務是指操作數據庫的一系列增刪改操作組成一個原子操作。

void  kakfa_in_tranction(){
  // 1.kafa的操作:讀取消息或者生產消息
 kafkaOperation(); 
   // 2.db操作
  dbOperation()
 
}

2.2 引入目的
在事務屬性之前先引入了生產者冪等性,它的作用為:

生產者多次發送消息可以封裝成一個原子操作,要么都成功,要么失敗
consumer-transform-producer模式下,因為消費者提交偏移量出現問題,導致在重復消費消息時,生產者重復生產消息。

需要將這個模式下消費者提交偏移量操作和生成者一系列生成消息的操作封裝成一個原子操作。
消費者提交偏移量導致重復消費消息的場景:消費者在消費消息完成提交偏移量o2之前掛掉了(假設它最近提交的偏移量是o1),此時執行再均衡時,其它消費者會重復消費消息(o1到o2之間的消息)。

2.3 操作的API

  //producer提供的事務方法
   /**
     * 初始化事務。需要注意的有:
     * 1、前提
     * 需要保證transation.id屬性被配置。
     * 2、這個方法執行邏輯是:
     *   (1)Ensures any transactions initiated by previous instances of the producer with the same
     *      transactional.id are completed. If the previous instance had failed with a transaction in
     *      progress, it will be aborted. If the last transaction had begun completion,
     *      but not yet finished, this method awaits its completion.
     *    (2)Gets the internal producer id and epoch, used in all future transactional
     *      messages issued by the producer.
     *
     */
    public void initTransactions();
 
    /**
     * 開啟事務
     */
    public void beginTransaction() throws ProducerFencedException ;
 
    /**
     * 為消費者提供的在事務內提交偏移量的操作
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         String consumerGroupId) throws ProducerFencedException ;
 
    /**
     * 提交事務
     */
    public void commitTransaction() throws ProducerFencedException;
 
    /**
     * 放棄事務,類似回滾事務的操作
     */
    public void abortTransaction() throws ProducerFencedException ;

2.4 演示實例
在一個原子操作中,根據包含的操作類型,可以分為三種情況:

a) 只有Producer生產消息;
b) 消費消息和生產消息並存,這個是事務場景中最常用的情況,就是我們常說的“consume-transform-produce ”模式
c) 只有consumer消費消息,
前兩種情況是事務引入的場景,最后一種情況沒有使用價值(跟使用手動提交效果一樣)。

2.4.1 屬性配置說明
使用kafka的事務api時的一些注意事項:

a) 需要消費者的自動模式設置為false,並且不能再手動的執行consumer#commitSync或者consumer#commitAsyc
b) 生產者配置transaction.id屬性
c) 生產者不需要再配置enable.idempotence,因為如果配置了transaction.id,則此時enable.idempotence會被設置為true
d) 消費者需要配置Isolation.level。在consume-trnasform-produce模式下使用事務時,必須設置為READ_COMMITTED。

2.4.2 只有寫

    /**
     * 在一個事務只有生產消息操作
     */
    public void onlyProduceInTransaction() {
        Producer producer = buildProducer(); 
        // 1.初始化事務
        producer.initTransactions(); 
        // 2.開啟事務
        producer.beginTransaction();
 
        try {
            // 3.kafka寫操作集合
            // 3.1 do業務邏輯 
            // 3.2 發送消息
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-1")); 
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
            // 3.3 do其他業務邏輯,還可以發送其他topic的消息。
 
            // 4.事務提交
            producer.commitTransaction(); 
 
        } catch (Exception e) {
            // 5.放棄事務
            producer.abortTransaction();
        } 
    }
    /**
     * 需要:
     * 1、設置transactional.id
     * 2、設置enable.idempotence
     * @return
     */
    private Producer buildProducer() { 
        // create instance for properties to access producer configs
        Properties props = new Properties(); 
        // bootstrap.servers是Kafka集群的IP地址。多個時,使用逗號隔開
        props.put("bootstrap.servers", "localhost:9092"); 
        // 設置事務id
        props.put("transactional.id", "first-transactional"); 
        // 設置冪等性
        props.put("enable.idempotence",true); 
        //Set acknowledgements for producer requests.
        props.put("acks", "all"); 
        //If the request fails, the producer can automatically retry,
        props.put("retries", 1); 
        //Specify buffer size in config,這里不進行設置這個屬性,如果設置了,還需要執行producer.flush()來把緩存中消息發送出去
        //props.put("batch.size", 16384); 
        //Reduce the no of requests less than 0
        props.put("linger.ms", 1); 
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432); 
        // Kafka消息是以鍵值對的形式發送,需要設置key和value類型序列化器
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props); 
        return producer;
    }

2.4.3 消費-生產並存

    /** 
     * 在一個事務內,即有生產消息又有消費消息,即常說的Consume-tansform-produce模式
     */
    public void consumeTransferProduce() {
        // 1.構建上產者
        Producer producer = buildProducer();
        // 2.初始化事務(生成productId),對於一個生產者,只能執行一次初始化事務操作
        producer.initTransactions();
        // 3.構建消費者和訂閱主題
        Consumer consumer = buildConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // 4.開啟事務
            producer.beginTransaction();
            // 5.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);
            try {
                // 5.2 do業務邏輯;
                System.out.println("customer Message---");
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, String> record : records) {
                    // 5.2.1 讀取消息,並處理消息。print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
 
                    // 5.2.2 記錄提交的偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));
 
                    // 6.生產新的消息。比如外賣訂單狀態的消息,如果訂單成功,則需要發送跟商家結轉消息或者派送員的提成消息
                    producer.send(new ProducerRecord<String, String>("test", "data2"));
                }
 
                // 7.提交偏移量
                producer.sendOffsetsToTransaction(commits, "group0323");
 
                // 8.事務提交
                producer.commitTransaction();
 
            } catch (Exception e) {
                // 7.放棄事務
                producer.abortTransaction();
            }
        }
    }
    /**
     * 需要:
     * 1、關閉自動提交 enable.auto.commit
     * 2、isolation.level為read_committed
     * 而且在代碼里面也不能使用手動提交commitSync( )或者commitAsync( )
     * @return
     */
    public Consumer buildConsumer() {
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多個時,使用逗號隔開
        props.put("bootstrap.servers", "localhost:9092");
        // 消費者群組
        props.put("group.id", "group0323");
        // 設置隔離級別
        props.put("isolation.level","read_committed");
        // 關閉自動提交
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer
                <String, String>(props);
        return consumer;
    }


2.4.4 只有讀

    /**
     * 在一個事務只有消費消息操作
     * 這種操作其實沒有什么意義,跟使用手動提交效果一樣,無法保證消費消息操作和提交偏移量操作在一個事務。
     */
    public void onlyConsumeInTransaction() {
        Producer producer = buildProducer();
        // 1.初始化事務
        producer.initTransactions();
        // 2.開啟事務
        producer.beginTransaction();
        // 3.kafka讀消息的操作集合
        Consumer consumer = buildConsumer();
        while (true) {
            // 3.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);
 
            try {
                // 3.2 do業務邏輯;
                System.out.println("customer Message---");
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, String> record : records) {
                    // 3.2.1 處理消息 print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
 
                    // 3.2.2 記錄提交偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));
                }
 
                // 4.提交偏移量
                producer.sendOffsetsToTransaction(commits, "group0323");
 
                // 5.事務提交
                producer.commitTransaction();
 
            } catch (Exception e) {
                // 6.放棄事務
                producer.abortTransaction();
            }
        }
 
    }

3 冪等性和事務性的關系
3.1 兩者關系
事務屬性實現前提是冪等性,即在配置事務屬性transaction id時,必須還得配置冪等性;但是冪等性是可以獨立使用的,不需要依賴事務屬性。

冪等性引入了Porducer ID
事務屬性引入了Transaction Id屬性。
使用場景

enable.idempotence = true,transactional.id不設置:只支持冪等性。
enable.idempotence = true,transactional.id設置:支持事務屬性和冪等性
enable.idempotence = false,transactional.id不設置:沒有事務屬性和冪等性的kafka
enable.idempotence = false,transactional.id設置:無法獲取到PID,此時會報錯

3.2 tranaction id 、producerId 和 epoch
一個app有一個tid,同一個應用的不同實例PID是一樣的,只是epoch的值不同。如:

同一份代碼運行兩個實例,分步執行如下:在實例1沒有進行提交事務前,開始執行實例2的初始化事務

step1  實例1-初始化事務。的打印出對應productId和epoch,信息如下:

[2018-04-21 20:56:23,106] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 123 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step2 實例1-發送消息。

step3 實例2-初始化事務。初始化事務時的打印出對應productId和epoch,信息如下:

18-04-21 20:56:48,373] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 124 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step4  實例1-提交事務,此時報錯

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

我今天使用Flink-connector-kafka-0.11時,遇到這個現象

 

step5 實例2-提交事務

為了避免這種錯誤,同一個事務ID,只有保證如下順序epch小producer執行init-transaction和committransaction,然后epoch較大的procuder才能開始執行init-transaction和commit-transaction,如下順序:

有了transactionId后,Kafka可保證:

跨Session的數據冪等發送。當具有相同Transaction ID的新的Producer實例被創建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作【上面的實例可以驗證】。

kafka保證了關聯同一個事務的所有producer(一個應用有多個實例)必須按照順序初始化事務、和提交事務,否則就會有問題,這保證了同一事務ID中消息是有序的(不同實例得按順序創建事務和提交事務)。

3.3 事務最佳實踐-單實例的事務性
通過上面實例中可以看到kafka是跨Session的數據冪等發送,即如果應用部署多個實例時常會遇到上面的問題“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必須保證這些實例生成者的提交事務順序和創建順序保持一致才可以,否則就無法成功。其實,在實踐中,我們更多的是如何實現對應用單實例的事務性。可以通過spring-kafaka實現思路來學習,即每次創建生成者都設置一個不同的transactionId的值,如下代碼:

在spring-kafka中,對於一個線程創建一個producer,事務提交之后,還會關閉這個producer並清除,后續同一個線程或者新的線程重新執行事務時,此時就會重新創建producer。

public class ProducerFactoryUtils{
/**
     * Obtain a Producer that is synchronized with the current transaction, if any.
     * @param producerFactory the ConnectionFactory to obtain a Channel for
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the resource holder.
     */
    public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
            final ProducerFactory<K, V> producerFactory) {
 
        Assert.notNull(producerFactory, "ProducerFactory must not be null");
 
        // 1.對於每一個線程會生成一個唯一key,然后根據key去查找resourceHolder
        @SuppressWarnings("unchecked")
        KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
                .getResource(producerFactory);
        if (resourceHolder == null) {
            // 2.創建一個消費者
            Producer<K, V> producer = producerFactory.createProducer();
            // 3.開啟事務
            producer.beginTransaction();
            resourceHolder = new KafkaResourceHolder<K, V>(producer);
            bindResourceToTransaction(resourceHolder, producerFactory);
        }
        return resourceHolder;
    }
}
//創建消費者代碼
public class DefaultKafkaProducerFactory{
    protected Producer<K, V> createTransactionalProducer() {
        Producer<K, V> producer = this.cache.poll();
        if (producer == null) {
            Map<String, Object> configs = new HashMap<>(this.configs);
            // 對於每一次生成producer時,都設置一個不同的transactionId
            configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
            producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
            // 1.初始化話事務。
            producer.initTransactions();
            return new CloseSafeProducer<K, V>(producer, this.cache);
        }
        else {
            return producer;
        }
    }
}

3.4 Consume-transform-Produce的流程

 

流程1 :查找Tranaction Corordinator。

Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址。

流程2:初始化事務 initTransaction

Producer發送InitpidRequest給事務協調器,獲取一個Pid。InitpidRequest的處理過程是同步阻塞的,一旦該調用正確返回,Producer就可以開始新的事務。

TranactionalId通過InitpidRequest發送給Tranciton Corordinator,然后在Tranaciton Log中記錄這<TranacionalId,pid>的映射關系。

除了返回PID之外,還具有如下功能:

對PID對應的epoch進行遞增,這樣可以保證同一個app的不同實例對應的PID是一樣的,但是epoch是不同的。
回滾之前的Producer未完成的事務(如果有)。
流程3: 開始事務beginTransaction

執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。

注意:這個操作並沒有通知Transaction Coordinator。

流程4: Consume-transform-produce loop

流程4.0: 通過Consumtor消費消息,處理業務邏輯

流程4.1: producer向TransactionCordinantro發送AddPartitionsToTxnRequest

在producer執行send操作時,如果是第一次給<topic,partion>發送數據,此時會向Trasaction Corrdinator發送一個AddPartitionsToTxnRequest請求,

Transaction Corrdinator會在transaction log中記錄下tranasactionId和<topic,partion>一個映射關系,並將狀態改為begin。AddPartionsToTxnRequest的數據結構如下:

      AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
      TransactionalId => string
      PID => int64
      Epoch => int16
      Topic => string
      Partition => int32

流程4.2:  producer#send發送 ProduceRequst,生產者發送數據,雖然沒有還沒有執行commit或者absrot,但是此時消息已經保存到kafka上,

可以參考如下圖斷點位置處,此時已經可以查看到消息了,而且即使后面執行abort,消息也不會刪除,只是更改狀態字段標識消息為abort狀態。

 

流程4.3: AddOffsetCommitsToTxnRequest,Producer通過KafkaProducer.sendOffsetsToTransaction 向事務協調器器發送一個AddOffesetCommitsToTxnRequests:

    AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
    TransactionalId => string
     PID => int64
     Epoch => int16
     ConsumerGroupID => string

在執行事務提交時,可以根據ConsumerGroupID來推斷_customer_offsets主題中相應的TopicPartions信息。這樣在

流程4.4: TxnOffsetCommitRequest,Producer通過KafkaProducer.sendOffsetsToTransaction還會向消費者協調器Cosumer Corrdinator發送一個TxnOffsetCommitRequest,在主題_consumer_offsets中保存消費者的偏移量信息。

TxnOffsetCommitRequest   => ConsumerGroupID
                            PID
                            Epoch
                            RetentionTime
                            OffsetAndMetadata
  ConsumerGroupID => string
  PID => int64
  Epoch => int32
  RetentionTime => int64
  OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
    TopicName => string
    Partition => int32
    Offset => int64
    Metadata => string

流程5: 事務提交和事務終結(放棄事務),通過生產者的commitTransaction或abortTransaction方法來提交事務和終結事務,這兩個操作都會發送一個EndTxnRequest給Transaction Coordinator。

流程5.1:EndTxnRequest。Producer發送一個EndTxnRequest給Transaction Coordinator,然后執行如下操作:

Transaction Coordinator會把PREPARE_COMMIT or PREPARE_ABORT 消息寫入到transaction log中記錄
執行流程5.2
執行流程5.3
流程5.2:WriteTxnMarkerRequest

WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]]
 CoordinatorEpoch => int32
 PID => int64
 Epoch => int16
 Marker => boolean (false(0) means ABORT, true(1) means COMMIT)
 Topic => string
 Partition => int32

對於Producer生產的消息。Tranaction Coordinator會發送WriteTxnMarkerRequest給當前事務涉及到每個<topic,partion>的leader,leader收到請求后,會寫入一個COMMIT(PID) 或者 ABORT(PID)的控制信息到data log中
對於消費者偏移量信息,如果在這個事務里面包含_consumer-offsets主題。Tranaction Coordinator會發送WriteTxnMarkerRequest給Transaction Coordinartor,Transaction Coordinartor收到請求后,

會寫入一個COMMIT(PID) 或者 ABORT(PID)的控制信息到 data log中
流程5.3:Transaction Coordinator會將最終的COMPLETE_COMMIT或COMPLETE_ABORT消息寫入Transaction Log中以標明該事務結束。

只會保留這個事務對應的PID和timstamp。然后把當前事務其他相關消息刪除掉,包括PID和tranactionId的映射關系。

3.4.1 文件類型和查看命令

 kafka文件主要包括broker的data(主題:test)、事務協調器對應的transaction_log(主題:__tranaction_state)、偏移量信息(主題:_consumer_offsets)三種類型。

如下圖

這三種文件類型其實都是topic的分區,所以對於每一個目錄都包含*.log、*.index、*.timeindex、*.txnindex文件(僅這個文件是為了實現事務屬性引入的)。

查看文件內容:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments   –files /kafka-logs/firtstopic-0/00000000000000000002.log   –print-data-log

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM