kafka知識總結
//切換到安裝路徑命令
cd /home/kafka/kafka_2.11-0.10.2.1/bin
//啟動kafka服務,三台主機分別輸入此指令:
./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
//以後台的方式啟動
nohup ./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
//查看topic名
./kafka-topics.sh --list --zookeeper 10.101.22.41:2181
./kafka-topics.sh --list --zookeeper 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
//查詢topic內容
./kafka-console-consumer.sh --bootstrap-server 10.101.22.41:9092,10.101.22.42:9093,10.101.22.43:9094 --topic oth_cpd_active_realtime_data --from-beginning
//查看某個Topic的詳情
./kafka-topics.sh --topic oth_cpd_active_realtime_data --describe --zookeeper 10.101.22.41:2181
./kafka-topics.sh --topic stat_cpd-flow_cv-data --describe --zookeeper 10.101.22.41:2181
//查看消費者
./kafka-consumer-groups.sh --bootstrap-server 10.101.22.41:9092 --list
//查看消費者組的消息堆積(lag)情況
./kafka-consumer-groups.sh --bootstrap-server 10.101.22.41:9092 --group ad-statistics-state-press --describe
//修改分區數
./kafka-topics.sh --zookeeper 10.101.22.41:2181 --alter --partitions 4 --topic oth_cpd_active_realtime_data
//kafka刪除topic的數據
修改server.properties,添加以下內容
delete.topic.enable=true
刪除命令:./kafka-run-class.sh kafka.admin.TopicCommand --delete --topic stat_cpd-flow_cv-data --zookeeper 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
刪除kafka存儲目錄(server.properties文件log.dirs配置)相關topic的數據目錄。如果topic的分區分布在多台broker上,要到kafka集群裡的每台機器上,刪除相關topic的數據目錄。
進入到zk的bin目錄
sh zkCli.sh -server 10.101.22.41:2181,10.101.22.42:2181,10.101.22.43:2181
ls /brokers/topics
rmr /brokers/topics/stat_cpd-flow_cv-data
ls /admin/delete_topics
rmr /admin/delete_topics/stat_cpd-flow_cv-data
ls /config/topics
rmr /config/topics/stat_cpd-flow_cv-data
//創建topic
./kafka-topics.sh --create --zookeeper 10.101.22.41:2181 --replication-factor 1 --partitions 4 --topic oth_cpd_active_realtime_data
//kafka重啟步驟:
1、kill掉kafka進程(集群中每台機器都要)
ps -ef|grep kafka (查看進程PID)
kill -9 PID (可能需要root權限)
2、清空kafka 的data文件和log文件(集群中每台機器都要)
rm -rf /home/press/kafka/kafka-2.11-Cluster/kafka-2.11-1/kafka-logs/*
3、 啟動三個zookeeper及kafka服務
cd /home/kafka/zookeeper-3.4.10/bin
./zkServer.sh restart
cd /home/kafka/kafka_2.11-0.10.2.1/bin
./kafka-server-start.sh -daemon ../config/server.properties
# 4、查詢kafka進程
ps -ef|grep kafka
踩坑總結:
#問題一:啟動kafka集群,必須先要啟動zookeeper集群。
#問題二:kafka端口查看及修改,配置文件路徑: vim /home/kafka/kafka_2.11-0.10.2.1/config/server.properties IP+端口號:listeners=PLAINTEXT://10.101.22.41:9092
#問題三:配置中心的kafka配置中ip:port是kafka的端口,不是zookeeper的端口
#問題四:測試topic不同分區的消費的性能時,建議從分區數小開始(分區數只能增加、不能減少,要減少只能刪除後新建topic重新分區)
#問題五:消費者數量和topic分區數不是倍數時,會出現消費不均的情況(正常現象)
python生成數據腳本:
#-*- coding:utf-8 -*-
# 向kafka發送數據進行測試
import sys
import time
import json
import threading, logging, time
from kafka import SimpleProducer
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.client import KafkaClient
from kafka.errors import KafkaError
#file=open('D:\\My Documents\\Desktop\\t_adId_materialUUID.txt')
#dataMat=[]
#for line in file.readlines():
#curLine=line.strip().split("\t")
# floatLine=map(float,curLine)#這里使用的是map函數直接把數據轉化成為float類型
#dataMat.append(curLine[0:2])
#print ("dataMat = " +str(len(dataMat)))
# Topic that every generated test message is written to.
topic = 'oth_cpd_active_realtime_data'
# Kafka producer. bootstrap_servers is the broker's IP:port (the Kafka
# listener port, not the ZooKeeper port).
producer = KafkaProducer(bootstrap_servers='10.101.22.41:9092')
def build_ads_resp(cv_time, idea_id=20009677):
    """Return the JSON text of one generated test message.

    Only ``cvTime`` and ``ideaId`` vary between messages; every other field
    is a fixed sample value.

    Args:
        cv_time: Value for ``cvTime`` (the caller passes epoch milliseconds).
        idea_id: Value for ``ideaId``.
    """
    payload = {
        "cfrom": "219",
        "cvTime": cv_time,
        "cvType": 7,
        "ideaId": idea_id,
        "appId": "1695285",
        "appPackage": "com.jzyd.coupon",
        "changeId": "",
        "channelType": 1,
        "chargeMode": -1,
        "cp": "23",
        "cpdps": "20190815085536,20384,,72c19c1c35444d5aa6c1297d72d4f53b,tt-300,5.05E-4,59760801D3E3762F,afterDownloadRecommend",
        "cvDate": "20190828",
        "cvTs": 1565923987000,
        "downloadTime": 1565830560000,
        "imei": "864092048676953",
        "page": "others",
        "placeType": "afterDownloadRecommend",
        "price": 3.8,
        "reqId": "72c19c1c35444d5aa6c1297d72d4f53b",
    }
    # json.dumps guarantees well-formed JSON, unlike hand-concatenated text.
    return json.dumps(payload)


def test1(rate=60, idea_id=20009677, max_messages=None):
    """Send generated messages to ``topic`` through ``producer`` at a fixed pace.

    Time is split into one-second windows labelled by the local wall-clock
    ``HH:MM:SS`` string. Up to ``rate // 60`` messages are sent per window;
    when the label changes, a summary line is printed and the per-window
    counter is reset.

    Args:
        rate: Target messages per minute. The per-second quota is
            ``rate // 60``, so values that are not multiples of 60 are
            rounded down, and values below 60 are rejected.
        idea_id: Value of the ``ideaId`` field (previously hard-coded and
            marked as needing parameterisation).
        max_messages: Stop after this many messages. ``None`` (the default)
            keeps sending until the process is interrupted.

    Returns:
        Total number of messages handed to ``producer.send``.

    Raises:
        ValueError: If ``rate`` is below 60.
    """
    per_second = rate // 60
    if per_second < 1:
        # A quota of 0 would never send anything and just spin forever.
        raise ValueError('rate must be >= 60 messages/min, got %r' % (rate,))

    window = time.strftime("%H:%M:%S", time.localtime())  # current window label
    sent_in_window = 0  # messages sent during the current one-second window
    total_sent = 0      # messages sent since start
    while max_messages is None or total_sent < max_messages:
        # One timestamp per iteration so the label, the sleep computation and
        # cvTime all refer to the same instant.
        now = time.time()
        now_label = time.strftime("%H:%M:%S", time.localtime(now))
        if now_label == window:
            if sent_in_window < per_second:
                sent_in_window += 1
                cv_time = round(now * 1000)  # epoch milliseconds
                producer.send(topic, build_ads_resp(cv_time, idea_id).encode())
                total_sent += 1
                print('發送總的消息——計數:' + str(total_sent))
            else:
                # Quota for this second is used up: sleep until the next
                # second boundary instead of busy-spinning a CPU core.
                time.sleep(1 - now % 1)
        else:
            print('end : 當前時間 = ' + now_label + '每分鍾理論速率=' + str(per_second * 60)
                  + '/min__每秒鍾理論速率=' + str(per_second)
                  + '/秒__每分鍾實際速率=' + str(sent_in_window * 60)
                  + '/min__每秒鍾實際速率=' + str(sent_in_window)
                  + '/秒__當前發送消息總數=' + str(total_sent))
            sent_in_window = 0
            print("start :重新計時 = " + now_label + '__清空計數:' + str(sent_in_window))
            window = now_label
            print('發送總的消息——計數:' + str(total_sent))
    if total_sent:
        # send() is asynchronous; make sure a bounded run is actually delivered.
        producer.flush()
    return total_sent
if __name__ == '__main__':
    # Script entry point. With default arguments test1() never returns, so
    # Ctrl-C is the normal way to stop the generator.
    print('send to kafka start!----- ')
    try:
        test1()
    except KeyboardInterrupt:
        # Stopping by hand is expected; finish with the closing message
        # instead of an interpreter traceback.
        pass
    print('send to kafka finished! ----- ')