關於消息的發布與訂閱,之前一直使用的是activeMQ基於JMS的消息隊列進行操作的,最近聽說有一個更高效的消息的發布與訂閱技術,就是Kafka。
關於kafka的介紹,在這里就不做過多講解了,因為我自己也不是很了解,大概就知道它與activeMQ一樣,都是具有生產者和消費者的發布與訂閱消息的機制。
具體請參見百度百科Apache Kafka。
今天我想說的就是,初遇kafka所踩的坑,非常大的坑!!
今天第一次學習Kafka,參考的是ORCHome網上的資料。
具體使用,我這里不過多介紹,具體講我遇到的問題。因為是自學,我采用的是在centOS6.5的虛擬機上安裝的Kafka,由於新版的Kafka自帶有zookeeper,所以就直接使用了。
當我按照教程啟動玩Kafka后,並且在虛擬機服務器里面是可以正常操作,可是使用JavaAPI遠程進行操作的時候,便一直報連接異常!
Java代碼:
package site.wangxin520.kafkatest; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "http://192.168.211.129:9092"); //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. //“所有”設置將導致記錄的完整提交阻塞,最慢的,但最持久的設置。 props.put("acks", "all"); //如果請求失敗,生產者也會自動重試,即使設置成0 the producer can automatically retry. props.put("retries", 0); //The producer maintains buffers of unsent records for each partition. props.put("batch.size", 16384); //默認立即發送,這里這是延時毫秒數 props.put("linger.ms", 1); //生產者緩沖大小,當緩沖區耗盡后,額外的發送調用將被阻塞。時間超過max.block.ms將拋出TimeoutException props.put("buffer.memory", 33554432); //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //創建kafka的生產者類 Producer<String, String> producer = new KafkaProducer<String, String>(props); //生產者的主要方法 producer.send(new ProducerRecord<String, String>("show", "測試Kafka")); producer.close(); } }
代碼沒問題,但是每次運行就會拋一個time out 異常,總是連接失敗。
java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323) at org.apache.kafka.common.network.Selector.poll(Selector.java:291) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) at java.lang.Thread.run(Thread.java:745)
解決辦法
- 這里需要注意的是,因為是遠程連接服務器,所以要看服務器的防火牆是否針對端口9092(默認端口)打開的,剛開始弄了很長時間,我一直沒弄好的原因是因為中午我重啟了服務器,導致防火牆又打開了。
- 如果防火牆是正常的,就需要改變Kafka的配置:在/config/service.properties中,添加上一句host.name=192.168.211.129
這主要是因為,kafka默認是監聽localhost的端口,如果不配置新端口名的話,就解析監聽不到消息。
現在重新啟動一下,看看是不是已經解決了。
在kafka安裝目錄,啟動自帶的zookeeper服務:
bin/zookeeper-server-start.sh config/zookeeper.properties
在同一個地方,啟動kafka服務
bin/kafka-server-start.sh config/server.properties
使用消費者客戶端,監聽show的topic,驗證是否已經啟動了Kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic show --from-beginning
沒有報錯,並且現在服務器端已經在監聽狀態
啟動Java客戶端,控制台沒有報錯
並且在服務器端顯示了剛剛在Java客戶端發送的消息。
解決成功!