kafka 項目實戰


 

一、動態修改消息體。

1、默認消息體大小 1 M

2、修改某一個topic消息體大小為3M

## 修改參數
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test --config max.message.bytes=3000000

## 刪除參數
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test --delete-config max.message.bytes=3000000

3、參數詳解

broker: message.max.bytes

設置kafka broker可接受消息體的大小。它限制了可以發送的最大消息的大小和生產者可以在一個請求中發送的消息數。例如,默認的最大請求大小為1 MB,可以發送的最大消息為1 MB,或者生產者可以將1,024個大小為1 KB的消息批量處理為一個請求。


partition:replica.fetch.max.bytes
其決定了每個分區可以返回到消費者的最大大小。顯然,此參數必須大於message.max.bytes
api
另外,對於對接kafka的應用程序,其api需要調整其生產或消費的大小,對於消費者,修改fetch.message.max.bytes 屬性。

 

 

二、動態操作topic

## 動態調整partition數量
./kafka-topics.sh –zookeeper localhost:2181 -alter –partitions 5 –topic topicName


## 查看執行topic信息
./kafka-topics.sh –zookeeper localhost:2181 --describe –topic topicName


## 刪除topic
./kafka-topics.sh –zookeeper localhost:2181 --delete --topic topicName ## 需刪除zookeeper節點信息,並刪除數據目錄中topic的信息

 

 

三、遷移partitions

## 創建json文件,將要執行遷移的topic寫入文件中
{
    "topics":[
        {
            "topic":"topic_1"
        },
        {
            "topic":"topic_2"
        }
    ],
    "version":1
}


## 獲取請求方案 .
/bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file think_tank-to-move.json --broker-list "101,102,103,104,105" --generate --broker-lsit 的參數 "101,102,103,104,105"是指集群中每個broker的id,由於我們是需要將所有topic均勻分配到擴完結點的5台機器上,所以要指定。同理,當業務改變為將原來的所有數據從舊節點(01,102,103)遷移到新節點(104105)實現數據平滑遷移,這時的參數應"104,105".
## 將返回結果中的建議粘貼到json文件

think_tank_reassignment.json

## 使用腳本執行遷移 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --execute ## 查看分配過程 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --verify

 

四、刪除遷移時失敗的任務

ls /admin/reassign_partitions
rmr ls /admin/reassign_partitions

 

五、kafka性能測試

## 生產消息
kafka-producer-perf-test.sh --messages 5000000
  --topics test_producer_perf --threads 10 --broker-list  dn1:9092,
 dn2:9092, dn3:9092

--threads 設置線程數


# 使用同步模式發送消息數據
[hadoop@dn1 ~]$ kafka-producer-perf-test-0.8.sh --messages 5000000
  --topics test_producer_perf_s2 --threads 3 --broker-list  dn1:9092,
 dn2:9092, dn3:9092 --sync

2.2 測試方法

壓力發起:kafka官方提供的自測腳本

4項指標:總共發送消息量(以MB為單位),每秒發送消息量(MB/second),發送消息總數,每秒發送消息數(records/second)

監控信息:自定義腳本來記錄服務情況,包括CPU、內存、磁盤、網絡等情況。

(1)  生產者測試腳本kafka-producer-perf-test.sh

參數說明:

--topic topic名稱,

--num-records 總共需要發送的消息數,

--record-size 每個記錄的字節數,

--throughput 每秒鍾發送的記錄數,

--producer-props bootstrap.servers=localhost:9092 發送端的配置信息

## 測試方法
消費者測試腳本kafka-consumer-perf-test.sh

參數說明:

--zookeeper 指定zookeeper的鏈接信息,

      --topic 指定topic的名稱,

--fetch-size 指定每次fetch的數據的大小,

--messages 總共要消費的消息個數

# 使用異步模式發送消息記錄
[hadoop@dn1 ~]$ kafka-producer-perf-test-0.8.sh --messages 5000000
  --topics test_producer_perf_s2 --threads 3 --broker-list  dn1:9092,
 dn2:9092, dn3:9092

# 以批處理模式發送,大小為1000條
[hadoop@dn1 ~]$ kafka-producer-perf-test-0.8.sh --messages 5000000
  --topics test_producer_perf_s2 --threads 3 --broker-list  dn1:9092,
 dn2:9092, dn3:9092 --batch-size 1000  --request-timeout-ms 100000


# 發送消息,長度為200字節
[hadoop@dn1 ~]$ kafka-producer-perf-test-0.8.sh --messages 5000000
  --topics test_producer_perf_s2 --threads 3 --broker-list  dn1:9092,
 dn2:9092, dn3:9092 --batch-size 3000  --request-timeout-ms 100000
 --message-size 200


# 用一個線程讀取數據到擁有12個分區的主題中
[hadoop@dn1 ~]$ kafka-consumer-perf-test.sh –zookeeper
 dn1:2181,dn2:2181,dn3:2181 --messages 5000000 –topic
 test_consumer_perf_p24_--group g3 --threads 1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM