參考,
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
http://kafka.apache.org/08/configuration.html , 0.8版本,關於producer,consumer,broker所有的配置
因為Producer相對於consumer比較簡單,直接看代碼,需要注意的點
1. 配置參數,詳細參考上面鏈接
1.1 metadata.broker.list, 不同於0.7,不需要給出zk的地址,而是給出一些broker地址,不用全部,這里建議給兩個防止一個不可用
Kafka會自己找到相應topic,partition的leader broker
1.2 serializer.class,需要給出message的序列化的encoder,這里使用的是簡單的StringEncoder
並且對於key還可以單獨的設定,"key.serializer.class"
注意,除非明確知道message編碼,否則不要直接使用StringEncoder,
因為源碼中的邏輯是如果沒有在初始化時指定編碼會默認按UTF8轉碼,會導致亂碼
所以不明確的時候,不要指定serializer.class,默認的encoder邏輯是直接將byte[]放入broker,不會做轉碼
1.3 partitioner.class,可以不設置,默認就是random partition,當然這里可以自定義,如何根據key來選擇partition
1.4 request.required.acks, 是否要求broker給出ack,如果不設置默認是'fire and forget', 會丟數據
默認為0,即和0.7一樣,發完不會管是否成功,lowest latency but the weakest durability
1, 等待leader replica的ack,否則重發,折中的方案,當leader在同步數據前dead,會丟數據
-1,等待all in-sync replicas的ack,只要有一個replica活着,就不會丟數據
1.5 producer.type,
sync,單條發送
async,buffer一堆請求后,再一起發送
如果不是對丟數據非常敏感,請設為async,因為對throughput幫助很大,但是當client crash時,會丟數據
1.6 compression.codec
支持"none", "gzip" and "snappy"
可以通過,compressed.topics,來指定壓縮的topic
當producer.type選擇async的時候,需要關注如下配置
queue.buffering.max.ms (5000), 最大buffer數據的時間,默認是5秒
batch.num.messages (200), batch發送的數目,默認是200,producer會等待buffer的messages數目達到200或時間超過5秒,才發送數據
queue.buffering.max.messages (10000), 最多可以buffer的message數目,超過要么producer block或把數據丟掉
queue.enqueue.timeout.ms (-1), 默認是-1,即達到buffer最大meessage數目時,producer會block
設為0,達到buffer最大meessage數目時會丟掉數據
2. Producer發送的是kv數據
無論Producer或KeyedMessage都是<String, String>的泛型,這里是指key和value的類型
import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "host1:9092, host2:9092 "); // props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); //可以不設置 props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); //指定topic,key,value producer.send(data); } producer.close(); } }
對於自定義partitioner也很簡單,
對於partition,兩個參數,key和partitions的數目
所要完成的邏輯就是,如果根據key在partitions中挑選一個合適的partition
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(String key, int a_numPartitions) { int partition = 0; int offset = key.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions; } return partition; } }