依賴包導入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
producer開發
producer參數說明
- metadata.broker.list:指定kafka節點列表,用於獲取metadata,不必全部指定.如metadata.broker.list=192.168.1.10:9092,192.168.1.11:9092
- partitioner.class:指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區。partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
- compression.codec:是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。compression.codec=none
- serializer.class:指定序列化處理類,默認為kafka.serializer.DefaultEncoder,即byte[]
- compressed.topics:如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮
- request.required.acks:producer接收消息ack的時機,默認為0。0: producer不會等待broker發送ack。1: 當leader接收到消息之后發送ack 。 2: 當所有的follower都同步消息成功后發送ack。
- request.timeout.ms:在向producer發送ack之前,broker允許等待的最大時間 。如果超時,broker將會向producer發送一個error ACK.意味着上一次消息因為某種原因未能成功(比如follower未能同步成功) 。
- producer.type:同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,也意味着消息將會在本地buffer中,並適時批量發送,但是也可能導致丟失未發送過去的消息。
- queue.buffering.max.ms :在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,默認為5000ms,此值和batch.num.messages協同工作。
- queue.buffering.max.messages:在async模式下,producer端允許buffer的最大消息量,無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積,此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000。
- batch.num.messages:如果是異步,指定每次批量發送數據量,默認為200
- queue.enqueue.timeout.ms:當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后 ,阻塞一定時間后,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息),此時producer可以繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間。-1: 無阻塞超時限制,消息不會被拋棄 ,0:立即清空隊列,消息被拋棄
- message.send.max.retries:當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數。因為broker並沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失) 有可能導致broker接收到重復的消息,默認值為3。
- topic.metadata.refresh.interval.ms:producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 ,因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即刷新 ,(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置額外的刷新機制,默認值600000
一個簡單的生產者例子如下:
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args){
Properties props = new Properties();
props.put("metadata.broker.list", "127.0.0.1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for(int i=0;i<10;i++){
String ip = "192.168.1." + (i+1);
String msg = "receive msg from " + ip;
KeyedMessage<String,String> data = new KeyedMessage<String, String>("simple_test",ip,msg);
producer.send(data);
}
producer.close();
}
}
指定關鍵字key,發送消息到指定partitions
如果需要實現自定義partitions消息發送,需要實現Partitioner接口
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class MyPartitioner implements Partitioner {
public MyPartitioner(VerifiableProperties props){
}
/**
* 返回分區索引編號
* @param key
* @param numPartitions topic中的分區總數
* @return
*/
public int partition(Object key, int numPartitions) {
String partKey = (String) key;
if("test1".equals(partKey)){
return 1;
}
return 0;
}
}
consumer開發
consumer參數說明
- zookeeper.connect:zookeeper連接服務器地址
- zookeeper.session.timeout.ms:zookeeper的session過期時間,默認5000ms,用於檢測消費者是否掛掉
- zookeeper.sync.time.ms:當consumer reblance時,重試失敗時時間間隔
- group.id:指定消費組
- auto.commit.enable:當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息 ,注意offset信息並不是每消費一次消息就向zk提交一次,而是現在本地保存(內存),並定期提交,默認為true
- auto.commit.interval.ms:自動更新時間。默認60 * 1000
- conusmer.id:當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤消息消費情況,便於觀察
- client.id:消費者客戶端編號,用於區分不同客戶端,默認客戶端程序自動產生
- queued.max.message.chunks:最大取多少塊緩存到消費者(默認10)
- rebalance.max.retries:當有新的consumer加入到group時,將會reblance,此后將會有partitions的消費端遷移到新 的consumer上,如果一個consumer獲得了某個partition的消費權限,那么它將會向zk注冊 "Partition Owner registry"節點信息,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用於控制,注冊節點的重試次數。
- fetch.min.bytes:獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk每次feth將得到多條消息,此值為總大小,提升此值,將會消耗更多的consumer端內存
- fetch.wait.max.ms: 當消息的尺寸不足時,server阻塞的時間,如果超時,消息將立即發送給consumer
- auto.offset.reset:如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、 anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
- derializer.class:指定序列化處理類,默認為kafka.serializer.DefaultDecoder,即byte[]
多線程並行消費topic
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class SimpleConsumer implements Runnable{
private KafkaStream stream;
private int threadNum;
public SimpleConsumer(KafkaStream stream, int threadNum) {
this.stream = stream;
this.threadNum = threadNum;
}
public void run() {
ConsumerIterator<byte[],byte[]> it = this.stream.iterator();
while (it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
}
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleConsumerTest {
private ConsumerConnector connector;
private String topic;
private ExecutorService executorService;
public SimpleConsumerTest(String a_zookeeper, String a_groupId, String a_topic) {
this.connector = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
this.topic = a_topic;
}
private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect",a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "60000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void shutdown(){
if(connector != null){
connector.shutdown();
}
if(executorService != null){
executorService.shutdown();
}
}
public void start(int threadNum){
Map<String,Integer> topicCount = new HashMap<String,Integer>();
topicCount.put(topic,new Integer(threadNum));
Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);
executorService = Executors.newFixedThreadPool(threadNum);
int thread = 1;
for(KafkaStream<byte[],byte[]> stream : streams){
executorService.submit(new SimpleConsumer(stream,thread++));
}
}
public static void main(String[] args) throws InterruptedException {
String zks = "127.0.0.1:2181";
String gruopId = "fdf";
String topic = "tets";
SimpleConsumerTest simpleConsumerTest = new SimpleConsumerTest(zks,gruopId,topic);
simpleConsumerTest.start(10);
Thread.sleep(10000);
simpleConsumerTest.shutdown();
}
}
