1、如何獲取topic主題的列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
2、生產者和消費者的命令行是什么?
生產者在主題上發布消息:
bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topic Hello-Kafka
注意這里的IP是server.properties中的listeners的配置。接下來每個新行就是輸入一條新消息。
消費者接受消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning
3、consumer是推還是拉?
Kafka最初考慮的問題是,customer應該從brokes拉取消息還是brokers將消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統共同的傳統的設計:producer將消息推送到broker,consumer從broker拉取消息。
一些消息系統比如Scribe和Apache Flume采用了push模式,將消息推送到下游的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對於不同消費速率的consumer就不太好處理了。消息系統都致力於讓consumer以最大的速率最快速的消費消息,但不幸的是,push模式下,當broker推送的速率遠大於consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統的pull模式。
Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數據。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率,將可能導致一次只推送較少的消息而造成浪費。Pull模式下,consumer就可以根據自己的消費能力去決定這些策略。
Pull有個缺點是,如果broker沒有可供消費的消息,將導致consumer不斷在循環中輪詢,直到新消息到t達。為了避免這點,Kafka有個參數可以讓consumer阻塞知道新消息到達(當然也可以阻塞知道消息的數量達到某個特定的量這樣就可以批量發送)。
4、講講kafka維護消費狀態跟蹤的方法
大部分消息系統在broker端的維護消息被消費的記錄:一個消息被分發到consumer后broker就馬上進行標記或者等待customer的通知后進行標記。這樣也可以在消息在消費后立馬就刪除以減少空間占用。
但是這樣會不會有什么問題呢?如果一條消息發送出去之后就立即被標記為消費過的,一旦consumer處理消息時失敗了(比如程序崩潰)消息就丟失了。為了解決這個問題,很多消息系統提供了另外一個個功能:當消息被發送出去之后僅僅被標記為已發送狀態,當接到consumer已經消費成功的通知后才標記為已被消費的狀態。這雖然解決了消息丟失的問題,但產生了新問題,首先如果consumer處理消息成功了但是向broker發送響應時失敗了,這條消息將被消費兩次。第二個問題時,broker必須維護每條消息的狀態,並且每次都要先鎖住消息然后更改狀態然后釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態數據,比如如果消息發送出去但沒有收到消費成功的通知,這條消息將一直處於被鎖定的狀態,
Kafka采用了不同的策略。Topic被分成了若干分區,每個分區在同一時間只被一個consumer消費。這意味着每個分區被消費的消息在日志中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每個分區消費狀態就很容易了,僅僅需要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另外一個好處:consumer可以把offset調成一個較老的值,去重新消費老的消息。這對傳統的消息系統來說看起來有些不可思議,但確實是非常有用的,誰規定了一條消息只能被消費一次呢?
5、講一下主從同步
https://blog.csdn.net/honglei915/article/details/37565289
6、為什么需要消息系統,mysql不能滿足需求嗎?
1.解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3.擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4.靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5.可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6.順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7.緩沖:
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8.異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
7、Zookeeper對於Kafka的作用是什么?
Zookeeper是一個開放源碼的、高性能的協調服務,它用於Kafka的分布式應用。
Zookeeper主要用於在集群中不同節點之間進行通信
在Kafka中,它被用於提交偏移量,因此如果節點在任何情況下都失敗了,它都可以從之前提交的偏移量中獲取
除此之外,它還執行其他活動,如: leader檢測、分布式同步、配置管理、識別新節點何時離開或連接、集群、節點實時狀態等等。
8、數據傳輸的事務定義有哪三種?
和MQTT的事務定義一樣都是3種。
(1)最多一次: 消息不會被重復發送,最多被傳輸一次,但也有可能一次不傳輸
(2)最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重復傳輸.
(3)精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的
9、Kafka判斷一個節點是否還活着有那兩個條件?
(1)節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接
(2)如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久
10、Kafka 與傳統MQ消息系統之間有三個關鍵區別
(1).Kafka 持久化日志,這些日志可以被重復讀取和無限期保留
(2).Kafka 是一個分布式系統:它以集群的方式運行,可以靈活伸縮,在內部通過復制數據提升容錯能力和高可用性
(3).Kafka 支持實時的流式處理
11、講一講kafka的ack的三種機制
request.required.acks有三個值 0 1 -1(all)
0:生產者不會等待broker的ack,這個延遲最低但是存儲的保證最弱當server掛掉的時候就會丟數據。
1:服務端會等待ack值 leader副本確認接收到消息后發送ack但是如果leader掛掉后他不確保是否復制完成新leader也會導致數據丟失。
-1(all):服務端會等所有的follower的副本受到數據后才會受到leader發出的ack,這樣數據不會丟失
12、消費者如何不自動提交偏移量,由應用提交?
將auto.commit.offset設為false,然后在處理一批消息后commitSync() 或者異步提交commitAsync()
即:
ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
。。。
tyr{
consumer.commitSync()
}
。。。
}
13、消費者故障,出現活鎖問題如何解決?
出現“活鎖”的情況,是它持續的發送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分區,我們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,如果你調用的poll的頻率大於最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區。 發生這種情況時,你會看到offset提交失敗(調用commitSync()引發的CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續調用poll。
消費者提供兩個配置設置來控制poll循環:
max.poll.interval.ms:增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,通常返回的消息都是一批)。缺點是此值越大將會延遲組重新平衡。
max.poll.records:此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔,減少重新平衡分組的
對於消息處理時間不可預測地的情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續調用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,並只有在線程完成處理后才為記錄手動提交偏移量(取決於你)。 還要注意,你需要pause暫停分區,不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創建新線程將導致你機器內存溢出)。
14、如何控制消費的位置
kafka使用seek(TopicPartition, long)指定新的消費位置。用於查找服務器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))
15、kafka分布式(不是單機)的情況下,如何保證消息的順序消費?
Kafka分布式的單位是partition,同一個partition用一個write ahead log組織,所以可以保證FIFO的順序。不同partition之間不能保證順序。但是絕大多數用戶都可以通過message key來定義,因為同一個key的message可以保證只發送到同一個partition。
Kafka 中發送1條消息的時候,可以指定(topic, partition, key) 3個參數。partiton 和 key 是可選的。如果你指定了 partition,那就是所有消息發往同1個 partition,就是有序的。並且在消費端,Kafka 保證,1個 partition 只能被1個 consumer 消費。或者你指定 key(比如 order id),具有同1個 key 的所有消息,會發往同1個 partition。
16、kafka的高可用機制是什么?
這個問題比較系統,回答出kafka的系統特點,leader和follower的關系,消息讀寫的順序即可。
https://www.cnblogs.com/qingyunzong/p/9004703.html
https://www.tuicool.com/articles/BNRza2E
https://yq.aliyun.com/articles/64703
17、kafka如何減少數據丟失
https://www.cnblogs.com/huxi2b/p/6056364.html
18、kafka如何不消費重復數據?比如扣款,我們不能重復的扣。
其實還是得結合業務來思考,我這里給幾個思路:
比如你拿個數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update 一下好吧。
比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據這個 id 去比如 Redis 里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
比如基於數據庫的唯一鍵來保證重復數據不會重復插入多條。因為有唯一鍵約束了,重復數據插入只會報錯,不會導致數據庫中出現臟數據。
