KAFKA - Usage Issues


This document is based on Kafka version 0.9.0.0.

Problem 1: In a field deployment, the Kafka connection is normal but no data ever arrives.

  1) First confirm the configuration is correct, including the task configuration and the broker IP and port;

2) Check the topic offsets: is new data actually coming in, and has the existing data already been consumed;

3) Check that the Kafka service itself is healthy: whether any broker node is down, and whether the topic is configured with replicas;

4) If Kafka is a cluster but the topic has no replicas, then losing one node makes that node's partitions impossible to fetch from;

5) Finally, verify network connectivity, e.g. by pinging the broker IP.

One tricky case we hit before: all of the above checks passed and the program's own diagnostics found nothing wrong, yet one site showed exactly these symptoms.

A packet capture then revealed a very low packet-delivery success rate; the site's network was so unstable that data could not be received at all.
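Step 5 above can be taken one step further: ping only proves ICMP reachability, while the broker's TCP port may still be blocked by a firewall. A minimal probe sketch (the host and port below are placeholders for the real broker address):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Quick TCP reachability probe for a broker address. The host/port in main()
// are placeholders -- substitute the bootstrap address from your task config.
public class BrokerCheck {
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Hypothetical broker address -- replace with the real one.
        System.out.println(isReachable("192.168.1.10", 9092, 2000)
                ? "broker port reachable" : "broker port NOT reachable");
    }
}
```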

 

Problem 2: Runtime exception org.apache.kafka.common.errors.RecordTooLargeException:

There are some messages at [Partition=Offset]: {HIK_SMART_METADATA_TOPIC-1=8155364} whose size is larger than the fetch size 1048576 and hence cannot be ever returned.

Increase the fetch size, or decrease the maximum message size the broker will allow.

Cause: when consuming data from the subordinate (district-level) Kafka, some individual records are larger than 1048576 bytes (1 MB). The tool cannot fetch such a record, and consumption of that district's data blocks on it indefinitely.

(Only a handful of records exceed 1 MB.)

Solution:

Add the Kafka setting max.partition.fetch.bytes=1572864 (1.5 MB) to the consumer-side configuration.

  • max.partition.fetch.bytes: the maximum amount of data fetched from a single partition per request, in bytes; defaults to 1 MB.
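Programmatically, the same setting is passed as a consumer property. A sketch (the bootstrap address and group id are placeholders):

```java
import java.util.Properties;

// Consumer configuration sketch: raises the per-partition fetch ceiling to
// 1.5 MB so single records just over the default 1 MB limit can still be
// fetched. Broker address and group id are placeholders.
public class ConsumerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.10:9092"); // placeholder
        props.put("group.id", "metadata-consumer");          // placeholder
        // Default is 1048576 (1 MB); any record larger than this stalls the partition.
        props.put("max.partition.fetch.bytes", "1572864");   // 1.5 MB
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.partition.fetch.bytes"));
    }
}
```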

 

 

Problem 3: error message: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

         at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:706)

         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)

         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)

         at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.sendRecordToKafka(KafkaToKafkaStep.java:453)

         at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep$KafkaToKafkaGroupTask.run(KafkaToKafkaStep.java:204)

         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)

         at java.util.concurrent.FutureTask.run(Unknown Source)

         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

         at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Cause: when sending data to the superior (upstream) Kafka, producer.send(record) first fetches metadata from the brokers and only then sends the data.

If the data volume is large or sends are very frequent, the metadata update can exceed the 60 s timeout. The application catches the exception and resends; on catching it, it also logs the record that failed to send along with the exception details.
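The catch-and-resend handling described above can be sketched generically. This is not the project's actual code, just the retry pattern, with a simulated send standing in for producer.send(record):

```java
import java.util.concurrent.Callable;

// Retry-on-timeout sketch: retries the action up to maxAttempts times,
// logging each failure (as the application logs the failed record).
public class RetrySend {
    public static <T> T sendWithRetry(Callable<T> action, int maxAttempts) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) { // e.g. TimeoutException: Failed to update metadata
                last = e;
                System.err.println("send attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        throw new RuntimeException("send failed after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulated send that fails twice before succeeding.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("Failed to update metadata after 60000 ms.");
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```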

 

Problem 4

The district Kafka cluster has 8 servers, yet the partitions of HIK_SMART_METADATA_TOPIC sit on only two of them. Restarting the other brokers changes nothing, and no other topic shows this behavior.

Solution:

1. Recreate the topic (this requires all of the topic's data to have been consumed first);

2. Rebalance the topic's partitions across the brokers.

See attachment: kafka副本相關.docx
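For option 2, Kafka ships bin/kafka-reassign-partitions.sh, which takes a JSON plan via --reassignment-json-file. A sketch that generates such a plan (the broker ids here are hypothetical; use the ids of your actual 8 brokers):

```java
import java.util.Arrays;
import java.util.List;

// Generates the JSON plan consumed by kafka-reassign-partitions.sh to spread
// a topic's partitions across more brokers. Broker ids are hypothetical.
public class ReassignPlan {
    public static String plan(String topic, List<int[]> replicasPerPartition) {
        StringBuilder sb = new StringBuilder("{\"version\":1,\"partitions\":[");
        for (int p = 0; p < replicasPerPartition.size(); p++) {
            if (p > 0) sb.append(',');
            sb.append("{\"topic\":\"").append(topic)
              .append("\",\"partition\":").append(p)
              .append(",\"replicas\":").append(Arrays.toString(replicasPerPartition.get(p)))
              .append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        // Move partitions 0 and 1 onto brokers 1..4 (hypothetical ids).
        System.out.println(plan("HIK_SMART_METADATA_TOPIC",
                Arrays.asList(new int[]{1, 2}, new int[]{3, 4})));
    }
}
```

Write the output to a file, run bin/kafka-reassign-partitions.sh --zookeeper &lt;zk&gt; --reassignment-json-file plan.json --execute, then check progress with --verify.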

Problem 5

The log reports the following error:

2018-06-01 18:52:39 ERROR [node_2_Worker-18] core.JobRunShell (run:211) - Job com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.node_2_(kafka_BoLe)_(kafka_sanhui) threw an unhandled Exception:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.startStep(KafkaToKafkaStep.java:126)
at com.hikvision.bigdata.hwf.workflow.node.steps.AbstractWorkflowNodeStep.execute(AbstractWorkflowNodeStep.java:143)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
at org.apache.kafka.common.network.Selector.<init>(Selector.java:98)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272)
... 5 more
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(Unknown Source)
at sun.nio.ch.SelectorProviderImpl.openPipe(Unknown Source)
at java.nio.channels.Pipe.open(Unknown Source)
at sun.nio.ch.WindowsSelectorImpl.<init>(Unknown Source)
at sun.nio.ch.WindowsSelectorProvider.openSelector(Unknown Source)
at java.nio.channels.Selector.open(Unknown Source)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:96)
... 7 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)

 

Conclusion: the machine's local TCP connections were exhausted, so new TCP (socket) connections could not be established.

Option 1: restart the server to release the connections.

Option 2: adjust the machine's TCP connection limits and increase the buffer sizes, then track down why the TCP connections are not being released.

From cmd, list the current connections with:

netstat -an
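To see which state dominates, tally the last column of the netstat -an output. A small sketch over sample lines (real usage would feed in the actual output, pre-filtered to TCP lines):

```java
import java.util.Map;
import java.util.TreeMap;

// Counts connection states in netstat -an output (assumes pre-filtered TCP
// lines; the last whitespace-separated column is the state). The sample lines
// in main() are illustrative only.
public class NetstatStates {
    public static Map<String, Integer> countStates(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] cols = line.trim().split("\\s+");
            counts.merge(cols[cols.length - 1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sample = {
            "TCP    10.0.0.5:52311   10.0.0.9:9092    TIME_WAIT",
            "TCP    10.0.0.5:52312   10.0.0.9:9092    TIME_WAIT",
            "TCP    10.0.0.5:9092    10.0.0.7:41200   ESTABLISHED"
        };
        System.out.println(countStates(sample));
    }
}
```

A large pile of TIME_WAIT or CLOSE-WAIT entries points at connections not being released, matching option 2 above.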

 

Meaning of the STATE column in netstat -an:
LISTEN: listening for connection requests from a remote TCP port
SYN-SENT: waiting for a matching connection request after having sent one
SYN-RECEIVED: waiting for confirmation of the connection request after having both received and sent one
ESTABLISHED: an open connection
FIN-WAIT-1: waiting for a connection termination request from the remote TCP, or an acknowledgment of the termination request previously sent
FIN-WAIT-2: waiting for a connection termination request from the remote TCP
CLOSE-WAIT: waiting for a connection termination request from the local user
CLOSING: waiting for the remote TCP's acknowledgment of connection termination
LAST-ACK: waiting for an acknowledgment of the termination request previously sent to the remote TCP
TIME-WAIT: waiting long enough to be sure the remote TCP received the acknowledgment of its termination request
CLOSED: no connection state at all

Some useful operations for Kafka offsets:

Query a topic's offset range
The following command queries the minimum offset of topic test on broker suna:9092:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -2

Output:

test:0:1288

Query the maximum offset:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -1

Output:

test:0:7885

The output shows that topic test has a single partition 0 whose offset range is [1288, 7885], i.e. 7885 - 1288 = 6597 messages currently retained.

Set a consumer group's offset
Start the ZooKeeper client:

/zookeeper/bin/zkCli.sh

Then set the offset of consumer group testgroup, topic test, partition 0 to 1288:

set /consumers/testgroup/offsets/test/0 1288

Note: if your Kafka is configured with a ZooKeeper root (chroot), e.g. /kafka, the command becomes:

set /kafka/consumers/testgroup/offsets/test/0 1288

Restart the consuming application and it will start reading from the offset you set.
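The znode path written by the set command follows a fixed layout, [chroot]/consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. A small helper illustrating it:

```java
// Builds the ZooKeeper znode path that the `set` command above writes,
// handling an optional chroot (e.g. /kafka).
public class OffsetPath {
    public static String of(String chroot, String group, String topic, int partition) {
        String root = (chroot == null || chroot.isEmpty()) ? "" : chroot;
        return root + "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        System.out.println(of("", "testgroup", "test", 0));
        System.out.println(of("/kafka", "testgroup", "test", 0));
    }
}
```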

Manually updating the offsets Kafka stores in ZooKeeper. Sometimes we need to set a topic's offsets to a particular position by hand, which means updating the data in ZooKeeper. Kafka ships a built-in class for this, kafka.tools.UpdateOffsetsInZK, which modifies a topic's offsets in ZooKeeper. Usage is as follows:

 


[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

 

 

Running it without arguments shows the parameters kafka.tools.UpdateOffsetsInZK expects. Our consumer.properties file is configured as follows:

 


zookeeper.connect=www.iteblog.com:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=group

 


This tool can only set the offsets in ZooKeeper to earliest or latest, as shown below:

[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK \

    earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset
for 11 partitions

 

