Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>
1. Sending messages synchronously
1. Create the topic:
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --partitions 3 --replication-factor 1 --topic test-syn
2. Code
package com.example.demo.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class SynProducer {
    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");             // wait for acknowledgement from all in-sync replicas
        props.put("retries", 2);              // number of retries
        props.put("batch.size", 16384);       // batch size in bytes
        props.put("buffer.memory", 33554432); // total producer buffer memory; size it to the machine's memory
        props.put("linger.ms", 1000);         // linger time; a batch is sent once either batch.size or linger.ms is reached
        props.put("client.id", "producer-syn-1"); // client id, useful for metrics and troubleshooting
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for(int i = 0; i < 1000; i++){
            // three arguments: topic, key (used to choose the partition), value (the payload)
            ProducerRecord<String, String> record = new ProducerRecord<>("test-syn", "topic_"+i, "test-syn-"+i);
            Future<RecordMetadata> metadataFuture = producer.send(record);
            RecordMetadata recordMetadata = null;
            try {
                recordMetadata = metadataFuture.get(); // block until the broker acknowledges the record
                System.out.println("Send succeeded!");
                System.out.println("topic:" + recordMetadata.topic());
                System.out.println("partition:" + recordMetadata.partition());
                System.out.println("offset:" + recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("Send failed!");
                e.printStackTrace();
            }
        }
        producer.flush();
        producer.close();
    }
}
3. Test
1. Start a console consumer:
./bin/kafka-console-consumer.sh --bootstrap-server 47.52.199.52:9092 --topic test-syn --group test-1 --from-beginning
2. Run the program.
2. Sending messages asynchronously
package com.example.demo.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ASynProducer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");             // wait for acknowledgement from all in-sync replicas
        props.put("retries", 2);              // number of retries
        props.put("batch.size", 16384);       // batch size in bytes
        props.put("buffer.memory", 33554432); // total producer buffer memory; size it to the machine's memory
        props.put("linger.ms", 1000);         // a batch is sent once either batch.size or linger.ms is reached
        props.put("client.id", "producer-asyn-1"); // client id, useful for metrics and troubleshooting
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for(int i = 0; i < 1000; i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("test-asyn", "topic_"+i, "test-asyn-"+i);
            // unlike the synchronous send, pass a callback; it is invoked once the send result comes back
            producer.send(record, (recordMetadata, e) -> {
                if(e != null){
                    System.out.println("Send failed!");
                    e.printStackTrace();
                } else {
                    System.out.println("Send succeeded!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
3. Fire-and-forget sending
Compared with the previous two approaches, this one does not care about the result at all; it just sends, so it is the fastest.
package com.example.demo.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class FireProducer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");             // wait for acknowledgement from all in-sync replicas
        props.put("retries", 2);              // number of retries
        props.put("batch.size", 16384);       // batch size in bytes
        props.put("buffer.memory", 33554432); // total producer buffer memory; size it to the machine's memory
        props.put("linger.ms", 1000);         // a batch is sent once either batch.size or linger.ms is reached
        props.put("client.id", "producer-syn-1"); // client id, useful for metrics and troubleshooting
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProps());
        for(int i = 0; i < 1000; i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("test-syn", "topic_"+i, "test-syn-"+i);
            // fire and forget: ignore the send result
            producer.send(record);
        }
        producer.flush();
        producer.close();
    }
}
4. Auto-committing offsets
package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author 王柱星
 * @version 1.0
 * @time 2018年12月11日
 * @since 1.0
 */
public class AutoCommitConsumer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.52:9092");
        props.put("group.id", "test_3");
        props.put("session.timeout.ms", 30000);       // if the session times out, the consumer is considered dead and a rebalance may be triggered
        props.put("enable.auto.commit", "true");      // enable automatic offset commits
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval
        props.put("auto.offset.reset", "earliest");   // earliest: start from the earliest offset; latest: start from the most recent
        props.put("client.id", "producer-syn-1");     // client id, useful for metrics and troubleshooting
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            List<String> topics = new ArrayList<>();
            topics.add("producer-syn");
            consumer.subscribe(topics);
            for(;;){
                // the poll argument is the fetch timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for(ConsumerRecord<String, String> consumerRecord : records){
                    System.out.println("partition:" + consumerRecord.partition());
                    System.out.println("offset:" + consumerRecord.offset());
                    System.out.println("key:" + consumerRecord.key());
                    System.out.println("value:" + consumerRecord.value());
                }
            }
        }
    }
}
5. Synchronous commit
With a synchronous commit, the call blocks until the broker returns the result; commitSync() keeps retrying until the commit succeeds or an unrecoverable error occurs.
package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author 王柱星
 * @version 1.0
 * @time 2018年12月11日
 * @since 1.0
 */
public class CommitConsumer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("group.id", "test_3");
        props.put("session.timeout.ms", 30000);       // if the session times out, the consumer is considered dead and a rebalance may be triggered
        props.put("enable.auto.commit", "false");     // disable automatic commits; offsets are committed manually
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval (unused when auto commit is off)
        props.put("auto.offset.reset", "earliest");   // earliest: start from the earliest offset; latest: start from the most recent
        props.put("client.id", "consumer-2");         // client id, useful for metrics and troubleshooting
        props.put("max.poll.records", "1000");        // maximum number of records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            List<String> topics = new ArrayList<>();
            topics.add("producer-syn");
            consumer.subscribe(topics);
            for(;;){
                // the poll argument is the fetch timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for(ConsumerRecord<String, String> consumerRecord : records){
                    System.out.println("partition:" + consumerRecord.partition());
                    System.out.println("offset:" + consumerRecord.offset());
                    System.out.println("key:" + consumerRecord.key());
                    System.out.println("value:" + consumerRecord.value());
                }
                // commit the offsets of the current batch
                consumer.commitSync();
            }
        }
    }
}
6. Asynchronous commit
commitAsync() commits the latest offset. Unlike commitSync(), which keeps retrying until it succeeds or hits an unrecoverable error, commitAsync() does not retry, and that is one of its drawbacks. The reason it does not retry is that by the time it receives the server's response, a larger offset may already have been committed successfully. Suppose we issue a request to commit offset 2000 and a transient network problem prevents the server from receiving it, so it never responds. Meanwhile we process another batch of messages and successfully commit offset 3000. If commitAsync() were to retry the commit of offset 2000, it might succeed after offset 3000 was committed. Since the system records the last committed offset, a rebalance at that point would cause duplicate consumption, restarting from offset 2000.
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
        List<String> topics = new ArrayList<>();
        topics.add("producer-syn");
        consumer.subscribe(topics);
        for(;;){
            // the poll argument is the fetch timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> consumerRecord : records){
                System.out.println("partition:" + consumerRecord.partition());
                System.out.println("offset:" + consumerRecord.offset());
                System.out.println("key:" + consumerRecord.key());
                System.out.println("value:" + consumerRecord.value());
            }
            // commit the offsets of the current batch asynchronously
            consumer.commitAsync();
        }
    }
}
Asynchronous commit also supports a callback, which can be used to record failed commits:
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
        List<String> topics = new ArrayList<>();
        topics.add("producer-syn");
        consumer.subscribe(topics);
        for(;;){
            // the poll argument is the fetch timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> consumerRecord : records){
                System.out.println("partition:" + consumerRecord.partition());
                System.out.println("offset:" + consumerRecord.offset());
                System.out.println("key:" + consumerRecord.key());
                System.out.println("value:" + consumerRecord.value());
            }
            // commit the current batch asynchronously and log failures through the callback
            consumer.commitAsync((offsets, e) -> {
                if(e != null){
                    System.out.println("Commit failed: " + offsets);
                }
            });
        }
    }
}
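Because commitAsync() never retries, a common workaround (not part of the original code, sketched here as an assumption) is to keep a monotonically increasing sequence number (java.util.concurrent.atomic.AtomicLong) and retry from the callback only when no newer commit has been issued in the meantime, which avoids the offset-2000-after-3000 problem described above:

// Sketch only: retry an asynchronous commit only if it is still the most recent one.
AtomicLong commitSeq = new AtomicLong(0);
for (;;) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> consumerRecord : records) {
        // process the record ...
    }
    long seqAtSend = commitSeq.incrementAndGet();
    consumer.commitAsync((offsets, e) -> {
        if (e != null && seqAtSend == commitSeq.get()) {
            // no newer commit was issued, so retrying cannot overwrite a larger offset;
            // fall back to a blocking commit of the same offsets
            consumer.commitSync(offsets);
        }
    });
}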
7. Combining synchronous and asynchronous commits
package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author 王柱星
 * @version 1.0
 * @time 2018年12月11日
 * @since 1.0
 */
public class CommitAsynCallbackConsumer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("group.id", "test_4");
        props.put("session.timeout.ms", 30000);       // if the session times out, the consumer is considered dead and a rebalance may be triggered
        props.put("enable.auto.commit", "false");     // disable automatic commits; offsets are committed manually
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval (unused when auto commit is off)
        props.put("auto.offset.reset", "earliest");   // earliest: start from the earliest offset; latest: start from the most recent
        props.put("client.id", "consumer-3");         // client id, useful for metrics and troubleshooting
        props.put("max.poll.records", "200");         // maximum number of records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            List<String> topics = new ArrayList<>();
            topics.add("producer-syn");
            consumer.subscribe(topics);
            for(;;){
                // the poll argument is the fetch timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for(ConsumerRecord<String, String> consumerRecord : records){
                    System.out.println("partition:" + consumerRecord.partition());
                    System.out.println("offset:" + consumerRecord.offset());
                    System.out.println("key:" + consumerRecord.key());
                    System.out.println("value:" + consumerRecord.value());
                }
                // commit the current batch asynchronously and log failures through the callback
                consumer.commitAsync((offsets, e) -> {
                    if(e != null){
                        System.out.println("Commit failed: " + offsets);
                    }
                });
            }
        }
    }
}
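The class above still only calls commitAsync(); the combination the heading refers to usually means committing asynchronously on the normal path and synchronously once when the consumer shuts down, so that the final offsets are recorded reliably. A minimal sketch of that pattern, assuming the same getProps() helper and topic as above:

public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps());
    List<String> topics = new ArrayList<>();
    topics.add("producer-syn");
    consumer.subscribe(topics);
    try {
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println("offset:" + consumerRecord.offset() + " value:" + consumerRecord.value());
            }
            // fast, non-blocking commit during normal processing; an occasional failure is tolerable
            consumer.commitAsync();
        }
    } finally {
        try {
            // one final blocking commit before shutdown makes sure the last offsets are recorded
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}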
8. Committing a specific partition and offset
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
        List<String> topics = new ArrayList<>();
        topics.add("producer-syn");
        consumer.subscribe(topics);
        for(;;){
            // the poll argument is the fetch timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> consumerRecord : records){
                System.out.println("partition:" + consumerRecord.partition());
                System.out.println("offset:" + consumerRecord.offset());
                System.out.println("key:" + consumerRecord.key());
                System.out.println("value:" + consumerRecord.value());
            }
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            // choose the topic and partition to commit
            TopicPartition topicPartition = new TopicPartition("producer-syn", 0);
            // choose the offset to commit
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100);
            // several topic-partitions can be committed in a single call
            offsets.put(topicPartition, offsetAndMetadata);
            // commit the explicit offsets
            consumer.commitSync(offsets);
        }
    }
}
To increase consumption throughput, processing can be handed off to threads: the consumer only receives messages and a thread pool does the processing, as in the sketch below.
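A minimal sketch of that idea, assuming the same getProps() helper, that records can be processed independently of one another, and java.util.concurrent.ExecutorService/Executors on the classpath (the pool size is an arbitrary assumption):

public class ThreadedConsumer {

    // worker pool for processing; size it to your workload (assumption)
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(8);

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            List<String> topics = new ArrayList<>();
            topics.add("producer-syn");
            consumer.subscribe(topics);
            for (;;) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    // the consumer thread only receives; the pool does the actual processing
                    WORKERS.submit(() -> System.out.println("processed value:" + consumerRecord.value()));
                }
                // note: committing here only means the batch was handed off, not that it was fully processed
                consumer.commitSync();
            }
        }
    }
}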
9. Consuming from a specific offset
So far we have simply read whatever batch consumer.poll() returns. Kafka also provides ways to start reading from the beginning or the end of a partition:
seekToEnd(Collection<TopicPartition> partitions)
seekToBeginning(Collection<TopicPartition> partitions)
In addition, Kafka can start reading from a specified offset, via the seek() method:
// when using subscribe(), seek() only takes effect after an initial poll() has assigned the partition; the first poll after startup does not return repositioned data
seek(TopicPartition partition, long offset)
Example:
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
        TopicPartition topicPartition = new TopicPartition("producer-syn", 0);
        List<String> topics = new ArrayList<>();
        topics.add("producer-syn");
        consumer.subscribe(topics);
        for(;;){
            // the poll argument is the fetch timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> consumerRecord : records){
                System.out.println("partition:" + consumerRecord.partition());
                System.out.println("offset:" + consumerRecord.offset());
                System.out.println("key:" + consumerRecord.key());
                System.out.println("value:" + consumerRecord.value());
            }
            // seek only takes effect after at least one poll has assigned the partition,
            // so the first poll after startup does not return repositioned data
            consumer.seek(topicPartition, 100L);
            List<TopicPartition> list = new ArrayList<>();
            list.add(topicPartition);
            // consumer.seekToEnd(list);
            // consumer.seekToBeginning(list);
            consumer.commitSync();
        }
    }
}
10. Consuming from a specific time
offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long>)
- Input: a map of partition to timestamp
- Returns: the corresponding offset for each partition
- Then call seek(TopicPartition partition, long offset) to start consuming from that offset
Example:
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
        TopicPartition topicPartition = new TopicPartition("producer-syn", 0);
        List<String> topics = new ArrayList<>();
        topics.add("producer-syn");
        consumer.subscribe(topics);
        for(;;){
            // the poll argument is the fetch timeout in milliseconds
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> consumerRecord : records){
                System.out.println("partition:" + consumerRecord.partition());
                System.out.println("offset:" + consumerRecord.offset());
                System.out.println("key:" + consumerRecord.key());
                System.out.println("value:" + consumerRecord.value());
            }
            // look up the offset for a given timestamp
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            timestampsToSearch.put(topicPartition, 1544594780946L);
            Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(timestampsToSearch);
            // seek to that offset (the entry may be null if no message is at or after the timestamp)
            consumer.seek(topicPartition, timestampMap.get(topicPartition).offset());
            consumer.commitSync();
        }
    }
}
Other scenarios:
Querying a partition's offsets by timestamp can also be used for counting: two timestamps bound a time range, and summing over all partitions covers the whole topic. So offsetsForTimes() lets you find how many messages a topic received in a given time window, which is useful for checking the number of messages produced or synchronized, as in the sketch below.
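A hedged sketch of that check, assuming the same consumer setup as the earlier examples plus java.util.Collections; the topic name and the two timestamps are placeholders:

// count the messages of a topic written between startTs and endTs (placeholder values)
long startTs = 1544594780946L;
long endTs = startTs + 60 * 60 * 1000L; // one hour later, for illustration
Map<TopicPartition, Long> startQuery = new HashMap<>();
Map<TopicPartition, Long> endQuery = new HashMap<>();
for (PartitionInfo partitionInfo : consumer.partitionsFor("producer-syn")) {
    TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
    startQuery.put(tp, startTs);
    endQuery.put(tp, endTs);
}
Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(startQuery);
Map<TopicPartition, OffsetAndTimestamp> endOffsets = consumer.offsetsForTimes(endQuery);
long total = 0;
for (TopicPartition tp : startQuery.keySet()) {
    OffsetAndTimestamp start = startOffsets.get(tp);
    if (start == null) {
        continue; // this partition has no message at or after startTs
    }
    OffsetAndTimestamp end = endOffsets.get(tp);
    // if nothing was written after endTs, fall back to the current end of the partition
    long endOffset = (end != null) ? end.offset()
            : consumer.endOffsets(Collections.singletonList(tp)).get(tp);
    total += endOffset - start.offset();
}
System.out.println("messages in the time range: " + total);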
11. Committing on rebalance with a listener
As mentioned earlier, a rebalance is triggered when a consumer leaves or joins, or when partitions are added. If a rebalance happens while a consumer is in the middle of a batch, how should it commit the offsets it has consumed so far? Kafka provides a rebalance listener that is notified before the rebalance takes place, so the consumer can commit offsets, close handles, and so on before it loses ownership of its partitions.
The consumer API provides an overloaded subscribe() method for this:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener);
ConsumerRebalanceListener is the listener interface; we implement our own listener against it. It declares two methods:
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
The first method is called after the consumer has stopped reading messages and before the rebalance starts. If offsets are committed here, the consumer that takes over the partition next knows where to start reading.
The second is called after partitions have been reassigned and before the consumer starts reading messages again.
Example:
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps());
    List<String> topics = new ArrayList<>();
    topics.add("test-syn");
    // offsets consumed so far, committed when a rebalance starts
    Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            System.out.println("Rebalance happening! Committing: " + currentOffset);
            consumer.commitAsync(currentOffset, null);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }
    });
    for(;;){
        // the poll argument is the fetch timeout in milliseconds
        ConsumerRecords<String, String> records = consumer.poll(1000);
        Long offset = 0L;
        for(ConsumerRecord<String, String> consumerRecord : records){
            System.out.println("partition:" + consumerRecord.partition());
            System.out.println("offset:" + consumerRecord.offset());
            System.out.println("key:" + consumerRecord.key());
            System.out.println("value:" + consumerRecord.value());
            offset = consumerRecord.offset();
        }
        TopicPartition topicPartition = new TopicPartition("test-syn", 0);
        // the committed offset should be the next offset to read, i.e. last processed offset + 1
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset + 1);
        currentOffset.put(topicPartition, offsetAndMetadata);
        consumer.commitSync();
    }
}
12. Consuming with manually assigned partitions
So far we have only discussed consumer groups, where partitions are assigned to the group's consumers automatically and any change in group membership triggers a rebalance. Can we fall back to the way other message queues work, where a consumer reads a topic on its own without a group?
Kafka supports this as well. Because Kafka topics are partitioned, working without a group means attaching yourself to specific partitions in order to consume their contents. In that case you do not subscribe to a topic; instead you assign partitions to yourself. A consumer can either subscribe to a topic (and join a consumer group) or assign partitions to itself, but it cannot do both at the same time. Note: the group.id must not clash with that of other consumers, otherwise an error is thrown; it can also simply be removed.
package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @author 王柱星
 * @version 1.0
 * @time 2018年12月11日
 * @since 1.0
 */
public class PartitionConsumer {

    private static Properties getProps(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("session.timeout.ms", 30000);       // if the session times out, the consumer is considered dead and a rebalance may be triggered
        props.put("enable.auto.commit", "false");     // disable automatic commits; offsets are committed manually
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval (unused when auto commit is off)
        props.put("auto.offset.reset", "earliest");   // earliest: start from the earliest offset; latest: start from the most recent
        props.put("client.id", "consumer-3");         // client id, useful for metrics and troubleshooting
        props.put("max.poll.records", "200");         // maximum number of records per poll
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProps())) {
            // look up the topic's partitions and assign them all to this consumer
            List<PartitionInfo> partitionInfoList = consumer.partitionsFor("test-syn");
            List<TopicPartition> topicPartitionList = new ArrayList<>();
            if(partitionInfoList != null){
                for(PartitionInfo partitionInfo : partitionInfoList){
                    topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
                consumer.assign(topicPartitionList);
            }
            Map map = consumer.metrics();
            System.out.println(map);
            for(;;){
                // the poll argument is the fetch timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for(ConsumerRecord<String, String> consumerRecord : records){
                    System.out.println("partition:" + consumerRecord.partition());
                    System.out.println("offset:" + consumerRecord.offset());
                    System.out.println("key:" + consumerRecord.key());
                    System.out.println("value:" + consumerRecord.value());
                }
                consumer.commitSync();
            }
        }
    }
}
Notes:
- consumer.partitionsFor("topic") returns the partition information for a given topic.
- Once you know which partitions you want to consume, call assign() to manually assign them to the consumer.