Kafka 真是一個異常穩定的組件,服務器上我們部署了 kafka_2.11-1.0.1 版本的 kafka 除了幾次計算時間太長觸發了 rebalance 以外,基本沒有處理過什么奇怪的問題。
但是還是感覺 Kafka 的配置非常全面非常多,也非常容易把人搞懵逼。有時候看官方文檔也就是一句話,經常搞得人不明所以。所以想仔細看看 然后總結一下。
讀取配置 server.properties 查看 「Broker」相關配置
1. 設置這個 broker 節點的 id 值
broker.id=0
2. 可以配置可以訪問 kafka 的對應端口號和 hostname
port=9092 host.name=10.171.97.1
這里需要注意,能訪問 kafka 還支持其他類型的監聽器,比如這種方式
這里的 PLAINTEXT 表示明文傳輸
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092
3. 配置 kafka broker 的集群存儲信息,這里有個 log basics 的區域。這一塊應該都等待你的配置不會有默認值的,
b# A comma seperated list of directories under which to store log files log.dirs=/opt/kafka/kafka_2.11-1.0.1/logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=10
這里第一個參數是值 kafka 生產數據的日志放在哪個路徑。
第二個參數就是創建 topic 的時候默認生成幾個 partitions.
log.dirs 這個 s 就代表着他可以被設置多個路徑存儲日志,這一點非常關鍵和重要。我們可以在這個參數位置設置上多個路徑來提升吞吐量。比如
log.dirs=/opt/kafka/kafka_2.11-1.0.1/logs,/opt/kafka/kafka_2.11-1.0.1/logs1,/opt/kafka/kafka_2.11-1.0.1/logs2
這種配置方法似乎在 1.1版本之后還引入了故障轉移機制(failover) 但是我沒有嘗試過,在這里記錄一下之后有時間仔細調研一下。
4. 下面是跟 zookeeper 通訊需要用到的配置。
############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=10.171.97.1:2181,10.163.13.219:2181,10.170.249.122:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
zookeeper.connect 是配置連接 zookeeper 的地址,因為 zookeeper 通常是集群,所以同樣可以使用逗號分隔配置多個地址和端口。
zookeeper.connection.timeout.ms 是配置連接 zk 的超時時間
這里注意,所有 ip:port 都可以被更換為 hostname:port 這沒什么問題。也有很多人推薦使用 host:port 這種寫法,我覺得不寫錯應該都沒啥問題- -
5. topic 相關的部分
delete.topic.enable=true auto.create.topics.enable=false unclean.leader.election.enable=false auto.leader.rebalance.enable=true
5.1. 參數是是否允許可以刪除對應的 topic 。這個默認是 False 可以設置打開。因為有時候我們是真的需要刪除這個 topic !!!
5.2. 自動創建 topics 開啟。這個玩意我建議不開,開啟就意味着隨便來個客戶端接上一個沒有的 topic 也會被自動創建上,看上去很方便。如果有不那么熟悉的隊友,你就知道這個還是關掉吧。
5.3. 允許除開 leader 副本之外的副本成為 leader。成為 leader 副本的條件就是這個副本是跟上最新的副本的而且 lag 少。但是當他們都掛了之后,設置了這個參數就運行沒有跟上的副本成為 leader 副本,不可避免的會產生一些數據丟失。
5.4. 開啟這個參數會定期進行 leader 的重新選舉。如果我們出現了機器的崩潰可能會造成我們的 副本 leader 進行偏移形成 leader 傾斜。比如我們本來是 0 1 2 三台機器上均衡的分布着各副本的 leader 突然 2 掛了 那么 2 是 leader 那一部分就會向 0 和 1 節點的 isr 副本進行轉移。這樣當 2重新活過來之后 他上面就沒 leader 了。如果我們不再均衡一下 leader 他就一直維持這樣了。
因為 kafka 是只有 leader 會對外進行服務的,這樣壓力就偏移了。開啟這個參數我們將可以又重新平衡 leader ,讓其恢復到初始狀態。
6. 日志數據保留相關的參數
############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000
message.max.bytes
6.1. log.retention.hours 設置一個日志保留時間,這個除了 hours 可選還有[minutes|ms] 可選比如 log.retention.ms ,感覺這個小時就差不多了吧 默認 168個小時 7天。
6.2. log.retention.bytes 我感覺設置了時間沒必要控制大小,這個默認是一個分區一個 G 數據,超過了會滾動刪除。如果我有10個 partitions ,就一共可以存儲 10G 數據。
下面圖片是一個 partition 里的情況。
6.3. log.segment.bytes 這個是設置消息片最大片大小 設置 1g 1g應該夠用了也就是到達一 g 之后會切日志並且等待過期。
6.4. log.retention.check.interval.ms 這個是設置多久檢查一下是否符合以上配置的間隔時間。
6.5 message.max.bytes 消息默認是 1M 如果生產者嘗試發送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤消息。跟其他與字節相關的配置參數一樣,該參數指的是壓縮后的消息大小,也就是說,只要壓縮后的消息小於 mesage.max.bytes,那么消息的實際大小可以大於這個值這個值對性能有顯著的影響。
7. 客戶端生產者參數
7.1 max_in_flight_requests_per_connection 設置成 1 可以保證在有 retry 的情況下也能在一個 partitions 里面進行順序發送。
7.2 retries 重試次數,如果不設置默認為 0 ,設置了在默認 max_in_flight_requests_per_connection == 5 的情況下可能因為重試機制而在單個 partitions 里亂序。
7.3 delivery.timeout.ms kafka-python 里還有一個沒有支持的 KIP-91 參數,這個參數用於設置發送傳輸以及重試消息的最大時間。超過 retries 次數和超過這個時間都會跳過該消息。默認是 120s
Reference:
https://matt33.com/2017/09/10/produccer-end/ Kafka 源碼解析之 Producer 單 Partition 順序性實現及配置說明
https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance An analysis of the impact of max.in.flight.requests.per.connection and acks on Producer performance
https://www.cnblogs.com/huxi2b/p/6815797.html Kafka 0.11版本新功能介紹 —— 空消費組延時rebalance