Kafka Producer接口


參考,

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

http://kafka.apache.org/08/configuration.html , 0.8版本,關於producer,consumer,broker所有的配置

 

因為Producer相對於consumer比較簡單,直接看代碼,需要注意的點

1. 配置參數,詳細參考上面鏈接
    1.1 metadata.broker.list, 不同於0.7,不需要給出zk的地址,而是給出一些broker地址,不用全部,這里建議給兩個防止一個不可用
          Kafka會自己找到相應topic,partition的leader broker
    1.2 serializer.class,需要給出message的序列化的encoder,這里使用的是簡單的StringEncoder
          並且對於key還可以單獨的設定,"key.serializer.class" 
          注意,除非明確知道message編碼,否則不要直接使用StringEncoder,
          因為源碼中的邏輯是如果沒有在初始化時指定編碼會默認按UTF8轉碼,會導致亂碼
          所以不明確的時候,不要指定serializer.class,默認的encoder邏輯是直接將byte[]放入broker,不會做轉碼
    1.3 partitioner.class,可以不設置,默認就是random partition,當然這里可以自定義,如何根據key來選擇partition
    1.4 request.required.acks, 是否要求broker給出ack,如果不設置默認是'fire and forget', 會丟數據
          默認為0,即和0.7一樣,發完不會管是否成功,lowest latency but the weakest durability
          1, 等待leader replica的ack,否則重發,折中的方案,當leader在同步數據前dead,會丟數據
          -1,等待all in-sync replicas的ack,只要有一個replica活着,就不會丟數據
    1.5 producer.type, 
         sync,單條發送
         async,buffer一堆請求后,再一起發送
         如果不是對丟數據非常敏感,請設為async,因為對throughput幫助很大,但是當client crash時,會丟數據
    1.6 compression.codec
         支持"none", "gzip" and "snappy"
         可以通過,compressed.topics,來指定壓縮的topic

    當producer.type選擇async的時候,需要關注如下配置
    queue.buffering.max.ms (5000), 最大buffer數據的時間,默認是5秒
    batch.num.messages (200), batch發送的數目,默認是200,producer會等待buffer的messages數目達到200或時間超過5秒,才發送數據
    queue.buffering.max.messages (10000), 最多可以buffer的message數目,超過要么producer block或把數據丟掉
    queue.enqueue.timeout.ms (-1), 默認是-1,即達到buffer最大meessage數目時,producer會block
                                                       設為0,達到buffer最大meessage數目時會丟掉數據

 

2. Producer發送的是kv數據
無論Producer或KeyedMessage都是<String, String>的泛型,這里是指key和value的類型

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "host1:9092, host2:9092 "); //
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner"); //可以不設置
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); //指定topic,key,value
               producer.send(data);
        }
        producer.close();
    }
}

 

對於自定義partitioner也很簡單,

對於partition,兩個參數,key和partitions的數目
所要完成的邏輯就是,如果根據key在partitions中挑選一個合適的partition

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM