Overview
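This post records how I configured Logstash to output to Kafka. The example is deliberately simple, with stdin as the input. The main goal is to record the problem I ran into along the way, how I solved it, and what I learned.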
For setting up the Kafka cluster itself, see: https://www.cnblogs.com/ldsggv/p/11010497.html
1. Logstash conf file configuration
```
input {
  stdin {}
}
output {
  stdout { codec => rubydebug }
  kafka {
    bootstrap_servers => "192.168.183.195:9092,192.168.183.194:9092,192.168.183.196:9092"  # producer
    codec => json
    topic_id => "kafkalogstash"  # the Kafka topic to write to
  }
}
```
Once this is configured, and provided the Kafka cluster is healthy, you can start Logstash and test sending messages.
Start Logstash:

```
bin/logstash -f logstash-kafka.conf
```

Then wait for it to start up. When it prints:
```
[INFO ] 2019-06-11 17:52:51.163 [[main]-pipeline-manager] AppInfoParser - Kafka version : 2.1.0
[INFO ] 2019-06-11 17:52:51.164 [[main]-pipeline-manager] AppInfoParser - Kafka commitId : eec43959745f444f
[INFO ] 2019-06-11 17:52:51.342 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xa43495c sleep>"}
The stdin plugin is now waiting for input:
[INFO ] 2019-06-11 17:52:51.444 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2019-06-11 17:52:51.708 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9601}
```
Logstash has started successfully.
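Now type a message. The normal output is shown below. The corresponding topic also shows up in the Kafka cluster, and the messages can be consumed with kafka-console-consumer.sh.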
```
456
{
    "@timestamp" => 2019-06-11T10:20:09.615Z,
          "host" => "emr-worker-4.cluster-96380",
      "@version" => "1",
       "message" => "456"
}
[INFO ] 2019-06-11 18:20:10.642 [kafka-producer-network-thread | producer-1] Metadata - Cluster ID: S8sBZgHPRJOv-nULn_bVGw
{
    "@timestamp" => 2019-06-11T11:48:11.234Z,
          "host" => "emr-worker-4.cluster-96380",
      "@version" => "1",
       "message" => ""
}
```
That is the correct output, but it did not work for me at first. Instead I got:
```
[INFO ] 2019-06-11 17:53:33.558 [kafka-producer-network-thread | producer-1] Metadata - Cluster ID: S8sBZgHPRJOv-nULn_bVGw
[ERROR] 2019-06-11 17:53:33.581 [kafka-producer-network-thread | producer-1] Sender - [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for connection 2
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
```

The same ERROR entry, with an identical stack trace, was logged again at 17:53:33.586 (twice in the captured output) and kept repeating.
Problem analysis:
The message points to a connection problem, yet both ping and telnet confirmed that the network was reachable. So where was the problem?
Solution:
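The problem first appeared on the production deployment. When I then tried the same thing locally, it worked without any issue; and when I downloaded Logstash onto one of the machines inside the production Kafka cluster, the same configuration worked there too.
At that point I was sure something was wrong with the Logstash setup on my production machine, but I could not pinpoint what.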
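So I downloaded kafka-clients 2.1.0 and looked locally at the code where the error occurred, which is what finally confirmed the cause.
Following the stack trace above into the source:
The failure happens in initiateConnect (NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?]. The top frames show that the IllegalStateException itself comes from connectionStates.disconnected(), which initiateConnect calls in its catch block after the connection attempt fails: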
```java
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
        InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.warn("Error connecting to node {}", node, e);
    }
}
```
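Stepping into selector.connect(), its Javadoc quickly gave away the cause:
it throws an IOException if DNS resolution fails on the hostname or if the broker is down. My cluster was definitely healthy, so it had to be DNS. Then it clicked: my local cluster and the production Kafka cluster each rely on their own /etc/hosts for hostname resolution, but the production Logstash machine sits in a different network region. When the Kafka producer resolves a broker's hostname locally in order to talk to the cluster and the lookup fails, this error is raised.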
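The stack trace and the Javadoc together explain why a DNS failure surfaces as the rather misleading IllegalStateException. Below is a toy Java model of that path; it is not the kafka-clients code, and the class ConnectFailureModel, its methods, the hostname broker-1 and the map standing in for /etc/hosts are all hypothetical. It assumes, as the stack trace suggests, that the hostname lookup fails before a connection-state entry for the node is recorded, so the catch block's call to disconnected() finds no entry and throws.

```java
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the path in the stack trace:
 * initiateConnect -> lookup fails -> catch -> disconnected -> nodeState.
 * Not the real kafka-clients code; all names here are illustrative.
 */
public class ConnectFailureModel {

    // Stand-in for /etc/hosts on the Logstash machine (hostname -> IP).
    private final Map<String, String> localHosts;
    // Stand-in for ClusterConnectionStates: node id -> resolved IP.
    private final Map<String, String> nodeStates = new HashMap<>();

    public ConnectFailureModel(Map<String, String> localHosts) {
        this.localHosts = localHosts;
    }

    // Assumption: the hostname is resolved before the entry is stored,
    // so a failed lookup leaves no entry for this node id.
    private void connecting(String id, String host) throws UnknownHostException {
        String ip = localHosts.get(host);
        if (ip == null) throw new UnknownHostException(host);
        nodeStates.put(id, ip);
    }

    // Mirrors the check that produced "No entry found for connection 2".
    private String nodeState(String id) {
        String ip = nodeStates.get(id);
        if (ip == null) throw new IllegalStateException("No entry found for connection " + id);
        return ip;
    }

    private void disconnected(String id) {
        nodeState(id);          // throws if there is no entry
        nodeStates.remove(id);
    }

    // Same try/catch shape as NetworkClient.initiateConnect().
    public String initiateConnect(String id, String host) {
        try {
            connecting(id, host);
            return "connecting to " + host + " at " + nodeState(id);
        } catch (IOException e) {   // UnknownHostException is an IOException
            disconnected(id);       // this call is what throws IllegalStateException
            return "unreachable";   // only reached if an earlier entry existed
        }
    }

    public static void main(String[] args) {
        Map<String, String> hosts = new HashMap<>();
        hosts.put("broker-1", "192.168.183.194"); // hypothetical hostname-to-IP mapping
        ConnectFailureModel client = new ConnectFailureModel(hosts);
        System.out.println(client.initiateConnect("1", "broker-1"));
        try {
            client.initiateConnect("2", "broker-2"); // not in the local hosts map
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the model, as in the real log, the underlying UnknownHostException never reaches the output; only the follow-on IllegalStateException does, which is why the log looks like a connection-bookkeeping bug rather than a DNS problem.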
The producer does this in a background thread that keeps looping, so the error log above never stops printing.
So the fix is:
add entries to /etc/hosts on the Logstash machine mapping each Kafka broker's hostname to its IP. After that, restart Logstash, and everything runs normally.
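Before restarting Logstash, it can help to confirm that every broker hostname now resolves on the Logstash machine. Below is a minimal sketch of such a check in plain Java; BrokerDnsCheck is a hypothetical helper, not part of Logstash or Kafka. It resolves each hostname given on the command line with InetAddress.getByName, which goes through the system resolver (on Linux typically /etc/hosts first, then DNS), and reports the ones that fail. Which hostnames to pass is up to you: they should be the names the brokers advertise, i.e. the ones listed in the Kafka cluster's own /etc/hosts.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of a pre-flight check for the Logstash host: every broker hostname
 * must resolve locally. Nothing here talks to Kafka; it only does lookups.
 */
public class BrokerDnsCheck {

    // Returns the hostnames this machine cannot resolve.
    public static List<String> unresolved(List<String> hostnames) {
        List<String> failed = new ArrayList<>();
        for (String host : hostnames) {
            try {
                InetAddress addr = InetAddress.getByName(host);
                System.out.println(host + " -> " + addr.getHostAddress());
            } catch (UnknownHostException e) {
                failed.add(host);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Usage: java BrokerDnsCheck <broker-hostname> [<broker-hostname> ...]
        List<String> failed = unresolved(Arrays.asList(args));
        if (failed.isEmpty()) {
            System.out.println("all broker hostnames resolve");
        } else {
            System.out.println("cannot resolve: " + failed + " (add them to /etc/hosts)");
            System.exit(1);
        }
    }
}
```

Compile it with `javac BrokerDnsCheck.java` and run `java BrokerDnsCheck <hostname1> <hostname2> ...`; a non-zero exit status means at least one name still does not resolve.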
Summary:
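One thing still puzzled me: I had explicitly configured bootstrap_servers => "192.168.183.195:9092,192.168.183.194:9092,192.168.183.196:9092", using IP addresses, so why did a DNS lookup fail at all? Was the producer not using my configuration when sending messages?
To find out, I traced through the code locally: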
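When a producer is created with new KafkaProducer<>(props), the KafkaProducer constructor also creates a Sender (Sender implements Runnable) and starts a background thread that runs it; that thread is what actually sends messages to the Kafka cluster.
So the method to look at is the Sender's run method. Inside it, two calls matter: sendProducerData(now), which sends the data, and client.poll(pollTimeout, now).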
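To make the threading part concrete, here is a toy model in Java; ToyProducer and everything in it are illustrative names, not the kafka-clients API. send() only appends to a queue, and a background thread started in the constructor loops over one "send, then poll" iteration. As in the real Sender, whose log line reads "Uncaught error in kafka producer I/O thread", an exception in one iteration is caught and logged and the loop carries on. The brokerHostResolvable flag is a hypothetical switch that simulates the DNS problem, so the same error repeats for as long as the record stays queued.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy model of KafkaProducer + Sender (names are illustrative, not kafka-clients).
 * send() only enqueues; a background thread loops over "send data, then poll".
 */
public class ToyProducer implements AutoCloseable {

    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> delivered = new LinkedBlockingQueue<>();
    private final AtomicInteger errors = new AtomicInteger();
    private final boolean brokerHostResolvable; // hypothetical switch for the DNS problem
    private final Thread ioThread;
    private volatile boolean running = true;

    public ToyProducer(boolean brokerHostResolvable) {
        this.brokerHostResolvable = brokerHostResolvable;
        // Like the real constructor: create the Runnable and start its thread.
        this.ioThread = new Thread(this::runLoop, "toy-producer-network-thread");
        this.ioThread.setDaemon(true);
        this.ioThread.start();
    }

    /** Returns immediately; the I/O thread does the network work later. */
    public void send(String record) {
        accumulator.add(record);
    }

    private void runLoop() {
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                errors.incrementAndGet();
                System.err.println("Uncaught error in toy producer I/O thread: " + e.getMessage());
                sleepQuietly(10); // keeps the toy from spinning at full CPU
            }
        }
    }

    /** One iteration, loosely "sendProducerData(now)" then "client.poll(...)". */
    private void runOnce() throws InterruptedException {
        String record = accumulator.poll(10, TimeUnit.MILLISECONDS);
        if (record == null) return;                 // nothing to send this round
        if (!brokerHostResolvable) {
            accumulator.add(record);                // record stays queued, so the next round fails too
            throw new IllegalStateException("No entry found for connection 2");
        }
        delivered.add(record);
    }

    /** Waits up to timeoutMs for the next delivered record; null on timeout. */
    public String awaitDelivered(long timeoutMs) {
        try {
            return delivered.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Waits up to timeoutMs until at least n errors have been logged. */
    public boolean awaitErrors(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (errors.get() < n && System.currentTimeMillis() < deadline) sleepQuietly(5);
        return errors.get() >= n;
    }

    @Override
    public void close() {
        running = false;
        try {
            ioThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (ToyProducer ok = new ToyProducer(true)) {
            ok.send("456");
            System.out.println("delivered: " + ok.awaitDelivered(1000));
        }
        try (ToyProducer broken = new ToyProducer(false)) {
            broken.send("456");
            System.out.println("errors keep coming: " + broken.awaitErrors(3, 1000));
        }
    }
}
```

The design point the toy shares with the real client is that send() never touches the network, so a connection failure shows up only in the I/O thread's log, not as an error at the call site.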
Sender.sendProducerData():

```java
 1 private long sendProducerData(long now) {
 2     Cluster cluster = metadata.fetch();
 3     // get the list of partitions with data ready to send
 4     RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 5
 6     // if there are any partitions whose leaders are not known yet, force metadata update
 7     if (!result.unknownLeaderTopics.isEmpty()) {
 8         // The set of topics with unknown leader contains topics with leader election pending as well as
 9         // topics which may have expired. Add the topic again to metadata to ensure it is included
10         // and request metadata update, since there are messages to send to the topic.
11         for (String topic : result.unknownLeaderTopics)
12             this.metadata.add(topic);
13
14         log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
15             result.unknownLeaderTopics);
16         this.metadata.requestUpdate();
17     }
18
19     // remove any nodes we aren't ready to send to
20     Iterator<Node> iter = result.readyNodes.iterator();
21     long notReadyTimeout = Long.MAX_VALUE;
22     while (iter.hasNext()) {
23         Node node = iter.next();
24         if (!this.client.ready(node, now)) {
25             iter.remove();
26             notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
27         }
28     }
29
30     // create produce requests
31     Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
32     addToInflightBatches(batches);
33     if (guaranteeMessageOrder) {
34         // Mute all the partitions drained
35         for (List<ProducerBatch> batchList : batches.values()) {
36             for (ProducerBatch batch : batchList)
37                 this.accumulator.mutePartition(batch.topicPartition);
38         }
39     }
40
41     accumulator.resetNextBatchExpiryTime();
42     List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
43     List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
44     expiredBatches.addAll(expiredInflightBatches);
45
46     // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
47     // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
48     // we need to reset the producer id here.
49     if (!expiredBatches.isEmpty())
50         log.trace("Expired {} batches in accumulator", expiredBatches.size());
51     for (ProducerBatch expiredBatch : expiredBatches) {
52         String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
53             + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
54         failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
55         if (transactionManager != null && expiredBatch.inRetry()) {
56             // This ensures that no new batches are drained until the current in flight batches are fully resolved.
57             transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
58         }
59     }
60     sensors.updateProduceRequestMetrics(batches);
61
62     // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
63     // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
64     // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
65     // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
66     // that aren't ready to send since they would cause busy looping.
67     long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
68     pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
69     pollTimeout = Math.max(pollTimeout, 0);
70     if (!result.readyNodes.isEmpty()) {
71         log.trace("Nodes with data ready to send: {}", result.readyNodes);
72         // if some partitions are already ready to be sent, the select time would be 0;
73         // otherwise if some partition already has some data accumulated but not ready yet,
74         // the select time will be the time difference between now and its linger expiry time;
75         // otherwise the select time will be the time difference between now and the metadata expiry time;
76         pollTimeout = 0;
77     }
78     sendProduceRequests(batches, now);
79     return pollTimeout;
80 }
```
NetworkClient.poll():

```java
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}
```
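From the code above, poll() updates the Kafka metadata (via metadataUpdater.maybeUpdate). Once that update completes, the node information in the metadata is no longer the bootstrap_servers I configured, but the cluster's actual broker nodes. The next time sendProducerData runs, line 24 (this.client.ready(node, now)) calls ready(), which in turn calls initiateConnect(), the very method where the problem first appeared. The node here is a partition leader taken from the cluster metadata, and the Sender only sends data to leaders. At this point the client resolves the IP from that node's hostname; if the machine running the producer has no mapping for it, you get exactly the error above.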
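The same point as a toy Java model; all class names, node ids and hostnames such as kafka-node-2 are hypothetical, and the bootstrap round trip itself is not modeled. The bootstrap IPs are only used to fetch metadata, and the address the producer actually dials comes from the leader's hostname in that metadata, resolved on the local machine. A map stands in for the Logstash host's /etc/hosts.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Simplified model: bootstrap_servers is only used to fetch metadata; produce
 * requests go to the partition leader as described by that metadata, i.e. by
 * the hostname the broker advertises. Hostnames below are hypothetical.
 */
public class BootstrapVsMetadata {

    static final class Node {
        final int id; final String host; final int port;
        Node(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
    }

    // Stand-in for a metadata response: broker list plus the leader of one partition.
    static final class Metadata {
        final List<Node> brokers; final int leaderId;
        Metadata(List<Node> brokers, int leaderId) { this.brokers = brokers; this.leaderId = leaderId; }
    }

    /** Address that would be dialed for produce requests, if the leader's host resolves locally. */
    static Optional<String> leaderAddress(Metadata md, Map<String, String> localHosts) {
        for (Node n : md.brokers) {
            if (n.id == md.leaderId) {
                String ip = localHosts.get(n.host);     // the bootstrap IPs play no part here
                return ip == null ? Optional.empty() : Optional.of(ip + ":" + n.port);
            }
        }
        return Optional.empty(); // leader unknown: would trigger another metadata update
    }

    public static void main(String[] args) {
        // Not modeled: connect to a bootstrap IP such as 192.168.183.195:9092 and
        // receive metadata in which brokers are identified by hostname.
        Metadata md = new Metadata(Arrays.asList(
                new Node(1, "kafka-node-1", 9092),
                new Node(2, "kafka-node-2", 9092),
                new Node(3, "kafka-node-3", 9092)), 2);

        Map<String, String> before = new HashMap<>();   // Logstash host before the fix
        System.out.println(leaderAddress(md, before));

        Map<String, String> after = new HashMap<>();    // after editing /etc/hosts
        after.put("kafka-node-2", "192.168.183.194");   // hypothetical mapping
        System.out.println(leaderAddress(md, after));
    }
}
```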
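Debugging confirms this: the node really is the node information synchronized from the Kafka cluster. Because clients interact with the cluster through the partition leaders, the producer does not send data to the configured bootstrap_servers; it uses them only to discover the cluster, and then relies on that cluster information to decide which node to talk to. Creating the producer and the Sender is actually quite involved, and I have only covered the parts relevant to this problem. If you are interested, trace through the source yourself; it is well worth it.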
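With that, the problem was solved. At root it was a configuration issue, but working through it and reading the Kafka client code took my understanding of Kafka a step further. A blessing in disguise, I suppose 《^_^》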