一、同步的通信方式解決多個服務之間通信的問題,會存在性能和穩定性的問題
同步的方式存在的問題:系統的響應時間太長(商城系統為例:下訂單之后,要去調用創建訂單,減庫存,發優惠卷這些服務,服務之間的通信時間是比較長的,全部完成后才響應用戶訂單創建成功)
在同步的過程中要保證每個服務都順利執行完,整個鏈路才執行完,因為網絡等其他問題,整個鏈路成功執行完的成功率會受影響,導致用戶體驗較差。
異步的通信方式:將下訂單消息發給消息隊列,訂單直接創建成功,微服務去訂閱拉取消息,再去執行。
優勢:明顯提升系統的吞吐量
即使有服務失敗,也可以通過分布式事務解決方案來保證業務執行后的最終一致性
二、MQ的介紹
消息隊列的目的是為了通訊,屏蔽底層復雜的通訊協議,定義了一套應用層的、更加簡單的通信協議,這種協議遵循生產者/消費者模型。
消息隊列同時還有異步、解耦的作用。
消息隊列的流派:有broker的MQ:重topic:kafka、rocketMQ、activeMQ(整個broker,必須依據topic進行消息的中轉)
輕topic:rabbitMQ (topic只是它眾多中轉模式的一種)
無broker的MQ:zeroMQ(直接使用socket進行通信)
kafka是消息處理性能最快的MQ
三、kafka介紹:kafka系一個分布式、支持分區的、多副本的,基於zookeeper的分布式消息系統,它可以實時處理大量數據。
1.kafka使用場景:
日志收集
消息系統
用戶活動跟蹤
運營指標
2.kafka的安裝(需要安裝jdk配置好環境變量)
https://kafka.apache.org/downloads.html官網下一個
解壓之后,進入config目錄編輯server.properties,修改下日志存放位置,服務器端口號,zookeeper的連接地址
broker.id=0
log.dirs=/opt/kafka/klog/kafka-logs
listeners=PLAINTEXT://:9092
zookeeper.connect=121.43.37.22:2181
然后進入bin目錄下,./kafka-server-start.sh --daemon(守護台運行) ../config/server.properties
進入zookeeper目錄下,進入zk客戶端,ls / brokers/ids 看有沒有0這個節點,有就啟動成功咯
3.kafka中的基本概念
Broker:消息中間件處理節點,一個kafka節點就是一個Broker,一個或多個Broker可以構成一個kafka集群
Topic:邏輯上的概念,kafka根據topic對消息進行分類,發布到kafka集群的每條消息都必須指定一個Topic
Producer:消息生產者,向Broker發送消息的客戶端
Consumer:消息消費者,從Broker讀取消息的客戶端
ConsumerGroup:每個Consumer屬於一個特定的Consumer組,一條消息可以被多個不同的ConsumerGroup中的Consumer消費,但是一個ConSumerGroup中只有一個Consumer能夠消費該條消息
Partition:物理上的分區,一個topic可以分為多個partition,每個partion內部消息是有序的。
ReplicationFactor:副本就是分區的備份,放到集群的其他節點上,副本數一般設置和broker節點數量一致
4.創建主題topic
./kafka-topics.sh --create --zookeeper 121.43.37.22:2181 --replication-factor 1(1個副本) --partitions 1(1個分區) --topics mj666
查看當前kafka有哪些topic
./kafka-topics.sh --list --zookeeper 121.43.37.22:2181
5.發送消息給broker中的某個topic
kafka⾃帶了⼀個producer命令客戶端
./kafka-console-producer.sh --broker-list 121.43.37.22:9092 --topic mj666
6.消費消息
對於consumer,kafka同樣也攜帶了⼀個命令⾏客戶端
⽅式⼀:從最后⼀條消息的偏移量+1開始消費(就是從最新發送的消息開始消費) ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --topic mj666
⽅式⼆:從頭開始消費 ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --from-beginning --topic mj666
消息是會被順序存儲的,消息是有偏移量的,可以指明偏移量進行消費
進入到我們指定好的log目錄
cd 進入mj666分區目錄 , ll命令,會看到一個index目錄文件,和一個log消息文件,broker會將消息保存在本地的日志文件中,每個消費者消費消息的偏移量是通過內置的_consumer_offsets主題保存的,這個主題有0-49這50個分區
kafka采用了分段存儲,每一個log文件的大小默認是1GB,沒生成一個log文件就會對應產生一個index文件,是和log文件的命名相同的。這樣在進行消息檢索的時候可以快速利用二分的方法進行查找,定位到某一個分段文件中。
index文件中並沒有為每一條message建立索引。而是采用了稀疏存儲的方式,每隔一定字節的數據建立一條索引
這里具體的細節看下這位大哥寫的十分詳細:https://blog.csdn.net/shudaqi2010/article/details/90815675
7.單波消息和多播消息
單波消息:多個消費者在同一個消費組中,只有一個消費者可以收到訂閱的topic中的消息./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-999mj group.id=mjGroup --topic mj666
多播消息:不同的消費組訂閱同一個topic,不同的消費組中只有一個消費者能收到消息。換言之,有多個消費者收到了同一條消息。
./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-mj1 group.id=mjGroup --topic mj666
./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-mj2 group.id=mjGroup1 --topic mj666
8.查看消費組以及信息
查看當前主題下有哪些消費組./kafka-consumer-groups.sh --bootstrap-server 121.43.37.22:9092 --list
查看消費組中的具體信息:⽐如當前偏移量、最后⼀條消息的偏移量、堆積的消息數量 ./kafka-consumer-groups.sh --bootstrap-server 121.43.37.22:9092 --describe --group mjGroup
四、kafka主題和分區的概念
一個主題中的消息量非常大,可以通過設置分區來分布式存儲這些消息,如一個topic創建了3個分區,那么topic中的消息會分別存放在3個分區中。
- 分區存儲,解決了同一存儲文件過大的問題
- 提供了讀寫的吞吐量,讀和寫同時可以在多個分區中進行
主題創建多個分區 ./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic mj1234
查看topic的分區信息./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mj1234
- 每個消費者定期將⾃⼰消費分區的offset提交給kafka內部topic:__consumer_offsets,提交過去的時候,key是(consumerGroupId+topic+分區號),value就是當前offset的值,kafka會定期清理topic⾥的消息,最后就保留最新的那條數據
- 因為__consumer_offsets可能會接收⾼並發的請求,kafka默認給其分配50個分區(可以通過offsets.topic.num.partitions設置),這樣可以通過加機器的⽅式抗⼤並發。
- 通過如下公式可以選出consumer消費的offset要提交到__consumer_offsets的哪個分區公式:hash(consumerGroupId) % __consumer_offsets主題的分區數
五、kafka集群的搭建
1.搭建3個broker的集群,再准備一個server1.properties,server2.properties,復制server.properties,做如下更改
broker.id=1
listeners=PLAINTEXT://:9093(這一條千萬不要填ip地址,否則集群啟動不起來,只有一個能起來,其他的要報端口被9092占用)
log.dir=/usr/local/data/kafka-logs-2
(advertised_listeners
是對外暴露的服務端口,真正建立連接用的是 listeners) 雲服務器上要把這條配置打開,填上你的ip地址端口號,不然訪問不到。
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/usr/local/data/kafka-logs-3
啟動:./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
搭建完后通過查看zk中的/brokers/ids 看是否啟動成功,此時有這[0,1,3]2個節點
*(內存不足:請查看這位大哥的https://blog.csdn.net/xukaics/article/details/48543881)
編輯kafka-server-start.sh 成這樣就行了
2.副本的概念
副本是對分區的備份。在集群中,不同的副本會被部署在不同的broker上。下⾯例⼦:創建1個主題,2個分區、3個副本。
./kafka-topics.sh --create --zookeeper 121.43.37.22:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
查看主題信息:/kafka-topics.sh --describe --zookeeper 121.43.37.22:2181 --topic my-replicated-topic
replicas:當前副本存在的broker節點
isr:可以同步的broker節點和已同步的broker節點,存放在isr集合中,如果isr中的節點性能較差,會被踢出isr集合
leader:副本⾥的概念,消息發送⽅要把消息發給哪個broker?就看副本的leader是在哪個broker上⾯。副本⾥的leader專⻔⽤來接收消息。接收到消息,其他follower通過poll的⽅式來同步數據。
follower:leader處理所有針對這個partition的讀寫請求,⽽follower被動復制leader,不提供讀寫。如果leader所在的broker掛掉,那么就會進⾏新leader的選舉
- 集群中有多個borker,創建主題時可以指明主題有多個分區(拆分存儲),可以為分區創建多個副本,不同的副本放在不同的broker中。
- _consumer_offsets只有一個broker節點中有
3.集群消息的發送
./kafka-console-producer.sh --broker-list 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --topic my-replicated-topic
4.kafka集群消息的消費
./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --from-beginning --topic my-replicated-topic
指定消費組消費消息
./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --from-beginning --consumer-mj123 group.id=mj1234 --topic my-replicated-topic
5.關於分區消費組消費者的細節
- 每個broker中有多個partition,⼀個partition只能被⼀個消費組⾥的某⼀個消費者消費,是為了保證消費順序。Kafka只在partition的范圍內保證消息消費的局部順序性,當消息發給多個partition的話,消費者partition順序不能保證是發送 消息的順序,所以不能在同⼀個topic中的多個partition中保證總的消費順序性。
- 一個消費者掛了的話,kafka的rebalance機制會把這個消費者的分區消息拿給其他消費者消費。
- 消費組中消費者的數量不能⽐⼀個topic中的partition數量多,否則多出來的消費者消費不到消息。
六、kafka的java客戶端-⽣產者的實現
1.⽣產者的基本實現
-引⼊依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
-代碼
public class MySimpleProducer {
private final static String TOPIC_NAME="abc";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"121.43.37.22:9092,
121.43.37.22:9093,
121.43.37.22:9094
");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
//第一個參數為主題名字,第二個參數為分區號,第三個參數key在不傳入分區號的情況下通過算法(hash(key)%partitionnum)決定往哪個分區發消息,第4個參數為發送的消息
//final ProducerRecord<String,String> record = new ProducerRecord(TOPIC_NAME,"mykeyValue", "helloKafka");
final ProducerRecord<String,String> record = new ProducerRecord(TOPIC_NAME,0,"mykeyValue", "helloKafka");
//1.生產者同步發消息
// RecordMetadata recordMetadata = producer.send(record).get();
// System.out.println(recordMetadata.topic()+":"+recordMetadata.partition()+":"+recordMetadata.offset());
//2.生產者異步發消息
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
System.out.println("失敗"+e.getStackTrace());
}
if (recordMetadata!=null){
System.out.println("成功"+recordMetadata.topic()+recordMetadata.partition()+recordMetadata.offset());
}
}
}) ;
Thread.sleep(1000000000L);
}
}
2.生產者的同步發送消息:如果⽣產者發送消息沒有收到kafka的ack,⽣產者會阻塞,阻塞到3s的時間,如果還沒有收到消息,會進⾏重試。重試的次數3次。
⽣產者的異步發送消息:異步發送,⽣產者發送完消息后就可以執⾏之后的業務,broker在收到消息后異步調⽤⽣產者提供的callback回調⽅法。
例如上面的異步發送代碼,生產者發送完消息之后主線程直接關閉了,sleep之后才能看到回調的消息。
3.生產者相關配置
在同步發送的前提下,⽣產者在獲得集群返回的ack之前會⼀直阻塞。那么集群什么時候返回ack呢?
ack配置:ack = 0 kafka-cluster不需要任何的broker收到消息,就⽴即返回ack給⽣產者,最容易丟消息的,效率是最⾼的
ack=1(默認): 多副本之間的leader已經收到消息,並把消息寫⼊到本地的log中,才會返回ack給⽣產者,性能和安全性是最均衡的
ack=-1/all。⾥⾯有默認的配置min.insync.replicas=2(默認為1,推薦配置⼤於等於2),此時就需要leader和⼀個follower同步完后,才會返回ack給⽣產者(此時集群中有2個broker已完成數據的接收),這種⽅式最安全,但性能最差。
ack和重試(如果沒有收到ack,就開啟重試)的配置:
props.put(ProducerConfig.ACKS_CONFIG, "1");
發送失敗會重試,默認重試間隔100ms,重試能保證消息發送的可靠性,但是也可能造成消息重復發送,⽐如⽹絡抖動,所以需要在 接收者那邊做好消息接收的冪等性處理
props.put(ProducerConfig.RETRIES_CONFIG, 3);//重試次數設置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);//重試間隔設置
4.消息發送的緩沖區配置
kafka默認會創建⼀個消息緩沖區,⽤來存放要發送的消息,緩沖區是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
kafka本地線程會去緩沖區中⼀次拉16k的數據,發送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
如果線程拉不到16k的數據,間隔10ms也會將已拉到的數據發到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
七、kafka的java客戶端-消費者的實現
1.基本實現
public class MyConsumer {
private final static String TOPIC_NAME = "1885";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"121.43.37.22:9092,121.43.37.22:9093");
//添加消費組
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 創建⼀個消費者的客戶端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 消費者訂閱主題列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
}
2.消費者自動提交和手動提交offset
無論是自動還是手動,都需要把所屬的消費組+消費的主題+消費的分區和消費的偏移量提交到集群的_consumer_offsets主題里面
(1)自動提交:消費者poll消息下來以后自動提交offset,自動提交會丟失消息,消費者在消費前提交offset,可能提交完后還沒有消費就掛掉,下一個消費組中的消費者會從提交的offset開始消費,之前未被消費的消息就丟失了。
// 是否⾃動提交offset,默認就是true。 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// ⾃動提交offset的間隔時間 。 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
(2)手動提交: // 自動提交配置改為false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手動同步:消息消費完后,提交offset,會阻塞到offset提交成功后集群返回ACK。
//手動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }
//所有的消息已經消費完
if (records.count() > 0) {//有消息
// ⼿動同步提交offset,當前線程會阻塞直到offset提交成功
// ⼀般使⽤同步提交,因為提交之后⼀般也沒有什么邏輯代碼了
consumer.commitSync();
}
}
手動異步:消息消費完后提交,不會阻塞不需要等待集群ACK,直接執行之后的邏輯,可以設置一個回調方法,供集群調用
*⼀般使⽤同步提交,因為提交之后⼀般也沒有什么邏輯代碼了
//手動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }
//所有的消息已經消費完
if (records.count() > 0) {//有消息
// ⼿動同步提交offset,當前線程會阻塞直到offset提交成功
// ⼀般使⽤同步提交,因為提交之后⼀般也沒有什么邏輯代碼了
consumer.commitSync();
// ⼿動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理后⾯的程序邏輯
consumer.commitAsync(new OffsetCommitCallback()
{
@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)
{ if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
} } });}
}
} 3.⻓輪詢poll消息
默認情況下,消費者⼀次會poll500條消息。
//⼀次poll最⼤拉取消息的條數,可以根據消費速度的快慢來設置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代碼中設置了⻓輪詢的時間是1000毫秒
while (true) { /* * poll() API 是拉取消息的⻓輪詢 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }
*那么:(1)如果⼀次poll到500條,就直接執⾏for循環
如果這⼀次沒有poll到500條。且時間在1秒內,那么⻓輪詢繼續poll,要么到500條,要么到1s。如果多次poll都沒達到500條,且1秒時間到了,那么直接執⾏for循環
(2)如果兩次poll的間隔超過30s,集群會認為該消費者的消費能⼒過弱,該消費者被踢出消費組,觸發rebalance機制,rebalance機制會造成性能開銷。
可以通過設置這個參數,讓⼀次poll的消息條數少⼀點
⼀次poll最⼤拉取消息的條數,可以根據消費速度的快慢來設置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
如果兩次poll的時間如果超出了30s的時間間隔,kafka會認為其消費能⼒過弱,將其踢出消費組。將分區分配給其他消費者。-rebalance props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
4.消費者的健康狀態檢查
消費者每隔1s向kafka集群發送⼼跳,集群發現如果有超過10s沒有續約的消費者,將被踢出消費組,觸發該消費組的rebalance機制,將該分區交給消費組⾥的其他消費者進⾏消費。
//consumer給broker發送⼼跳的間隔時間 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超過10秒沒有收到消費者的⼼跳,則會把消費者踢出消費組,進⾏rebalance,把分區分配給其他消費者。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
5.指定分區和偏移量、時間消費
指定分區消費:consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
從頭消費: consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
指定offset消費:consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
指定時間消費:根據時間,去所有的partition中確定該時間對應的offset,然后去所有的partition中找到該offset之后的消息開始消費
public class TimestampConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", ":9092,:9092,:9092");
props.put("group.id", "dev3-yangyunhe-topic001-group001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "lj";
try {
// 獲取topic的partition信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
long nowTime = now.getTime();
System.out.println("當前時間: " + df.format(now));
long fetchDataTime = nowTime - 1000 * 60 * 60; // 計算30分鍾之前的時間戳
for(PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
}
consumer.assign(topicPartitions);
// 獲取每個partition一個小時之前的偏移量
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
System.out.println("開始設置各分區初始偏移量...");
for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
// 如果設置的查詢偏移量的時間點大於最大的索引記錄時間,那么value就為空
offsetTimestamp = entry.getValue();
if(offsetTimestamp != null) {
int partition = entry.getKey().partition();
long timestamp = offsetTimestamp.timestamp();
long offset = offsetTimestamp.offset();
System.out.println("partition = " + partition +
", time = " + df.format(new Date(timestamp))+
", offset = " + offset);
// 設置讀取消息的偏移量
consumer.seek(entry.getKey(), offset);
}
}
System.out.println("設置各分區初始偏移量結束...");
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
6.新消費組的消費offset規則
新消費組中的消費者在啟動以后,默認會從當前分區的最后⼀條消息的offset+1開始消費(消費新消息)。可以通過以下的設置,讓新的消費者第⼀次從頭開始消費。之后開始消費新消息(最后消費的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
八、springboot項目整合kafka
1.引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.編寫配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 47.108.203.233:9092,47.108.203.233:9095
#獨立的生產者和消費者只需要各自加各自的配置即可
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
ack-mode: manual_immediate
# ⼿動調⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤這種
#redis:
#host: 121.43.37.22
#port: 6379
3.創建生產者controller
@RestController
@RequestMapping("/msg")public class MyKafkaController {
private final static String TOPIC_NAME = "lj";
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
System.out.println("發送成功");
return "send success!";
}}
4.創建消費者
@Component
public class MyConsumer {
/* @KafkaListener(topics = "lj", groupId = "MyGroup778")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿動提交offset
ack.acknowledge();
}*/
//通過注解指定消費多個主題,多個分區,並指定消費的偏移量,指定消費者總數
@KafkaListener(groupId = "99",
topicPartitions = {
@TopicPartition(topic = "lj", partitions = {"0", "1"}),
@TopicPartition(topic = "lb", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "30")) },concurrency = "3")
//concurrency就是同組下的消費者個數,就是並發消費數,建議⼩於等於分區總數
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿動提交offset(這里配置的是每一條記錄,即處理一條消息馬上提交)
ack.acknowledge();
}
}
九、kafka集群中的controller、rebalance、HW
1.controller
每個broker啟動的時候會在zk創建一個臨時序號節點,獲得最小序號的那個broker就為成為kafka集群controller。
- 當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。選舉規則:isr集合中最左邊的節點(性能最高的)
- 當檢測到某個分區的ISR集合發⽣變化時(broker的數量發生變化),由控制器負責通知所有broker更新其元數據信息。
- 當使⽤kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責讓新分區被其他節點感知到。
2.rebalance機制
前提:消費者沒有指明分區消費。
觸發: 當消費組⾥消費者和分區的關系發⽣變化,那么就會觸發rebalance機制,重新調整消費者消費哪個分區
在觸發rebalance機制之前,消費者消費哪個分區有三種策略:
range:通過公式來計算某個消費者消費哪個分區 (第一個消費者:sum(分區總數)/n(消費者數量)+1 (sum/n有余數時) 其他消費者:sum/n)
輪詢: 輪流去分分區來消費
sticky:在觸發了rebalance后,在消費者消費的原分區不變的基礎上進⾏調整。(若是沒有開啟粘合這種策略的話,觸發rebalance之后,則會切掉之前所有的關系,重新按照range或者輪詢分配)
3.HW和LEO
LEO是某個副本(副本的作用是高可用)最后消息的消息位置(log-end-offset)
HW是已完成同步的位置(一個partition對應的isr中最小的LEO),consumer最多只能消費到HW所在的位置。
每個replica都有HW,leader和follower各⾃負責更新⾃⼰的HW的狀態。對於leader新寫⼊的消息,consumer不能⽴刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。
這樣的⽬的是防⽌leader所在的broker掛掉造成消息的丟失,該消息仍可以從新選舉的leader中獲取。
⼗、Kafka中的優化問題
1.如何防⽌消息丟失
發送⽅: ack是1 或者-1/all 可以防⽌消息丟失,ack設成all,把min.insync.replicas設置>=2
消費⽅:把⾃動提交改為⼿動提交。
2.如何防⽌重復消費
在防⽌消息丟失的⽅案中,如果⽣產者發送完消息后,因為⽹絡抖動,沒有收到ack,但實際上broker已經收到了。此時⽣產者會進⾏重試,於是broker就會收到多條相同的消息,⽽造成消費者的重復消費。
⽣產者關閉重試:會造成丟消息(不建議)
消費者解決⾮冪等性消費問題:所謂的冪等性:多次訪問的結果是⼀樣的。(對於rest的請求(get(冪等)、post(⾮冪等)、put(冪等)、delete(冪等))
解決⽅案:(1)mysql插⼊業務id(unique),所以⼀次只能插⼊⼀條.(創建表之后ALTER TABLE `表名` ADD unique(`serverid`))
(2)使⽤redis或zk的分布式鎖,以業務id為鎖。保證只有⼀條記錄能夠創建成功
3.如何做到消息的順序消費
⽣產者:保證消息按順序消費,且消息不丟失——使⽤同步的發送,ack設置成⾮0的值。等到發送成功再發送下⼀條。確保消息是順序發送的。
消費者:主題只能設置一個分區,消費組只能有一個消費者。
kafka順序消費犧牲掉了性能,使用場景不多,RocketMQ有實現的功能
4.解決消息積壓問題(消費者消費消息速度跟不上生產速度,導致kafka中有大量的數據沒有被消費,隨着沒有被消費的消息越來越多,消費者尋址的性能會越來越差,導致整個kafka集群對外提供服務的性能越來越差,造成服務雪崩)
解決方案:
⽅案⼀:在⼀個消費者中啟動多個線程,讓多個線程同時消費。——提升⼀個消費者的消費能⼒。
(還可通過業務的架構設計,提升業務層面的消費性能)
方案二:如果⽅案⼀還不夠的話,創建多個消費組,啟動多個消費者,多個消費者部署在不同的服務器上。其實多個消費者部署在同⼀服務器上也可以提⾼消費能⼒——充分利⽤服務器的cpu資源。
⽅案三:讓⼀個消費者去把收到的消息往另外⼀個topic上發,另⼀個topic設置多個分區和多個消費者 ,進⾏具體的業務消費。(不常用)
5.延遲隊列
1)應⽤場景
訂單創建后,超過30分鍾沒有⽀付,則需要取消訂單,這種場景可以通過延時隊列來實現
2)具體⽅案
- kafka中創建創建相應的主題(topic_5s、topic_30m)
- 消費者消費該主題的消息(輪詢)
- 消費者消費消息時判斷消息的創建時間和當前時間是否超過30分鍾(前提是訂單沒⽀付)
-如果是:去數據庫中修改訂單狀態為已取消
-如果不是:記錄當前消息的offset,並不再繼續消費之后的消息。
等待1分鍾后,再次向kafka拉取該offset及之后的消息,繼續進⾏判斷,以此反復
十一、Kafka-eagle監控平台(EFAK)
1)安裝jdk
2)官網下載:http://download.kafka-eagle.org/解壓
3)配置環境變量:vim /etc/profile 配置立即生效source /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
4)修改配置文件:vim system-config.properties
修改⾥⾯的zk的地址和mysql的地址
5)啟動:./ke.sh start
6)訪問 公網ip:8048/ke
輸入這個綠色的賬號密碼
就可以查看信息了