Test environment:
CDH 5.15.1
CentOS 7
Python 3.7.0
kafka 1.1.1
kafka-python: https://pypi.org/project/kafka-python/#files
Objective:
Use a Python thread to continuously fetch data from a specified interface and keep sending that data to the Kafka service.
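The objective above can be sketched as a small polling loop that runs in a background thread. This is only a minimal sketch, not the code used in this experiment: the names poll_and_send and start_poller are hypothetical, and fetch and send are placeholder callables standing in for "read the interface" and "send to Kafka".

```python
import threading


def poll_and_send(fetch, send, stop_event, interval=5.0):
    """Call fetch() repeatedly and pass each non-empty result to send()."""
    while not stop_event.is_set():
        record = fetch()
        if record is not None:
            send(record)
        # wait() returns early once stop_event is set, so shutdown is prompt.
        stop_event.wait(interval)


def start_poller(fetch, send, interval=5.0):
    """Run the loop in a daemon thread and return (thread, stop_event)."""
    stop_event = threading.Event()
    worker = threading.Thread(
        target=poll_and_send,
        args=(fetch, send, stop_event, interval),
        daemon=True,
    )
    worker.start()
    return worker, stop_event
```

Using threading.Event instead of time.sleep lets the caller stop the loop with stop_event.set() without waiting out the full interval.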
Step 1:
First download and install kafka-python;
Then run a simple test that calls Kafka from Python:
Open a python3 shell:
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers=["master:9092"])
>>> producer.send("test",b"Hello world")
<kafka.producer.future.FutureRecordMetadata object at 0x7f4bf56fbda0>
>>> producer.send("test",b"Hello world")
<kafka.producer.future.FutureRecordMetadata object at 0x7f4bf5715438>
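producer.send() is asynchronous and returns a future, which is why the shell prints a FutureRecordMetadata object rather than a result. If you want to confirm that a message was actually acknowledged, one possible approach is to block on that future. The helper name send_and_confirm below is hypothetical, and the usage lines are left as comments because they need a running broker.

```python
def send_and_confirm(producer, topic, payload, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is assumed to behave like kafka.KafkaProducer: send() returns
    a future whose get(timeout=...) yields the record metadata or raises.
    """
    future = producer.send(topic, payload)
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


# Usage against the broker from this test (needs a live Kafka, not run here):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=["master:9092"])
# print(send_and_confirm(producer, "test", b"Hello world"))
# producer.close()
```

Blocking on every message reduces throughput, so this is mainly useful for quick tests like this one.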
Start a Kafka consumer:
kafka-console-consumer --zookeeper master:2181 --from-beginning --topic test
Output:
Hello world
Hello world
Step 2:
Code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File  : ParsePS.py
# @Author: cjj
# @Date  : 2019/6/4
# @Desc  : Request the interface, fetch the data, and clean it
import re
import threading
import time
from urllib.error import URLError

from kafka import KafkaProducer
from kafka.errors import KafkaError
from suds.client import Client


class Data_clean:
    # Fetch the data for one observation point
    @staticmethod
    def get_data(observation_point_name):
        try:
            # Fetch the data from the interface
            user_url = 'http://xxx.xxx.xxx.xxx/ServiceSL/ServiceGetInsqlData.svc?wsdl'
            client = Client(user_url)
            result = client.service.GetSingleTagInfo(observation_point_name)
            # 1. Clean the data
            # 1.1 Convert the result to a string
            str1 = str(result)
            # 1.2 Extract everything inside double quotes and turn the list into a string
            pattern = re.compile('"(.*)"')
            str2 = str(pattern.findall(str1))
            # 1.3 Remove the single quotes
            str3 = str2.replace('\'', '')
            # 1.4 Replace the commas with tabs
            str4 = str3.replace(', ', '\t')
            # 1.5 Strip the surrounding []
            str5 = str4[1:-1]
            return str5
        except TimeoutError as e:
            print("\033[1;31;0m>>>>>>TimeoutError ->->->->->-> request to the interface timed out<<<<<<\033[0m")
            # print(e)
        except URLError as e:
            print("\033[1;31;0m>>>>>>URLError ->->->->->-> cannot connect to the SQL server<<<<<<\033[0m")
        except Exception:
            print("\033[1;31;0m>>>>>>Failed for some other reason<<<<<<\033[0m")
        # Reached only after an error: tell the caller there is no data
        return None


producer = None
try:
    producer = KafkaProducer(bootstrap_servers='master:9092')
    while True:
        msg = Data_clean.get_data("SLWS_ps_1hzybqz_WD.PV")
        print(msg)
        # Skip this round if fetching failed (get_data returned None)
        if msg is not None:
            # Specify the topic and the payload, and send the data to Kafka
            producer.send('test', msg.encode('utf-8'))
        time.sleep(5)
except KafkaError as e:
    print(e)
finally:
    # producer is still None if the KafkaProducer constructor itself failed
    if producer is not None:
        producer.close()
    print('done!!!')
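To make cleaning steps 1.2 to 1.5 easier to follow, here is a small standalone sketch that applies the same regular expression to a made-up response string. The sample text is an assumption for illustration only; the real output of GetSingleTagInfo is not shown in this post and may look different. The function name clean is also hypothetical. Joining the matches with a tab directly should give the same result as the str()/replace() chain, as long as the values themselves contain no single quotes, backslashes, or ", " sequences.

```python
import re

# Same pattern as in ParsePS.py: the text between double quotes on a line.
QUOTED = re.compile(r'"(.*)"')


def clean(raw_text):
    """Return all double-quoted values in raw_text, separated by tabs."""
    return "\t".join(QUOTED.findall(raw_text))


# Hypothetical response text, invented for this example.
sample = 'Name = "SLWS_ps_1hzybqz_WD.PV"\nValue = "23.5"'
print(clean(sample))
```

Note that the pattern is greedy: if one line held two quoted values, everything from the first quote to the last quote on that line would be returned as a single match.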
Upload the code to the Linux server
Run the code: python3 ParsePS.py
Check the output in the Kafka consumer: