1.python-kafka:
api送上:https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaConsumer.html
2.實現一個broker、topic可配置的生產者與消費者:
#coding=utf-8
import time
import logging
import sys
import json
import etc.config as conf
sys.path.append('***********/kafka-python-1.3.3')
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kafka import TopicPartition


def log_name():
    """Return today's log file path: ``conf.kafka_logDir`` followed by ``YYYYMMDD.log``.

    ``conf.kafka_logDir`` is used as a plain string prefix, not joined as a
    directory, exactly as configured.
    """
    return conf.kafka_logDir + time.strftime('%Y%m%d') + '.log'


# Everything from DEBUG upwards is appended to the daily log file ...
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)-15s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename=log_name(),
                    filemode='a'
                    )
# ... and INFO and above are additionally echoed to the console.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger('').addHandler(console)


def _bootstrap_server(broker, port):
    """Build the ``host:port`` bootstrap address; *port* may be an int or a str."""
    return '{kafka_host}:{kafka_port}'.format(kafka_host=broker, kafka_port=port)


def _log_params(title, broker, port, topic):
    """Log the connection parameters at INFO level and return them as one string."""
    lines = [
        "--------------------------------",
        title,
        "[KAFKA-BROKER]:%s" % broker,
        "[KAFKA-PORT]:%s" % port,
        "[KAFKA-TOPIC]:%s" % topic,
        "--------------------------------",
    ]
    for line in lines:
        logging.info(line)
    return '\n'.join(lines)


class kfkProducer(object):
    """Kafka producer wrapper with a configurable broker, port and topic."""

    def __init__(self, broker, kafkaPort, kafkaTopic=''):
        """Store the connection parameters; no connection is opened here."""
        self._broker = broker
        self._kafkaPort = kafkaPort
        self._kafkaTopic = kafkaTopic

    def __str__(self):
        """Log the producer parameters and return the same text.

        Logging is kept because callers invoke ``__str__()`` for that side
        effect; a string is returned so ``str(obj)`` / ``print(obj)`` no
        longer raise ``TypeError``.
        """
        return _log_params("kafka-producer params ...",
                           self._broker, self._kafkaPort, self._kafkaTopic)

    def registerKfkProducer(self):
        """Create a ``KafkaProducer`` connected to the configured broker.

        Returns:
            The new producer, or ``None`` if construction raised ``KafkaError``
            (the error is logged with its traceback).
        """
        try:
            return KafkaProducer(
                bootstrap_servers=_bootstrap_server(self._broker, self._kafkaPort))
        except KafkaError as e:
            logging.exception(e)
            return None

    def produceMsg(self, topic, msg, partition=0):
        """Send one message to *topic* / *partition* and wait until it is flushed.

        Args:
            topic: Destination topic; an empty or ``None`` topic is rejected
                with an error log entry.
            msg: Record value, passed to ``send`` unchanged. No
                ``value_serializer`` is configured, so the caller is
                responsible for serializing it.
            partition: Destination partition number.
        """
        if not topic:
            logging.error("topic is None, plz check!")
            return
        producer = self.registerKfkProducer()
        if producer is None:
            # Connection failure has already been logged.
            return
        try:
            producer.send(topic, value=msg, partition=partition)
            producer.flush()
        except KafkaError as e:
            logging.exception(e)
        finally:
            # A producer is created per call, so release its sockets and
            # background thread instead of leaking one per message.
            producer.close()


class kfkConsumer(object):
    """Kafka consumer wrapper with a configurable broker, port and topic."""

    def __init__(self, broker, kafkaPort, kafkaTopic=''):
        """Store the connection parameters; no connection is opened here."""
        self._broker = broker
        self._kafkaPort = kafkaPort
        self._kafkaTopic = kafkaTopic

    def __str__(self):
        """Log the consumer parameters and return the same text.

        Logging is kept because callers may invoke ``__str__()`` for that side
        effect; a string is returned so ``str(obj)`` / ``print(obj)`` no
        longer raise ``TypeError``.
        """
        return _log_params("kafka-consumer params ...",
                           self._broker, self._kafkaPort, self._kafkaTopic)

    def registerConsumer(self):
        """Create a ``KafkaConsumer`` that starts from the earliest offset.

        Returns:
            The new consumer, or ``None`` if construction raised ``KafkaError``
            (the error is logged with its traceback).
        """
        try:
            return KafkaConsumer(
                # format() accepts an int port; plain ``+`` concatenation raised
                # TypeError whenever the port was configured as an integer.
                bootstrap_servers=[_bootstrap_server(self._broker, self._kafkaPort)],
                auto_offset_reset='earliest')
        except KafkaError as e:
            logging.exception(e)
            return None

    def consumerMsg(self, topic, partition=0):
        """Consume *topic* / *partition* and log every message received.

        Args:
            topic: Topic to read; an empty or ``None`` topic is rejected with
                an error log entry.
            partition: Partition number to assign to the consumer.
        """
        if not topic:
            logging.error("topic is None, plz check!")
            return
        consumer = self.registerConsumer()
        if consumer is None:
            # Connection failure has already been logged.
            return
        try:
            consumer.assign([TopicPartition(topic, partition)])
            for message in consumer:
                # Message value and key are raw bytes -- decode if necessary,
                # e.g. for unicode: message.value.decode('utf-8')
                logging.info("%s:%d:%d: msg=%s",
                             message.topic, message.partition,
                             message.offset, message.value.decode('utf-8'))
        except KafkaError as e:
            logging.exception(e)
        finally:
            # Also runs when the loop is interrupted (e.g. Ctrl-C).
            consumer.close()
3.實現命令行輸入topic和partition,即可生產消息:
#coding=utf-8
import os import sys import json import etc.config as conf from PykafkaMgr import kfkProducer #從json文件獲取消息
def getMsgFromJsonfile(filePath):
    """Load and return the parsed JSON content of *filePath*.

    Returns:
        The decoded JSON value, or ``None`` (after printing a hint) when
        *filePath* is not an existing file.
    """
    import io  # local import: io.open takes an encoding on Python 2 and 3

    if not os.path.isfile(filePath):
        print(u"[%s] 輸入的json文件路徑有誤,請檢查..." % filePath)
        return None
    # Read as UTF-8 explicitly so parsing does not depend on the locale.
    with io.open(filePath, encoding='utf-8') as json_file:
        return json.load(json_file)


def except4v():
    """Print usage hints for a missing argument list or a ``--`` option.

    Handles ``--version`` / ``--Version`` and ``--help``; any other input
    prints nothing.
    """
    if len(sys.argv) <= 1:
        print(u"未輸入topic和partition!\n你可以--help查看具體使用方法...")
        return
    if not sys.argv[1].startswith("--"):
        return
    option = sys.argv[1][2:]
    if option in ("version", "Version"):
        print("Version 1.0 \nPython 2.7.3 (default, Nov 6 2015, 14:11:14) "
              "\n[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2")
    elif option == "help":
        print(u"produceMsg.py 接收兩個參數, 第一個是topic, 第二個是partition \neg:python produceMsg.py test 0 \n向topic名為test第0分區生產消息")


def calcMsg(jsonMsg):
    """Print the entry count of each message's ``"MSGBODY"`` and the totals.

    Args:
        jsonMsg: Sequence of messages, each a mapping with a sized
            ``"MSGBODY"`` value.
    """
    sumAcct = 0
    print("------------------------------------------")
    for msg_no, msg in enumerate(jsonMsg, 1):
        acct_num = len(msg["MSGBODY"])
        print(u"第[%d]條消息,包含ACCT_ID賬戶數:[%d]個" % (msg_no, acct_num))
        sumAcct += acct_num
    print(u"本次生產消息總共[%d]條, 總共賬戶數:[%d]個" % (len(jsonMsg), sumAcct))
    print("------------------------------------------")


def _encodeMsg(msg):
    """Serialize one message to UTF-8 encoded JSON bytes.

    ``'%s' % msg`` would emit the Python repr of the object (single quotes,
    ``u''`` prefixes on Python 2), which is not valid JSON.
    """
    return json.dumps(msg, ensure_ascii=False).encode('utf-8')


if __name__ == '__main__':
    except4v()
    if len(sys.argv) == 3:
        topic = sys.argv[1]
        partition = int(sys.argv[2])
        produce = kfkProducer(conf.kafka_mgr["broker"], conf.kafka_mgr["port"], topic)
        produce.__str__()  # called for its logging side effect
        jsonMsg = getMsgFromJsonfile(conf.kafka_produce)
        if jsonMsg is None:
            # The bad path was already reported; stop instead of crashing
            # on len(None) below.
            sys.exit(1)
        for msg in jsonMsg:
            produce.produceMsg(topic, _encodeMsg(msg), partition)
        calcMsg(jsonMsg)
4.設置兩個配置文件:
第一個是config.py
#coding=utf-8
# Alternative broker configuration:
#   kafka_mgr = {"broker": 'ip1:port,ip2:port,...,ipn:port'}
# i.e. point at a Kafka cluster. The code then needs a small adjustment
# (only the parameter list has to change). Supporting both forms behind a
# switch is also possible -- your choice.
kafka_mgr = dict(
    broker='10.***.***.***',
    port=6667,
)

# Prefix of the log file path; the date and ".log" are appended to it.
kafka_logDir = r"/*******/log/****"

# JSON input file for the producer.
kafka_produce = r"/**********/data/input/produceMsg.json"
生產者輸入json文件:
produceMsg.json
json文件附上說明,具體可以按照說明配置:

hi, welcome here~
produceMsg.json
=================================
輸入json格式數據,作為生產者消息的輸入。
1.支持多條json數據輸入。格式如下:
[
json1,
json2,
...,
jsonN
]
總體結構是:[ , ]
2.此json文件不能加注釋,因為會破壞json文件格式,導致無法解析
3.輸入只要是json格式,不需要關注是不是一行或多行,多換行、空格等都不影響解析
消費者也是利用以上兩個配置文件去實現即可。此處代碼略