kafka系列文章之python-api的使用。
在使用kafka-python時候需要注意,一定要版本兼容,否則在使用生產者會報 無法更新元數據的錯誤。
在本片測試中java版本為如下,kafka版本為0.10.0,kafka-python版本為1.3.1,目前最新的版本為1.4.4
[root@test2 bin]# java -version java version "1.7.0_79" Java(TM) SE Runtime Environment (build 1.7.0_79-b15) Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
從官網下載kafka-python,源碼安裝即可!https://pypi.org/project/kafka-python/1.3.1/
安裝完成之后一個簡易的測試:
#一個簡單的生成者
>>>: from kafka import KafkaProducer >>>: producer = KafkaProducer(bootstrap_servers=["10.0.102.204:9092"]) >>>: producer.send("science",b"Hello world") <kafka.producer.future.FutureRecordMetadata object at 0x0000000003B28080>
#我們向science主題發送了一個“Hello world”消息。可以在控制台使用消費者查看如下
[root@test3 bin]# ./kafka-console-consumer.sh --zookeeper=10.0.102.204:2181 --topic science --from-beginning
Hello world
上面只是一個簡單的實例,主要用來驗證當前的python api是否可以使用;下面會詳細說明python-kafka的使用。
kafka生成者
一個應用程序在很多情況下需要往kafka寫入消息:記錄用戶的活動(用於審計和分析),記錄度量指標,保存日志消息,與其他應用程序進行異步通信,緩沖即將寫入到數據庫的數據,等等。
盡管生產者API使用起來很簡單,但消息的發送過程還是比較復雜,如下圖(摘自kafka權威指南)
首先從創建一個ProducerRecord對象開始,ProducerRecord對象需要包含目標主題和要發送的內容。我們還可以指定鍵和分區。在發送ProducerRecord對象時,生產者首先要把鍵和值對象進行序列化,這樣他們才能在網絡上傳輸。python3.x中需要序列化為bytes類型,才能傳輸。
然后,數據被傳給分區器。如果之前在ProducerRecord對象里指定了分區,那么分區器就不會再做任何事情,直接把指定的分區返回。如果沒有指定分區,那么分區器會根據ProducerRecord對象來選擇一個分區。選好分區之后,生成者就指知道該往哪個主題和分區發送這條記錄。緊接着,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的broker上。
服務器在收到消息之后會返回一個響應。如果消息成功寫入kafka,就返回一個RecodMetaDate對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。
要往kafka寫入消息,首先要創建一個生產者對象,並設置一些屬性。下面介紹一些kafka的屬性。
bootstrap.servers:該屬性指定broker的地址清單,地址格式為host:port。清單里不需要包含所有的broker地址,生產者會從給定的broker中查找到其他的broker的信息。不過建議
至少要提供兩個broker信息,一旦其中一個宕機,生產者仍然能連接到集群上。 key_serializer (callable) – used to convert user-supplied keys to bytes If not None, called as f(key), should return bytes. Default: None. value_serializer (callable) – used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None. 上面的兩個數值指定了鍵和值怎么序列化。默認是none。
在實例化了一個生成者對象之后,實例有一個config屬性,返回的是當前生產者的默認配置如下: producer.config {'bootstrap_servers': ['10.0.102.204:9092'], 'client_id': 'kafka-python-producer-1', 'key_serializer': None, 'value_serializer': None, 'acks': 1, 'compression_type': None, 'retries': 0, 'batch_size': 16384, 'linger_ms': 0, 'partitioner': <kafka.partitioner.default.DefaultPartitioner object at 0x0000000003A037B8>, 'buffer_memory': 33554432, 'connections_max_idle_ms': 600000, 'max_block_ms': 60000, 'max_request_size': 1048576, 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100, 'request_timeout_ms': 30000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'selector': <class 'selectors.SelectSelector'>, 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None} data = producer.config type(data) <class 'dict'> data是一個字典對象,然后就可以使用修改字典的方法修改對應的屬性。 從上面可以看到生成者有很多可以配置的參數,他們大部分的參數都有合理的默認值,所有沒有必要修改它們。不過有幾個參數在內存中使用,性能和可靠性方面對生產者影響比較大,下面會介紹這些參數。 acks,這個參數指定了必須有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。 如果acks=0;生成者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現了問題,導致服務器沒有接收到消息,那么生產者是不知道的,消息也就是丟失了。
不過,因為生成者不需要等待服務器響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。 如果acks=1:只要集群的首領節點收到消息,生成者會收到一個來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到
一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是異步
發送。如果讓客戶端等待服務器的響應(通過調用future對象的get()方法),顯然會增加延遲。如果客戶端使用回調,延遲問題可以得到緩解,不過吞吐量還是會受發送中
消息數量的限制。 如果acks=all,只有當所有參與復制的節點全部收到消息時,生成者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器
發生崩潰,整個集群仍然可以運作。不過,它的延遲比acks=1時更高,因為我們要等待不只一個服務器節點接收消息。 buffer.memory: 該參數用來設置生成者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。
這個時候send()方法調用要么被阻塞,要么拋出異常,取決於如何設置block.on.buffer.full參數(0.9.0.0之后的版本中被替換為max.block.ms,表示在拋出異常之
前可以阻塞一段時間) compression.type: 默認情況下,消息發送不會被壓縮。該參數可以設置為snappy, gzip或lz4,它指定了消息被發送給broker之前使用哪一種壓縮算法進行壓縮。snappy壓縮,
占用較少的CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip壓縮算法一般會占用較多的CPU,但會提供更高的壓縮比,
所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷。 retries:生成者從服務器收到的錯誤有可能是臨時性的錯誤。在這種情況下,retries參數的值決定了生成者可以重發消息的次數,如果達到這個次數,生產者會放棄重試並返回錯誤。
默認情況下,生產者會在每次重試之間等待100ms,不過可以通過retry.backoff.ms參數來個改變這個時間間隔。 batch.size:當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,安裝字節數計算。當批次被填滿時,批次里所
有的消息都會被發送出去。不過生產者並不一定都會等到批次填滿才發送,半滿的批次,甚至只包含一個消息的批次也有可能被發送。所以就算把批次帶下設置得很大,也不會
造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發送消息,會增加一些額外的開銷。 linger.ms :該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。kafka生產者會在批次填滿或者linger.ms達到上限時把批次發送出去。默認情況下,只要有可用的線程,生產者就會把消息發送出去,就算批次里只有一個消息。把linger.ms設置成比0大的數,讓生產者在發送批次之前等待一會兒,使更多的消息加入到這個批次。雖然會增加延遲,但是會提升吞吐量(因為一次發送更多的消息,每個消息的開銷變小了) client.id: 該參數可以是任意字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里。 max.in.flight.requests.per.connection: 該參數指定了生產者在收到服務器響應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設置
為1可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。 timeout.ms: 指定了broker等待同步副本返回消息確認的時間,與acks的配置相匹配----如果在指定時間內沒有收到同步副本的確認,那么broker就會返回一個錯誤。 request.timeout.ms: 指定了生產者在發送數據時等待服務器返回響應的時間。 metadata.fetch.timeout.ms:指定了生產者在獲取元數據時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發送數據,要么返回一個錯誤。 max.block.ms: 該參數指定了在調用send()方法或使用partitionFor()方法獲取元數據時生產者阻塞時間,當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞,
在阻塞時間達到max.block.ms,生產者會拋出異常。 max.request.size: 該參數用於控制生成者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為1MB,那么可以發送的
單個最大消息為1MB,或者生產者可以在單個請求里發送一個批次,該批次包含了1000個消息,每個消息大小為1KB。另外,broker對可接收的消息最大值也有自己的限制,
所以兩邊的配置最好可以匹配,避免生產者發送的消息被broker拒絕。 receive.buffer.bytes和send.buffer.bytes:這兩個參數分別指定了TCP socket接收和發送數據包的緩沖區大小。如果它們被設置為-1.就是用操作系統的默認值。如果生產者或
消費者與broker處於不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
kafka消費者
應用程序從kafkaconsumer向kafka訂閱主題,並從訂閱的主題上接收消息。從kafka讀取數據不同於從其他消息系統讀取數據,它涉及一些概念,需要先理解一下!
消費者和消費者群組
設想一種情況:應用程序從kafka訂閱主題,讀取消息,但是我們知道生產者在向主題寫入消息時,可以是多個生產者並發寫入的,這時候生產者向主題寫入消息的速度超過了應用程序驗證數據的速度,這個時候該怎么處理?如果只使用單個消費者處理消息,應用程序會遠遠跟不上消息的生成速度。顯然,此時很有必要對消費者進行橫向伸縮,就像多個生產者向相同主題寫入消息一樣,我們也可以使用多個消費者從同一個逐日讀取消息,對消息進行分流。
kafka消費者從屬於消費者群組。一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。如果往群組里添加更多的消費者,超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收到任何消息。
往群組里添加消費者是橫向伸縮消費能力的主要方式。kafka消費者經常會做一些高延遲的操作,比如把數據寫到數據庫或HDFS,或者使用數據進行比較耗時的計算。在這些情況下,單個消費者無法跟上數據生成的速度,所以可以增加更多的消費者,讓他們分擔負載,每個消費者只處理部分分區的消息,這就是橫向伸縮的主要手段。我們有必要為主題創建大量的分區,在負載增長時可以加入更多的消費者。
kafka設計的主要目標之一,就是要讓kafka主題里的數據能夠滿足企業各種應用場景的需求。在這些場景中,每個應用程序可以獲取到所有的消息,而不只是其中的一部分。只要保證每個應用程序有自己的消費者群組,就可以讓他們獲取到主題的所有的消息。不同於傳統的消息系統,橫向伸縮kafka消費者和消費者群組並不會對性能造成負面影響。【每個消費者群組得道的是所有的消息,而不是部分的消息】
消費者群組與分區再均衡
主要說一些概念性的東西
一個新的悄費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開群組,原本由它讀取的分區將由群組里的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分區,會發生分區重分配。
分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除梢費者),不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。另外,當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。我們將在本章討論如何進行安全的再均衡,以及如何避免不必要的再均衡。
消費者通過向被指派為群組協調器的broker(不同的群組可以有不同的協調器)發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權關系。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區里的消息。消費者會在輪詢消息(為了獲取消息)或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。如果一個消費者發生崩憤,並停止讀取消息,群組協調器會等待幾秒鍾,確認它死亡了才會觸發再均衡。在這幾秒鍾時間里,死掉的消費者不會讀取分區里的消息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,盡量降低處理停頓。
在0. 10.1 版本里, Kafka 社區引入了一個獨立的心跳線程,可以在輪均消息的空檔發送心跳。這樣一來,發送心跳的頻率(也就是消費者群紐用於檢測發生崩潰的消費者或不再發送心跳的消費者的時間)與消息輪詢的頻率(由處理消息所花費的時間未確定)之間就是相互獨立的。在新版本的Kafka 里,可以指定消費者在離開群紐並觸發再均衡之前可以有多長時間不進行消息輪詢,這樣可以避免出現活鎖(livel ock ) ,比如有時候應用程序並沒有崩潰,只是由於某些原因導致無法正常運行。這個配直與
session.timeout.ms 是相互獨立的,后者用於控制檢測消費者發生崩潰的時間和停止發送心跳的時間。
當消費者要加入群組時,它會向群組協調器發送一個JoinGroup 請求。第一個加入群組的消費者將成為“群主”。群主從協調器那里獲得群組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),並負責給每一個悄費者分配分區。它使用一個實現了partitionAssignor接口的類來決定哪些分區應該被分配給哪個消費者。
上面介紹了消費者和消費者群組的一些理論性東西,下面來簡單創建一個消費者!
from kafka import KafkaConsumer consumer = KafkaConsumer("science", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='earliest') for i in consumer: print(i)
#這里KafkaConsumer接收了三個參數,主題,borker的服務器地址和端口,以及消費者從哪里開始讀取消息
#上面的代碼若是consumer有數據則會打印出數據,若是主題中沒有消息,則會一致阻塞等待
上面print打印出的一個數值如下:
ConsumerRecord(topic='science', partition=0, offset=0, timestamp=1545358023897, timestamp_type=0, key=None, value=b'test message', checksum=1777691423, serialized_key_size=-1, serialized_value_size=12)
#返回的是一個consumer對象,包含了一些元數據信息。
消息輪詢是消費者API的核心,通過一個簡單的輪詢向服務器請求數據。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調,分區再均衡,發送心跳和獲取數據,開發者只需要使用一組簡單的API來處理從分區返回的數據。上面的代碼是一個簡單的利用for循環的輪詢。
消費者的參數配置
1. fetch.min.bytes:該屬性指定了消費者從服務器獲取記錄的最小字節數。broker在收到消費者的數據請求時,如果可用的數據量小於fetch.min.bytes 指定的大小,那么它會等到
有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和broker的工作負載,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息。如果沒有
很多可用數據,但消費者的CPU 使用率卻很高,那么就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值設置得大一點可以降低broker 的工作負載。 2. fetch.max.wait.ms:我們通過fetch.min.bytes告訴Kafka ,等到有足夠的數據時才把它返回給消費者。而fetch.max.wait.ms 則用於指定broker 的等待時間,默認是500ms
,如果沒有足夠的數據流入Kafka ,消費者獲取最小數據量的要求就得不到滿足,最終導致500ms 的延遲。如果要降低潛在的延遲(為了滿足SLA ),可以把該參數值設置得小一些。
如果fetch.max.wait.ms被設為lOOms ,並且fetch.min.bytes被設為1MB ,那么Kafka 在收到消費者的請求后,要么返回1MB 數據,要么在1OOms 后返回所有可用的數據,
就看哪個條件先得到滿足。 3. max.partition.fetch.bytes:該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是1MB,也就是說kafkaconsumer.poll() 方法從每個分區里返回的記錄最
多不超過max.partition.fetch.bytes指定的字節。如果一個主題有20 個分區和5 個消費者,那么每個消費者需要至少4MB 的可用內存來接收記錄。在為消費者分配內存時,
可以給它們多分配一些,因為如果群組里有消費者發生崩憤,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes的值必須比broker 能夠接收的最大消息的字節
數(通過max.message.size屬性配置)大, 否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另一個需要考慮的因素是消費者處理數據的時間。
消費者需要頻繁調用poll()方法來避免會話過期和發生分區再均衡,如果單次調用poll()返回的數據太多,消費者需要更多的時間來處理,可能無怯及時進行下一個輪詢來避免會
話過期。如果出現這種情況, 可以把max.partition.fetch.bytes值改小,或者延長會i舌過期時間。 4. session.timeout.ms:該屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是3s 。如果消費者沒有在session.timeout.ms 指定的時間內發送心跳給群組協調
器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區分配給群組里的其他消費者。該屬性與heartbeat.interval.ms緊密相關。heartbeat.interval.ms指定了poll()
方法向協調器發送心跳的頻率, session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms必須比
session.timeout.ms 小, 一般是session.timeout.ms 的三分之一。如果session.timeout.ms 是3s ,那么heartbeat.interval.ms應該是1s 。
把session.timeout.ms 值設得比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設 置得大一些,可以減少意外的再均衡,不過檢測節點崩憤-需要更長的時間。 5.auto.offset.reset: 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被刪除)該作何處理。它的默認
值是latest , 意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。另一個值是earlist,意思是說,在偏移量無效的情況下,
消費者將從起始位置讀取分區的記錄。 6.enable.auto.commit: 該屬性指定了消費者是否自動提交偏移量,默認值是true。為了盡量避免出現重復數據和數據丟失,可以把它設為false ,由自己控制何時提交偏移量。如果把
它設為true ,還可以通過配置auto.commit.interval.ms屬性來控制提交的頻率。 7.partition.assignment.strategy: 我們知道,分區會被分配給群組里的消費者。PartitionAssignor根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。
Kafka 有兩個默認的分配策略。
Range: 該策略會把主題的若干個連續的分區分配給消費者。假設悄費者c1 和消費者c2 同時訂閱了主題t1 和主題t2 ,井且每個主題有3 個分區。那么消費者c1有可能分配到這兩個主題的分區0
和分區1 ,而消費者C2 分配到這兩個主題的分區2。因為每個主題擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區。只要使用了Range
策略,而且分區數量無怯被消費者數量整除,就會出現這種情況。 RoundRobin: 該策略把主題的所有分區逐個分配給消費者。如果使用RoundRobin 策略來給消費者c1和消費者c2 分配分區,那么消費者c1 將分到主題T1的分區0 和分區2 以及主題t2的分區1 ,消費
者C2 將分配到主題t1 的分區1 以及主題t2的分區0 和分區2 。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見) , RoundRobin 策略會給所有消費者分配相同數量的分區(或最多就差一個分區)。
可以通過設置partition.assignment.strategy來選擇分區策略。默認使用的是org.apache.kafka.clients.consumer.RangeAssignor,這個類實現了Range策略,不過也可以把
它改為org.apache.kafka.clients.consumer.RoundRobinAssignor。還可以自定義策略,在這種情況下,partition.assignment,strategy屬性的值就是自定義類的名字。 8.client.id:該屬性可以是任意字符串, broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。
9.max.poll.records:該屬性用於控制單次調用call() 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量。 10. receive.buffer.bytes和send.buffer.bytes: socket 在讀寫數據時用到的TCP 緩沖區也可以設置大小。如果它們被設為-1 ,就使用操作系統的默認值。如果生產者或消費者
與broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
上面介紹了kafka-python中生成者與消費者的理論概念,下一篇博文會給出怎么使用kafka的API接口!