Calling Kafka from Python (using the kafka-python library)


Test environment:

CDH 5.15.1

CentOS 7

Python 3.7.0

kafka 1.1.1

kafka-python: https://pypi.org/project/kafka-python/#files

Goal:

In a Python thread, continuously pull data from a given web service interface and keep sending it to Kafka.
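The script in step 2 actually polls in a plain while loop on the main thread and never starts a thread. The following is a minimal sketch of the threaded version the goal describes, using only the standard library. `PollingSender` is a hypothetical name, and `fetch`/`send` are assumed callables that stand in for the web service call and for `KafkaProducer.send`, so the loop can be tried without a broker.

```python
import threading
from typing import Callable, Optional


class PollingSender(threading.Thread):
    """Hypothetical worker: repeatedly fetch one record and hand it to send()."""

    def __init__(self, fetch: Callable[[], Optional[str]],
                 send: Callable[[bytes], None], interval: float = 5.0):
        super().__init__(daemon=True)
        self.fetch = fetch          # e.g. lambda: Data_clean.get_data("SLWS_ps_1hzybqz_WD.PV")
        self.send = send            # e.g. lambda b: producer.send("test", b)
        self.interval = interval    # seconds between two polls
        self.stop_event = threading.Event()

    def run(self) -> None:
        while not self.stop_event.is_set():
            msg = self.fetch()
            if msg is not None:                 # skip rounds where the fetch failed
                self.send(msg.encode("utf-8"))
            # wait() returns early as soon as stop() sets the event
            self.stop_event.wait(self.interval)

    def stop(self) -> None:
        self.stop_event.set()
```

With kafka-python, `send` could be `lambda b: producer.send("test", b)`; call `stop()` and `join()` on the worker before `producer.close()`. Waiting on an `Event` instead of calling `time.sleep()` lets `stop()` take effect immediately instead of after a full interval.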

Step 1:

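First, download and install kafka-python (for example: pip3 install kafka-python).

Then run a simple test of calling Kafka from Python.

Open a python3 interactive shell: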

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers=["master:9092"])
>>> producer.send("test",b"Hello world")
<kafka.producer.future.FutureRecordMetadata object at 0x7f4bf56fbda0>
>>> producer.send("test",b"Hello world")
<kafka.producer.future.FutureRecordMetadata object at 0x7f4bf5715438>
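`send()` is asynchronous: as the output above shows, it only returns a `FutureRecordMetadata`, and the record is delivered by a background thread. Below is a minimal sketch of waiting for the broker's acknowledgement, based on kafka-python's `future.get(timeout=...)`, which returns a `RecordMetadata` (with `topic`, `partition` and `offset`) or raises a `KafkaError`. The helper name `send_and_confirm` is hypothetical.

```python
from typing import Any, Tuple


def send_and_confirm(producer: Any, topic: str, value: bytes,
                     timeout: float = 10.0) -> Tuple[str, int, int]:
    """Send one message and block until the broker acknowledges it.

    `producer` is assumed to behave like kafka.KafkaProducer: send() returns
    a future whose get(timeout=...) yields metadata with topic, partition and
    offset, or raises a KafkaError if delivery fails.
    """
    future = producer.send(topic, value)
    metadata = future.get(timeout=timeout)   # blocks until acked or timeout
    return metadata.topic, metadata.partition, metadata.offset


# Usage sketch (needs kafka-python and a reachable broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=["master:9092"])
# print(send_and_confirm(producer, "test", b"Hello world"))
# producer.flush()   # push out anything still buffered
# producer.close()
```

Blocking on every message costs throughput, but for a loop that sends one record every few seconds it is negligible, and it turns silent delivery failures into exceptions.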

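Start a Kafka console consumer (the --zookeeper option selects the old consumer, which Kafka 1.1.1 still ships; on Kafka 2.0 and later use --bootstrap-server master:9092 instead):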

kafka-console-consumer  --zookeeper master:2181 --from-beginning --topic test

Output:

Hello world
Hello world
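The same check can be made from Python with kafka-python's `KafkaConsumer`. Below is a minimal sketch, assuming the broker at `master:9092` from the test above. `read_values` is a hypothetical helper. The consumer is created without a `group_id`, so `auto_offset_reset="earliest"` makes it start from the oldest retained message, much like `--from-beginning`.

```python
from itertools import islice
from typing import Iterable, List


def read_values(records: Iterable, limit: int) -> List[str]:
    """Decode the first `limit` record values as UTF-8 text.

    `records` can be a kafka.KafkaConsumer: iterating it yields ConsumerRecord
    objects whose `.value` holds the raw bytes (no deserializer configured).
    """
    return [record.value.decode("utf-8") for record in islice(records, limit)]


# Usage sketch (needs kafka-python and a reachable broker):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(
#     "test",
#     bootstrap_servers=["master:9092"],
#     auto_offset_reset="earliest",   # begin at the oldest offset when none is committed
#     consumer_timeout_ms=10000,      # stop iterating after 10 s without new messages
# )
# print(read_values(consumer, 2))
# consumer.close()
```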

Step 2:

Code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : ParsePS.py
# @Author: cjj
# @Date  : 2019/6/4
# @Desc  : Request the interface, fetch the data, and clean it


import re
import time
from urllib.error import URLError

from kafka import KafkaProducer
from kafka.errors import KafkaError
from suds.client import Client

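class Data_clean:
    # Fetch and clean the data for one observation point (tag)
    @staticmethod
    def get_data(observation_point_name):

        try:
            # Request the data from the web service
            user_url = 'http://xxx.xxx.xxx.xxx/ServiceSL/ServiceGetInsqlData.svc?wsdl'
            client = Client(user_url)
            result = client.service.GetSingleTagInfo(observation_point_name)
            # 1. Clean the data
            # 1.1 Convert the result to a string
            str1 = str(result)
            # 1.2 Extract everything between double quotes and turn the list back into a string
            pattern = re.compile('"(.*)"')
            str2 = str(pattern.findall(str1))
            # 1.3 Remove the single quotes
            str3 = str2.replace('\'', '')
            # 1.4 Replace the ", " separators with tabs
            str4 = str3.replace(', ', '\t')
            # 1.5 Strip the surrounding [ and ]
            str5 = str4[1:-1]

            return str5
        except TimeoutError:
            # "\033[1;31m" = bold red, "\033[0m" = reset
            print("\033[1;31m>>>>>>TimeoutError ->->->->->-> request to the interface timed out<<<<<<\033[0m")
        except URLError:
            print("\033[1;31m>>>>>>URLError ->->->->->-> cannot connect to the SQL server<<<<<<\033[0m")
        except Exception as e:
            print("\033[1;31m>>>>>>Failed for another reason: %s<<<<<<\033[0m" % e)
        # Any failure ends up here: return None so the caller can skip this round
        return None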

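producer = None
try:
    producer = KafkaProducer(bootstrap_servers='master:9092')
    while True:
        msg = Data_clean.get_data("SLWS_ps_1hzybqz_WD.PV")
        print(msg)

        # get_data() returns None when the request failed; skip this round
        if msg is not None:
            # Specify the topic and payload, and send the data to Kafka
            producer.send('test', msg.encode('utf-8'))
        time.sleep(5)

except KafkaError as e:
    print(e)
finally:
    # The producer may not exist if connecting to the broker failed
    if producer is not None:
        producer.close()
    print('done!!!')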
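Steps 1.2 to 1.5 in `get_data()` turn the list of matches into its printed form and then strip the brackets, quotes and separators again. Joining the matches directly yields the same tab-separated line in one step. Below is a sketch; the function name `clean` and the sample response text are hypothetical, since the real `str(result)` layout returned by suds is not shown in this post.

```python
import re

# Same pattern as get_data(): capture what lies between the first and last
# double quote on each line ('.' does not match a newline).
QUOTED = re.compile('"(.*)"')


def clean(raw: str) -> str:
    """Join every quoted value in `raw` with tabs.

    Matches steps 1.2-1.5 of get_data() as long as the values contain no
    single quotes, backslashes or ", " sequences, which those steps mangle.
    """
    return "\t".join(QUOTED.findall(raw))


# Hypothetical response text; the real str(result) layout may differ.
sample = ('TagName = "SLWS_ps_1hzybqz_WD.PV"\n'
          'Value = "12.5"\n'
          'DateTime = "2019-06-04 10:00:00"')
print(clean(sample))
```

Joining directly also avoids corrupting values that happen to contain a single quote or a ", " sequence.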

Upload the code to the Linux server.

Run it: python3 ParsePS.py

Check the result in the Kafka consumer:

