This document applies to Kafka version 0.9.0.0.
Problem 1: In an on-site deployment, the Kafka connection is normal but no data ever arrives.
1) First confirm the configuration is correct, including the task configuration and the broker IP and port;
2) Check the topic offsets: whether new data is coming in and whether the existing data has already been consumed (a minimal consumer-side check is sketched after this list);
3) Then check whether the Kafka service itself is healthy: whether any broker node is down, and whether the topic is configured with replicas;
4) If Kafka is a cluster and the topic has no replicas, a single node going down makes its partitions unavailable and data can no longer be pulled;
5) Finally check network connectivity, for example by pinging the broker IP.
A tricky case we ran into earlier: all of the above checks passed and the program's own diagnostics also found nothing wrong, yet the symptoms were exactly as described; the site's network turned out to be very poor.
Packet captures then showed a very low delivery success rate; the unstable network was what prevented the data from being received.
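If all of the checks above pass, a throwaway consumer that fetches topic metadata and polls once can help isolate client-side problems from network problems. This is only a sketch, not part of the actual tool: the broker address and group id are placeholders, the deserializers are assumptions, and the topic name is the one that appears elsewhere in this document.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConnectivityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.1:9092");  // placeholder broker address
        props.put("group.id", "connectivity-check");      // throwaway group id (placeholder)
        props.put("enable.auto.commit", "false");         // do not commit offsets from this check
        props.put("auto.offset.reset", "earliest");       // read from the beginning if no committed offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // partitionsFor() forces a metadata fetch for the topic
            System.out.println("Partitions: " + consumer.partitionsFor("HIK_SMART_METADATA_TOPIC"));
            consumer.subscribe(Collections.singletonList("HIK_SMART_METADATA_TOPIC"));
            ConsumerRecords<String, String> records = consumer.poll(5000);
            System.out.println("Fetched " + records.count() + " records");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
            }
        } finally {
            consumer.close();
        }
    }
}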
Problem 2: Runtime exception org.apache.kafka.common.errors.RecordTooLargeException:
There are some messages at [Partition=Offset]: {HIK_SMART_METADATA_TOPIC-1=8155364} whose size is larger than the fetch size 1048576 and hence cannot be ever returned.
Increase the fetch size, or decrease the maximum message size the broker will allow.
Cause: When consuming data from the downstream Kafka, individual messages larger than 1048576 bytes (1M) exist, so the tool cannot consume them, and consumption of that county's data stays blocked at that offset.
(Only a few individual messages exceed 1M.)
Solution:
Add the Kafka property max.partition.fetch.bytes=1572864 (1.5M) to the consumer-side configuration, as in the sketch below.
- max.partition.fetch.bytes: the maximum amount of data fetched from a single partition per request (bytes); the default is 1M.
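A minimal consumer-configuration sketch with this fix applied; the broker address, group id and deserializers are placeholders, only the max.partition.fetch.bytes value comes from the solution above.

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LargeMessageConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.1:9092");  // placeholder broker address
        props.put("group.id", "metadata-consumer");       // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // allow fetches of up to 1.5M per partition (default is 1M = 1048576 bytes)
        props.put("max.partition.fetch.bytes", "1572864");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual ...
        consumer.close();
    }
}

Note that this only raises the consumer-side limit; the broker's message.max.bytes and the producer's max.request.size also bound the message size, so if even larger messages appear, all three need to be kept consistent.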
Problem 3: Error: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:706)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.sendRecordToKafka(KafkaToKafkaStep.java:453)
at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep$KafkaToKafkaGroupTask.run(KafkaToKafkaStep.java:204)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Cause: When sending data to the upstream Kafka, producer.send(record) first fetches metadata from the broker and then sends the data.
If the data volume is too large or sends are too frequent, the metadata update can time out (60s). The handling is to catch the exception and resend; when the exception is caught, the failed record and the exception details are logged.
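A sketch of the catch-and-resend handling described above (this is not the actual KafkaToKafkaStep code); the broker address, topic, payload and retry limit are placeholders.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ResendOnMetadataTimeout {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.1:9092");  // placeholder: upper-level Kafka address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>("HIK_SMART_METADATA_TOPIC", "payload");  // placeholder record

        int maxRetries = 3;  // placeholder retry limit
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                // get() blocks until the broker acknowledges; a metadata update that times out
                // surfaces here as an ExecutionException wrapping TimeoutException
                RecordMetadata meta = producer.send(record).get();
                System.out.printf("sent: partition=%d offset=%d%n", meta.partition(), meta.offset());
                break;
            } catch (ExecutionException e) {
                // log the failed payload and the exception, then resend
                System.err.println("send failed (attempt " + attempt + "): " + record.value());
                e.printStackTrace();
            }
        }
        producer.close();
    }
}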
Problem 4
The county-level Kafka cluster has 8 servers, but the partitions of HIK_SMART_METADATA_TOPIC sit on only two of them; restarting the other Kafka brokers does not change this, and no other topic shows the problem.
Solution:
1. Recreate the topic (requires all of the topic's data to have been consumed first).
2. Rebalance the topic (reassign its partitions more evenly across the brokers).
Problem 5
The log reports the following error:
2018-06-01 18:52:39 ERROR [node_2_Worker-18] core.JobRunShell (run:211) - Job com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.node_2_(kafka_BoLe)_(kafka_sanhui) threw an unhandled Exception:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
at com.hikvision.bigdata.hwf.workflow.node.steps.impl.KafkaToKafkaStep.startStep(KafkaToKafkaStep.java:126)
at com.hikvision.bigdata.hwf.workflow.node.steps.AbstractWorkflowNodeStep.execute(AbstractWorkflowNodeStep.java:143)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Unable to establish loopback connection
at org.apache.kafka.common.network.Selector.<init>(Selector.java:98)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:122)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:272)
... 5 more
Caused by: java.io.IOException: Unable to establish loopback connection
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at sun.nio.ch.PipeImpl$Initializer.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at sun.nio.ch.PipeImpl.<init>(Unknown Source)
at sun.nio.ch.SelectorProviderImpl.openPipe(Unknown Source)
at java.nio.channels.Pipe.open(Unknown Source)
at sun.nio.ch.WindowsSelectorImpl.<init>(Unknown Source)
at sun.nio.ch.WindowsSelectorProvider.openSelector(Unknown Source)
at java.nio.channels.Selector.open(Unknown Source)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:96)
... 7 more
Caused by: java.net.SocketException: No buffer space available (maximum connections reached?): bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
at sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
Conclusion: The local machine's TCP connections were exhausted, so new TCP (socket) connections could not be established.
Option 1: Restart the server to release the connections.
Option 2: Adjust the local TCP connection limits and increase the buffer size, and find out why the TCP connections are not being released.
In cmd, run:
netstat -an
Meaning of the State values in the netstat -an output:
LISTEN: listening for connection requests from a remote TCP port
SYN-SENT: waiting for a matching connection request after having sent a connection request
SYN-RECEIVED: waiting for confirmation of the connection request after having both received and sent a connection request
ESTABLISHED: an open connection
FIN-WAIT-1: waiting for a connection termination request from the remote TCP, or an acknowledgment of the termination request previously sent
FIN-WAIT-2: waiting for a connection termination request from the remote TCP
CLOSE-WAIT: waiting for a connection termination request from the local user
CLOSING: waiting for the remote TCP's acknowledgment of the connection termination
LAST-ACK: waiting for an acknowledgment of the connection termination request previously sent to the remote TCP
TIME-WAIT: waiting for enough time to pass to be sure the remote TCP received the acknowledgment of its connection termination request
CLOSED: no connection state at all
Some useful operations for Kafka offsets:
Query a topic's offset range
The following command queries the minimum offset of topic test on broker suna:9092:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -2
Output:
test:0:1288
Query the maximum offset:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 --topic test --time -1
Output:
test:0:7885
From the output above, topic test has only one partition (0), and its offset range is [1288, 7885].
Set a consumer group's offset
Start the ZooKeeper client:
/zookeeper/bin/zkCli.sh
Set the offset of consumer group testgroup, topic test, partition 0 to 1288 with the following command:
set /consumers/testgroup/offsets/test/0 1288
Note that if your Kafka is configured with a ZooKeeper chroot, e.g. /kafka, the command becomes:
set /kafka/consumers/testgroup/offsets/test/0 1288
Restart the relevant application and it will start reading from the offset you set.
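The zkCli approach above applies to consumers that keep their offsets in ZooKeeper. With the new consumer API (0.9+), the same effect can be achieved programmatically with seek(); note that the new consumer stores its offsets in Kafka itself rather than in ZooKeeper. A minimal sketch, reusing the broker, group, topic, partition and offset from the example above (the deserializers are assumptions):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekGroupOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "suna:9092");  // broker from the example above
        props.put("group.id", "testgroup");           // consumer group from the example above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.assign(Collections.singletonList(tp));  // manual assignment, no group rebalance
        consumer.seek(tp, 1288);                         // jump to offset 1288, as in the zkCli example
        consumer.commitSync();                           // store the new position for the group
        consumer.close();
    }
}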
Manually updating the offsets Kafka stores in ZooKeeper: sometimes we need to set a topic's offsets to a particular value, which means updating the data in ZooKeeper. Kafka ships with a class for this, kafka.tools.UpdateOffsetsInZK, which can modify a topic's offsets in ZooKeeper. The steps are as follows:
[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
Running it without arguments shows the parameters the kafka.tools.UpdateOffsetsInZK class expects. Our consumer.properties file contains the following:
zookeeper.connect=www.iteblog.com:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=group
This tool can only set the offsets in ZooKeeper to earliest or latest, for example:
[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK \
earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset for 11 partitions
