python小例-生產、消費
生產
#!/usr/bin/env python # -*- coding: utf-8 -*- from kafka import KafkaProducer import json producer = KafkaProducer(bootstrap_servers='localhost:9092') for i in range(10): msg_dict = { 'id': '349834', 'task_id': '9kjifewo' } msg = json.dumps(msg_dict) producer.send('topic_a', msg) producer.close()
消費
#!/usr/bin/env python # -*- coding: utf-8 -*- from kafka import KafkaConsumer consumer = KafkaConsumer('topic_a', bootstrap_servers='localhost:9092') print consumer print "<<" * 10 for msg in consumer: print msg.value, type(msg.value) print ">>" * 10
基本用法
topic_name = 'my_topic_name' consumer = KafkaConsumer(topic_name, bootstrap_servers=['localhost:9092']) # consumer是一個消息隊列,當后台有消息時,這個消息隊列就會自動增加.所以遍歷也總是會有數據,當消息隊列中沒有數據時,就會堵塞等待消息到來 for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
指定分區、offset、消費組
#encoding:utf8 from kafka import KafkaConsumer, TopicPartition my_topic = "my_topic_name" # 指定需要消費的主題 consumer = KafkaConsumer( bootstrap_servers = "192.168.70.221:19092,192.168.70.222:19092,192.168.70.223:19092", # kafka集群地址 group_id = "my_group_a", # 消費組id enable_auto_commit = True, # 每過一段時間自動提交所有已消費的消息(在迭代時提交) auto_commit_interval_ms = 5000, # 自動提交的周期(毫秒) ) consumer.assign([ TopicPartition(topic=my_topic, partition=0), TopicPartition(topic=my_topic, partition=1), TopicPartition(topic=my_topic, partition=2) ]) consumer.seek(TopicPartition(topic=my_topic, partition=0), 12) # 指定起始offset為12 consumer.seek(TopicPartition(topic=my_topic, partition=1), 0) # 可以注冊多個分區,此分區從第一條消息開始接收 # consumer.seek(TopicPartition(topic=my_topic, partition=2), 32) # 沒有注冊的分區上的消息不會被消費 for msg in consumer: # 迭代器,等待下一條消息 print msg # 打印消息
注:因指定了分區、偏移量,不會消費分區為2的信息;如果開啟2個相同服務,會把同樣的消息消費2次
手動提交
enable_auto_commit = True
手動提交
消費了會自動提交offset, 如果想保證業務處理完再手動提交,需要 設置 enable_auto_commit = False
from kafka import KafkaConsumer, OffsetAndMetadata tp = TopicPartition(my_topic, 0) consumer.commit(offsets={tp: (OffsetAndMetadata(msg.offset + 1, None))}
注意提交的偏移量是下次消費開始的位置。如果設置為當前offset,下次會重復消費
附
KafkaConsumer構造函數參數列表
*topics ,要訂閱的主題 bootstrap_servers :kafka節點或節點的列表,不一定需要羅列所有的kafka節點。格式為: ‘host[:port]’ 。默認值是:localhost:9092 client_id (str) : 客戶端id,默認值: ‘kafka-python-{version}’ group_id (str or None):分組id key_deserializer (callable) :key反序列化函數 value_deserializer (callable):value反序列化函數 fetch_min_bytes:服務器應每次返回的最小數據量 fetch_max_wait_ms (int): 服務器應每次返回的最大等待時間 fetch_max_bytes (int) :服務器應每次返回的最大數據量 max_partition_fetch_bytes (int) : request_timeout_ms (int) retry_backoff_ms (int) reconnect_backoff_ms (int) reconnect_backoff_max_ms (int) max_in_flight_requests_per_connection (int) auto_offset_reset (str) enable_auto_commit (bool) auto_commit_interval_ms (int) default_offset_commit_callback (callable) check_crcs (bool) metadata_max_age_ms (int) partition_assignment_strategy (list) max_poll_records (int) max_poll_interval_ms (int) session_timeout_ms (int) heartbeat_interval_ms (int) receive_buffer_bytes (int) send_buffer_bytes (int) socket_options (list) consumer_timeout_ms (int) skip_double_compressed_messages (bool) security_protocol (str) ssl_context (ssl.SSLContext) ssl_check_hostname (bool) ssl_cafile (str) – ssl_certfile (str) ssl_keyfile (str) ssl_password (str) ssl_crlfile (str) api_version (tuple)
KafkaConsumer 函數
assign(partitions):手動為該消費者分配一個topic分區列表。 assignment():獲取當前分配給該消費者的topic分區。 beginning_offsets(partitions):獲取給定分區的第一個偏移量。 close(autocommit=True):關閉消費者 commit(offsets=None):提交偏移量,直到成功或錯誤為止。 commit_async(offsets=None, callback=None):異步提交偏移量。 committed(partition):獲取給定分區的最后一個提交的偏移量。 end_offsets(partitions):獲取分區的最大偏移量 highwater(partition):分區最大的偏移量 metrics(raw=False):返回消費者性能指標 next():返回下一條數據 offsets_for_times(timestamps):根據時間戳獲取分區偏移量 partitions_for_topic(topic):返回topic的partition列表,返回一個set集合 pause(*partitions):停止獲取數據paused():返回停止獲取的分區poll(timeout_ms=0, max_records=None):獲取數據 position(partition):獲取分區的偏移量 resume(*partitions):恢復抓取指定的分區 seek(partition, offset):seek偏移量 seek_to_beginning(*partitions):搜索最舊的偏移量 seek_to_end(*partitions):搜索最近可用的偏移量 subscribe(topics=(), pattern=None, listener=None):訂閱topics subscription():返回當前消費者消費的所有topic topics():返回當前消費者消費的所有topic,返回的是unicode unsubscribe():取消訂閱所有的topic
