4.kafka生產者---向Kafka中寫入數據(轉)


轉:  https://www.cnblogs.com/sodawoods-blogs/p/8969513.html

(1)生產者概覽

(1)不同的應用場景對消息有不同的需求,即是否允許消息丟失重復延遲以及吞吐量的要求不同場景對Kafka生產者的API使用和配置會有直接的影響。

例子1:信用卡事務處理系統,不允許消息的重復和丟失,延遲最大500ms,對吞吐量要求較高。

例子2:保存網站的點擊信息,允許少量的消息丟失和重復,延遲可以稍高(用戶點擊鏈接可以馬上加載出頁面即可),吞吐量取決於用戶使用網站的頻度。

(2)Kafka發送消息的主要步驟

消息格式:每個消息是一個ProducerRecord對象,必須指定消息所屬的Topic和消息值Value,此外還可以指定消息所屬的Partition以及消息的Key。

1:序列化ProducerRecord

2:如果ProducerRecord中指定了Partition,則Partitioner不做任何事情;否則,Partitioner根據消息的key得到一個Partition。這是生產者就知道向哪個Topic下的哪個Partition發送這條消息。

3:消息被添加到相應的batch中,獨立的線程將這些batch發送到Broker上

4:broker收到消息會返回一個響應。如果消息成功寫入Kafka,則返回RecordMetaData對象,該對象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失敗,返回一個錯誤

 

 

(3)Kafka的順序保證。Kafka保證同一個partition中的消息是有序的,即如果生產者按照一定的順序發送消息,broker就會按照這個順序把他們寫入partition,消費者也會按照相同的順序讀取他們。

例子:向賬戶中先存100再取出來  和  先取100再存進去是完全不同的,因此這樣的場景對順序很敏感。

如果某些場景要求消息是有序的,那么不建議把retries設置成0,。可以把max.in.flight.requests.per.connection設置成1,會嚴重影響生產者的吞吐量,但是可以保證嚴格有序。

 

(2)創建Kafka生產者

要往Kafka中寫入消息,需要先創建一個Producer,並設置一些屬性。

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:port1, broker2:port2");
kafkaProps.put("key.serializer", "org.apache.kafka.common.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

Kafka的生產者有如下三個必選的屬性:

(1)bootstrap.servers,指定broker的地址清單

(2)key.serializer必須是一個實現org.apache.kafka.common.serialization.Serializer接口的類,將key序列化成字節數組。注意:key.serializer必須被設置,即使消息中沒有指定key。

(3)value.serializer,將value序列化成字節數組

(3)發送消息到Kafka

(1)同步發送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");//Topic Key Value
try{
    Future future = producer.send(record); 
    future.get();//不關心是否發送成功,則不需要這行。
} catch(Exception e) {
    e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
} 

(2)異步發送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");//Topic Key Value
producer.send(record, new DemoProducerCallback());//發送消息時,傳遞一個回調對象,該回調對象必須實現org.apahce.kafka.clients.producer.Callback接口

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {//如果Kafka返回一個錯誤,onCompletion方法拋出一個non null異常。
            e.printStackTrace();//對異常進行一些處理,這里只是簡單打印出來
        }
    }
} 

(4)生產者的配置

(1)acks指定必須要有多少個partition副本收到消息,生產者才會認為消息的寫入是成功的。

      acks=0,生產者不需要等待服務器的響應,以網絡能支持的最大速度發送消息,吞吐量高,但是如果broker沒有收到消息,生產者是不知道的

      acks=1,leader partition收到消息,生產者就會收到一個來自服務器的成功響應

      acks=all,所有的partition都收到消息,生產者才會收到一個服務器的成功響應

(2)buffer.memory,設置生產者內緩存區域的大小,生產者用它緩沖要發送到服務器的消息。

(3)compression.type,默認情況下,消息發送時不會被壓縮,該參數可以設置成snappy、gzip或lz4對發送給broker的消息進行壓縮

(4)retries,生產者從服務器收到臨時性錯誤時,生產者重發消息的次數

(5)batch.size,發送到同一個partition的消息會被先存儲在batch中,該參數指定一個batch可以使用的內存大小,單位是byte。不一定需要等到batch被填滿才能發送

(6)linger.ms,生產者在發送消息前等待linger.ms,從而等待更多的消息加入到batch中。如果batch被填滿或者linger.ms達到上限,就把batch中的消息發送出去

(7)max.in.flight.requests.per.connection,生產者在收到服務器響應之前可以發送的消息個數

(5)序列化器

在創建ProducerRecord時,必須指定序列化器,推薦使用序列化框架Avro、Thrift、ProtoBuf等,不推薦自己創建序列化器。

在使用 Avro 之前,需要先定義模式(schema),模式通常使用 JSON 來編寫。

(1)創建一個類代表客戶,作為消息的value

class Custom {
    private int customID;
    private String customerName;
    
    public Custom(int customID, String customerName) {
        super();
        this.customID = customID;
        this.customerName = customerName;
    }

    public int getCustomID() {
        return customID;
    }

    public String getCustomerName() {
        return customerName;
    }
}

(2)定義schema

{  
  "namespace": "customerManagement.avro",  
   "type": "record",  
   "name": "Customer",  
   "fields":[  
       {  
          "name": "id", "type": "string"  
       },  
       {  
          "name": "name",  "type": "string"  
       },  
   ]  
}

(3)生成Avro對象發送到Kafka

Properties props = new Properties();  
      props.put("bootstrap", "loacalhost:9092");  
      props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");  
      props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");  
      props.put("schema.registry.url", schemaUrl);//schema.registry.url指向射麻的存儲位置
      String topic = "CustomerContacts";
      Producer<String, Customer> produer = new KafkaProducer<String, Customer>(props);
      
      //不斷生成消息並發送
      while (true) {
          Customer customer = CustomerGenerator.getNext();
          ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
          producer.send(record);//將customer作為消息的值發送出去,KafkaAvroSerializer會處理剩下的事情
      }

(6)Partition

ProducerRecord可以只包含Topic和消息的value,key默認是null,但是大多數應用程序會用到key,key的兩個作用:
(1)作為消息的附加信息

(2)決定消息該被寫到Topic的哪個partition,擁有相同key的消息會被寫到同一個partition。

如果key為空,kafka使用默認的partitioner,使用RoundRobin算法將消息均衡地分布在各個partition上;

如果key不為空,kafka使用自己實現的hash方法對key進行散列,相同的key被映射到相同的partition中。只有在不改變partition數量的前提下,key和partition的映射才能保持不變。

kafka也支持用戶實現自己的partitioner,用戶自己定義的paritioner需要實現Partitioner接口。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM