為了實現遠程kafka通信,我可謂是嘔心瀝血。期間各種bug各種調,太煎熬了 (T.T)
介紹:
我用一台虛擬機作為遠程消息的發送方,用本地電腦主機作為消息的接收方
虛擬機:安裝java,kafka,zookeeper
主機:eclipse,注意我沒有說在主機上也要安裝kafka的
1、虛擬機部署
1)下載kafka_2.11-2.2.0 我用的最新的(當前)
2)解壓到 /usr/local/ ,注意切換都root,不然后面編輯不了文件
3)配置文件 kafka/config/server.properties 只用修改下面三個
稍微解釋下:上面的ip都是一個,都是虛擬機ip,修改后可以在本機接收消息也可以在遠程(本地電腦或者其他電腦接收)
不知道虛擬機ip? 在命令行下 敲 ifconfig就可以找到了
2、本地eclipse
1)新建maven工程
pom.xml 注意里面的kafka版本最好和遠程對應(其他版本有可能發生錯誤,收不到消息)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hadoop</groupId> <artifactId>eclipseandmaven</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>eclipseandmaven</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <!-- 本地測試注釋集群運行打開 --> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>
建立 MainTopology.java
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; public class MainTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //ip設置為虛擬機ip,后面的topic要和虛擬機上的一樣 KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder("192.168.83.133:9092","test561");// 設置kafka屬於哪個組 kafkaBuilder.setGroupId("testgroup"); // 創建kafkaspoutConfig KafkaSpoutConfig<String, String> build = kafkaBuilder.build(); // 通過kafkaspoutConfig獲得kafkaspout KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build); // 設置5個線程接收數據 builder.setSpout("kafkaSpout", kafkaSpout, 5); // 設置2個線程處理數據 builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaSpout"); Config config = new Config(); if (args.length > 0) { // 集群提交模式 config.setDebug(false); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } else { // 本地測試模式 config.setDebug(true); // 設置2個進程 config.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("kafkaSpout", config, builder.createTopology()); } } }
建立 PrintBolt.java
import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; public class PrintBolt extends BaseBasicBolt { /** * execute會被storm一直調用 * * @param tuple * @param basicOutputCollector */ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { // 為了便於查看消息用err標紅 System.err.println(tuple.getValue(4)); System.err.println(tuple.getValues()); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
3、運行
切換到kafka安裝目錄
1)啟動zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2)啟動kafka服務
bin/kafka-server-start.sh -daemon config/server.properties
3)創建生產者
bin/kafka-console-producer.sh --broker-list 192.168.83.133:9092 --topic test561
4)創建消費者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.83.133:9092 --topic test561 --from-beginning
5)啟動本地eclipse項目
6)在3)中的窗口發送字符串
7)4)中可以收到消息,同時本地也可以收到消息
4、問題羅列
1)再次使用發現啟動不了------殺進程
ps -ef | grep kafka kill -9 kafka的pid ps -ef | grep zookeeper kill -9 zookeeper的pid
2)收不到消息是不是防火牆的原因
進行遠程telnet測試(如果不報錯就可以用,不用改動什么了,否則要把虛擬機防火牆關閉或者開放端口 下面有連接 )
telnet 192.168.83.133 9092
3)自己安裝的zookeeper和kafka自帶的不能混用
我自己安裝了一個然后還設置了自啟動,然后每次運行kafka自帶的zookeeper時總是啟動不了消費者。。。。。。
之后我把它刪了只用kafka自帶的就可以了。
4)jdk版本不適合
java版本我原先用的openjdk1.7,后來重新下載了一個jdk1.8安裝的,
然后下載時要登錄,就找了一個(謝謝共享)
name:2696671285@qq.com
pwd:Oracle123
5)還有。。。到以后再總結吧
參考:
https://blog.csdn.net/luozhonghua2014/article/details/80369469?utm_source=blogxgwz5
https://blog.csdn.net/wxgxgp/article/details/85701844
防火牆:
https://blog.csdn.net/feeltouch/article/details/21830541
https://kiddwyl.iteye.com/blog/67708