kafka + mqtt接口實現


由於底層的CNC機床只能使用MQTT通信,所以採用MQTT作為底層的通訊協議,再將數據上傳到kafka集群。

現在有兩種實現方案:一種是使用mqtt-kafka connector,相當於直接使用兩者之間的接口通信;另一種是設置一個gateway。

由於暫未找到合適的python接口,所以自己簡單實現了一個,代碼如下。

 mqtt發布端:

import sys
import datetime
import socket, sys
import paho.mqtt.publish as publish
import time

def transmitMQTT(strMsg, strMqttBroker="192.168.3.10",
                 strMqttChannel="confluent-kafka-topic"):
    """Publish a single message to an MQTT broker.

    Args:
        strMsg: Payload to publish.
        strMqttBroker: Host name or IP address of the MQTT broker. Defaults
            to the address that was previously hard-coded here.
        strMqttChannel: MQTT topic to publish on. Defaults to the topic that
            was previously hard-coded here.
    """
    # Print the length of the payload's string form before sending.
    print(len(str(strMsg)))
    publish.single(strMqttChannel, strMsg, hostname=strMqttBroker)

if __name__ == '__main__':
    # Publish a fixed test payload in an endless loop, pausing 0.1 s between
    # sends and printing the current timestamp after each pause. The sleep
    # and print must be inside the loop body; otherwise the loop spins
    # without pause and the statements after it are never reached.
    while True:
        transmitMQTT("testtest")
        time.sleep(0.1)
        print(time.time())

接口(mqtt收+kafka生產消息):

import confluent_kafka
import time
import paho.mqtt.client as mqtt
import time
import sys
import random


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: print the result code, then subscribe.

    The client is subscribed to the module-level ``topic``.
    """
    status_line = "Connected with result code" + str(rc)
    print(status_line)
    client.subscribe(topic)


def on_message(client, userdata, msg):
    """MQTT message callback: echo the payload and forward it to Kafka."""
    payload = msg.payload
    print(payload)
    confluent_kafka_producer_performance(payload)


def confluent_kafka_producer_performance(msg_payload, repeat=10):
    """Produce ``msg_payload`` to Kafka ``repeat`` times and time the send.

    A new producer is created on every call. The payload is enqueued
    ``repeat`` times on the module-level ``topic``; sends rejected because
    the producer's local queue was full are retried, and the producer is
    flushed before returning.

    NOTE(review): when called from ``on_message`` with the default
    ``repeat``, every incoming MQTT message is written to Kafka 10 times,
    and a producer is constructed per message. This looks like benchmark
    code reused as a bridge; confirm whether ``repeat=1`` and a shared
    producer are what is actually wanted.

    Args:
        msg_payload: Message value to produce.
        repeat: Number of copies of the payload to enqueue. Defaults to 10,
            the count that was previously hard-coded.

    Returns:
        Elapsed wall-clock seconds from just before the first produce
        attempt until ``flush()`` has returned.

    Raises:
        BufferError: If a retried message is still rejected after the
            blocking poll.
    """
    conf = {'bootstrap.servers': '192.168.3.102:9092'}
    producer = confluent_kafka.Producer(**conf)
    messages_to_retry = 0

    producer_start = time.time()
    for _ in range(repeat):
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError:
            # Local queue is full; count the message and resend it below.
            messages_to_retry += 1

    # Retry the messages that overflowed the local buffer.
    for _ in range(messages_to_retry):
        # Serve delivery events first so queue slots can be released.
        producer.poll(0)
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError:
            # A non-blocking poll may not free any space; wait up to one
            # second for deliveries to complete before the final attempt.
            producer.poll(1)
            producer.produce(topic, value=msg_payload)

    producer.flush()

    return time.time() - producer_start


# Name shared by the MQTT subscription and the Kafka producer callbacks.
topic = 'confluent-kafka-topic'

# Wire the callbacks onto a fresh MQTT client, connect to the broker, and
# hand control to the client's network loop.
client = mqtt.Client()
client.on_connect, client.on_message = on_connect, on_message
client.connect(host='192.168.3.10', port=1883, keepalive=60)
client.loop_forever()

kafka接收端:

from confluent_kafka import Consumer, KafkaError

# Topic-level defaults passed through to the consumer configuration.
topic_defaults = {'auto.offset.reset': 'smallest'}

# Consumer configuration: broker address, group/client identity, automatic
# offset commits, and session timeout.
settings = {
    'bootstrap.servers': '192.168.3.10:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
    'default.topic.config': topic_defaults,
}

c = Consumer(settings)
c.subscribe(['confluent-kafka-topic'])

try:
    # Poll until interrupted; each poll waits at most 0.1 s for a message.
    while True:
        msg = c.poll(0.1)
        if msg is None:
            # Nothing arrived within the timeout.
            continue

        err = msg.error()
        if not err:
            print('Received message: {0}'.format(msg.value()))
            continue

        if err.code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occured: {0}'.format(err.str()))

except KeyboardInterrupt:
    # Ctrl-C ends the loop quietly; cleanup happens in ``finally``.
    pass

finally:
    c.close()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM