kafka 日常使用和數據副本模型的理解
在使用Kafka過程中,有時經常需要查看一些消費者的情況、Kafka健康狀況、臨時查看、同步一些數據,又由於Kafka只是用來做流式存儲,又沒有像Mysql或者Redis提供方便的查詢方法查看數據。只能通過在命令行執行Kafka 腳本方式操作kafka(當然也有一些第三方的kafka監控工具),這里就主要收集一些常用的Kafka命令。
在看到 kafka ISR 副本時,實在忍不住就多扯了一點背后的原理,將Kafka、Redis、ElasticSearch三者對比起來看各自的存儲模型,比如說Redis主要用來做緩存,那采用異步復制能夠減少Client的時延,Redis的P2P結構注定了它采用Gossip協議傳播集群狀態。另外,將Redis里面的基於Raft的選舉算法與ES里面的master選舉對比,也有助於理解分布式系統的選舉理論。當然了,各個原理介紹都淺嘗輒止,僅是自己的一些淺見,里面的每一點都值得仔細深究。
1 查看Kafka集群里面都有哪些消費者組 consumer group
./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
2 查看 每個consumer 對各個分區的消費情況
找到consumer group后,接下來可查看這個 consumer group的消費情況,比如:比如是否有消費延時(LAG)、一共有多少個consumer(OWNER)、還能看到這個consumer 所消費的TOPIC 各個 分區 的消費情況:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group GROUP_ID
3 查看Topic的同步情況
在Kafka中數據采用多個副本存儲。主副本接收生產者、消費者的請求,從副本(replica)只從主副本那里同步數據。
./bin/kafka-topics.sh --zookeeper ZK_IP:2181 --topic user_update_info_topic --describe
Topic:user_update_info_topic PartitionCount:6 ReplicationFactor:2 Configs:retention.ms=259200000
Topic: user_update_info_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: user_update_info_topic Partition: 1 Leader: 4 Replicas: 4,3 Isr: 4,3
Topic: user_update_info_topic Partition: 2 Leader: 0 Replicas: 0,4 Isr: 0,4
Topic: user_update_info_topic Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: user_update_info_topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: user_update_info_topic Partition: 5 Leader: 3 Replicas: 3,4 Isr: 3,4
在這里根據Leader、Replicas、Isr 能發現一個Kafka Broker可能存在的一些故障(這些數字是 server.properties配置項 broker.id)。比如某個broker.id沒有出現在Isr中,可能這台節點上的副本同步出現了問題。
這里介紹一個Kafka的ISR機制:
要保證數據的可靠性,數據會存儲多份,即多個副本。引入多個副本會帶來2個難題:一是各個副本之間的數據如何保證一致?二是對Client寫性能(寫操作)的影響?因為數據有多份時,Client寫入一條消息,什么時候給Client返回ACK成功確認呢?是將寫入的消息成功"同步"給了所有的副本才返回ACK,還是只要主副本"寫入"了就返回ACK?
注意:這里的"寫入"、"同步"都加了引號,是因為:它們只是一個抽象的描述。就拿"寫入"來說:是寫入內存即可、還是寫入到磁盤?這里牽涉到一系列的過程,可參考Redis persistent這篇文章。
對於Kafka而言,生產者發送消息時有一個ack參數(ack=0、ack=1、ack=all),默認情況下ack=1意味着:發送的消息寫入主副本后,返回ACK給生產者。ack=all 意味着:發送的消息寫入所有的ISR集合中的副本后返回ACK給生產者。
某個Topic 的 ISR集合中的副本和它所有的副本是有區別的:ISR集合中的副本是"最新的",這也說明:ISR集合中的副本是動態變化的,比如當發生網絡分區時,某個節點上的從副本與主副本斷開了連接,這些從副本就會從ISR集合中移除。
生產者發送一條消息給Kafka主副本並且收到了ACK返回確認 和 這條消息已提交是兩回事。返回ACK確認的過程上面已經分析了,消息已提交跟參數"min.insync.replicas"息息相關。默認情況下,min.insync.replicas=1,意味着只要主副本寫入了消息,就認為這條消息已經提交了。
下面舉個例子說明生產者ack參數和服務端min.insync.replicas參數分別所起的作用:假設三個副本、ack=1、min.insync.replicas=2,生產者發送的消息只要寫入了主副本就會返回ACK確認給生產者,但是這條消息要成功同步到2個副本之后才算是已提交。若此時ISR集合里面只有主副本(一個),意味着生產者雖然收到了寫入消息的ACK確認,但是這些消息都不能被消費者消費到,因為只有已提交的消息才能被消費者消費到。
min.insync.replicas可提高消息存儲的可靠性:如果ack=1、min.insync.replicas=1,生產者將消息寫入主副本,收到返回ACK確認,但主副本還未來得及將消息同步給其他副本時,主副本所在節點宕機了,那么:就會丟失已經返回了ACK確認的消息。(在《Kafka實戰》一書提到:ack=all時,min.insync.replicas才有意義,這一點需要讀源碼才能驗證了),看了下Kafka官方文檔對 min.insync.replicas 參數的解釋:
When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
根據官方文檔可知:ack=-1或者ack=all時,可將min.insync.replicas設置成 brokers 數量的 majority,從而保證:Producer收到寫入消息的ack時,就能保證該消息一定在大多數副本中寫成功了。
因此,ISR機制加上min.insync.replicas參數就能提高數據的可靠性。當ISR集合里面副本個數大於等於min.insync.replicas時,一切正常。為什么要用ISR機制呢?
ISR機制是基於同步復制和異步復制之間的一種中間狀態,Redis Cluster 采用了異步復制策略,其Redis Sentinel 官方文檔提到,Redis 里面無法保證已經返回給Client ACK確認的消息不丟失,並推薦了兩種補救方案:
- Use synchronous replication (and a proper consensus algorithm to run a replicated state machine).
- Use an eventually consistent system where different versions of the same object can be merged.
而ISR機制較好地兼容了同步復制帶來的寫性能消耗和異步復制導致的數據可靠性問題。ISR集合里面實時維護着一組副本,即同步副本,這組副本的數據與主副本數據是一致的,在Kafka中:同步副本是指:過去6s內向zookeeper發送過心跳或者過去10s內從首領副本那里同步過消息的副本。
其實,ElasticSearch的數據副本模型也是采用的ISR機制,ES中的primary shard負責接收index操作,然后將index的數據同步給各個replica,ES里面提供了參數:wait_for_actives_shards參數來保證當index操作寫入多少個replica后才返回ACK給Client。
Kafka和ElasticSearch的數據副本模型是類似的,只是在Kafka中,生產者寫入和消費者讀取都是由主副本完成,而ES中數據寫入primary shard,但是讀取可以讀各個replica,ES中讀操作存在 read unacknowledged 和 dirty reads。由於ES是個搜索引擎,文檔索引到ES后,用戶關注的問題是:什么時候能夠被搜索到?ES提供的是近實時搜索,因為文檔需要刷新成Segment才能被搜索,同時ES又提供了real time GET API,能實時獲取剛索引的文檔。
此外,ES與Kafka都有一個master節點(這里的master是針對節點而言,不是數據副本的主從),master節點負責集群狀態變更(待完善)。而Redis Cluster采用的是P2P結構,通過Gossip協議來傳播集群狀態,並沒有master節點這樣的角色。在ES中通過Hash文檔ID(murmur3)將文檔hash到各個primary shard,而在Redis Cluster中則是 CRC 再 mod 16384 進行數據分片,以固定的"槽數目"為中間橋梁,將鍵映射到Redis節點上,當動態增刪節點時只需遷移部分槽上的鍵。從數據分布這一點看,ES和Redis是類似的。哈希方式進行數據分布的優勢是:不需要存儲數據分布的元信息,hash結果均勻的話,也能保證較均勻的數據分布。
從集群(節點的角度)結構上看,ES和Kafka都有一個主節點(不管叫master,還是叫controller),而Redis是去中心化P2P結構,所以在傳播集群狀態結構時,ES采用的是兩階段提交協議,Kafka的我沒有去研究,而Redis采用的是Gossip協議。在討論"主從"時,要明確指出是基於節點、還是基於數據副本?在ES中,一個節點上既可存儲主副本(primary shard),也可存儲從副本(replica),ES的 shard allocation 策略是不會把同一個索引的主副本、從副本放在一個節點上,而是說:索引A的主副本在節點1上,索引B的從副本也在節點1上。而對於Redis集群而言,一個主節點存儲一部分鍵空間上的數據,這個主節點可以有若干個從節點,這些從節點從主節點上異步復制數據做副本備份。
ES的master節點(master-eligible)既可用來存儲數據,也可只專心作為master(在配置文件中將node.data配置為false)。因此,ES集群中的節點角色比Redis Cluster要多得多(master-eligible node、data node、coordinating only node)。
4 測試查看一條Kafka消息
由於Kafka的流式特征,在寫代碼的時候需要先知道一下消息的格式。用下面的命令就可以查看一條Kafka Topic上的消息。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC--from-beginning --max-messages 1
5 跨集群數據同步
有個時候,生產環境上的Kafka集群在本地開發環境下不能直接連接導致測試Debug不方便,可采用mirror-make將一部分生產環境上的數據同步到測試環境做測試,^
./bin/kafka-mirror-maker.sh --consumer.config ./config/mirror-maker-consumer.properties --producer.config ./config/mirror-maker-producer.properties --whitelist "TOPIC_NAME"
基本的mirror-maker-producer.properties配置參數如下:
bootstrap.servers=測試環境KafkaIP:9092
acks=1
linger.ms=100
batch.size=16384
retries=3
基本的mirror-maker-consumer.properties配置參數如下:
bootstrap.servers=生產環境上的kafka集群BOOT_STRAP_SERVERS
group.id=mirror-maker
auto.offset.reset=earliest
enable.auto.commit=true
auto.commit.interval.ms=1000
原文:https://www.cnblogs.com/hapjin/p/10785710.html