python kafka生產者 進程間通信和使用線程池


from sfo_common.import_common import *
import random
from pykafka import KafkaClient

class ProduceKafkaInfo(object):
"""
Kafka數據生產類
"""
def __init__(self):
self.random_size = 1000000

def produce_kafka_info(self, topic,data):
"""
將data數據生產到kafka指定的主題中
:param topic:
:param data:
:return:
"""
try:
if not topic:
raise IOError, 'the param topic can not be None'
if not data:
raise IOError, 'the param data can not be None'
if topic and data:
p_key = random.randint(1, self.random_size)
pclient = KafkaClient(zookeeper_hosts=config.zookeeper_server,socket_timeout_ms=1000*5)
if pclient.topics.has_key(topic):
tp = pclient.topics[topic]
pid = tp.partitions
pkey = p_key % len(pid)
with tp.get_producer(partitioner=lambda pid, key: pid[pkey]) as producer:
producer.produce(message=data,partition_key=b'partition_key_{}'.format(str(pkey)))
producer.stop()
except Exception as ex:
logger.exception("produce_kafka_info function execute exception:%s,the topic is %s,the data is %s" % (str(ex),str(topic),str(data)))

from sfo_common.agent import Agent
from sfo_common.import_common import *
import uuid
import time
import socket
import json
import signal
import schedule
from multiprocessing import Queue
queue = Queue()
upload_heart_queue = Queue()


def beat_heart():
"""
發送心跳數據,數據生成到kafka中
:return:
"""
kfk = ProduceKafkaInfo()
try:
isok = True
heart = {}
heart['guid'] = str(uuid.uuid1())
heart['data_model'] = 'BeatHeart'
heart['cluster_name'] = config.swift_cluster_name
heart['hostname'] = socket.getfqdn()
heart['add_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
data = json.dumps(heart, encoding="UTF-8", ensure_ascii=True)
#檢測到所有網關是否聯通,全部連通才發送心跳
gates = util.exct_cmd("route -n |awk '{print $2}'|grep -v '0.0.0.0'|sed -n '3,$p'")
bad_gate = []
for gate in gates:
isok = isok and util.check_ip_alive(gate)
if not isok:
bad_gate.append(gate)
if isok:
if upload_heart_queue.empty():
upload_heart_queue.put(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
kfk.produce_kafka_info(config.kafka_sys_topic, data)
else:
last_time = upload_heart_queue.get(1)
now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
if util.datediff_seconds(last_time,now_time) < config.upload_refresh:
upload_heart_queue.put(last_time)
else:
upload_heart_queue.put(now_time)
kfk.produce_kafka_info(config.kafka_sys_topic, data)
else:
alarmdata = {}
alarmdata['guid'] = str(uuid.uuid1())
alarmdata['data_model'] = 'SfoAlarmLog'
alarmdata['alarm_device'] = "node-heartbeat-{}".format(str(data['host_name']))
alarmdata['alarm_type'] = "software"
alarmdata['hostname'] = data['host_name']
alarmdata['device_name'] = 'HEART-BEAT'
alarmdata['alarm_message'] = 'the host connect to the gate {} failed'.format(str(bad_gate))
alarmdata['alarm_level'] = 'critical'
alarmdata['alarm_result'] = '0'
alarmdata['add_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
alert = json.dumps(alarmdata, encoding='utf-8', ensure_ascii=True)
if alert:
kfk.produce_kafka_info(config.kafka_sys_topic, alert)
except Exception as ex:
logger.exception("sys beatheart function execute exception:" + str(ex))

def get_beat_heart_schl():
"""
起線程定時執行心跳數據發送
:return:
"""
try:
queue.put(beat_heart)
except Exception as ex:
logger.exception("get_beat_heart_schl function execute exception:" + str(ex))

class BeatHeartAgnet(Agent):
def __init__(self, pidfile):
Agent.__init__(self, pidfile)

def run(self):
"""
重寫守護進程的run函數,實現定時發送心跳功能
:return:
"""
try:
sys.stdout.flush()
sys.stderr.flush()
hostname = socket.getfqdn()
hostip = socket.gethostbyname(hostname)
logger.info("hostname is {}, ip is {}".format(hostname, hostip))
pid = os.fork()
if pid < 0:
logger.info("fork process failed")
elif pid == 0:
          #主進程退出后回收子進程
signal.signal(signal.SIGTERM,signal.SIG_DFL)
while True:
            #子進程起線程池消費隊列中的任務
with ThreadPoolExecutor(config.thread_workers) as executor:
if queue.empty():
time.sleep(0.1)
else:
for i in range(queue.qsize()):
th = queue.get(1)
executor.submit(th)
else:
          #父進程產生定時任務
schedule.every(config.heart_refresh).seconds.do(get_beat_heart_schl)
schedule.run_all(0)
while True:
schedule.run_pending()
time.sleep(0.1)
except Exception as ex:
logger.exception("BeatHeartAgnet run function execute exception:" + str(ex))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM