python讀取kafka,輸出到Vertica數據庫


# 主測試
# https://docs.python.org/2/library/json.html
import sys
import json
import vertica_python
import time
import os
from pykafka import KafkaClient  # 導入的vertica_python和pykafka包需要pip install安裝

# 顯示當前時間
print('開始時間', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
print(sys.getdefaultencoding())
client = KafkaClient(hosts="192.168.1.1:9092")  # 填寫kafka地址和端口,一般是9092端口
# client.topics  # 查看所有topic
topic = client.topics[b'topic']  # 選擇一個topic
consumer = topic.get_simple_consumer(consumer_timeout_ms=2000, auto_commit_enable=1)  # 等待5秒無新數據,退出
data_group = []
conn_info = {'host': '192.168.1.1', 'port': 1, 'user': 'a', 'password': 'b',
             'database': 'c', 'read_timeout': 600, 'unicode_error': 'strict', 'ssl': False}  # 填寫數據庫連接信息
# simple connection, with manual close
connection = vertica_python.connect(**conn_info)
cur = connection.cursor()
a_error_count = 0
a_success_count = 0
path_os = os.path.abspath('offset.txt') # 將數據偏移量offset寫入文件
f1 = open(path_os, 'r', encoding='utf8')
a_offset_start = int(f1.readline())  # 從a_offset_start開始讀數據
print(a_offset_start)
# a_offset_start = 3000 # 可以手工指定從哪里開始讀取數據,排錯用
f1.close()
for message in consumer:  # 循環0
    if message is not None and message.offset > a_offset_start:
        try:
            a = message.value.decode('UTF-8')
            data_group.append(json.loads(a))
            c = message.offset
            for item in data_group:
                str1 = "insert into 表名(列名) values "+ "('" + str(c) \ # 將offset值也寫入數據庫 
          + "'," + "'%s','%s');\r\n" % ( item['列名1'], item['列名2']) print(str1) cur.execute(str1) connection.commit() a_success_count += 1 data_group.pop() except: print('error_message') a_error_count += 1 continue c1 = message.offset f = open(path_os, 'w+' , encoding='utf8') f.truncate() f.write(str(c1)) f.write('\n' + '開始時間=' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) f.write('\n' + 'a_success_count=' + str(a_success_count)) f.write('\n' + 'a_error_count=' + str(a_error_count)) f.close()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM