kafka安裝和使用遠程代碼進行訪問 ---附踩坑記錄


kafka安裝和使用java連接遠程服務器進行消息的生成與消費

首先要使用kafka,要有jdk和zookeeper的環境

本文在阿里雲的centos7環境上進行

jdk版本選擇的是1.8.0_181

zookeeper的版本是3.4.12

kafka的版本是2.12-1.1.1

關於kafka命令的介紹 本文不介紹了 只介紹怎么搭建一個kafka單點服務器 以及怎么使用代碼 遠程連接kafka服務器

下載地址

kafka下載地址 :http://kafka.apache.org/downloads

zookeeper下載地址:https://zookeeper.apache.org/

jdk下載地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

操作步驟

1、首先 使用tar命令對jdk進行解壓
	tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz 
	目錄下面會多出一個jdk1.8.0_181  進入里面去  使用pwd命令查看絕對路徑  並且復制找個路徑
	最后進行jdk環境變量的配置
	編輯 vim /etc/profile文件
	在文件后面加上:
	export JAVA_HOME=(剛才pwd命令看到的路徑)
	export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/			lib/	tools.jar
	export PATH=$PATH:${JAVA_HOME}/bin
	
	最后使用source /etc/profile 刷新文件
	
	使用java -version 查看環境變量是否配置成功
	
2、成功之后進行zookeeper的安裝
	
	使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下載好的zookeeper安裝包
	
	將zookeeper下的/conf/zookeeper.example改名成zoo.cfg
	使用mv 和cp命令都可以  然后vim這個文件 加上下面兩行
	dataLogDir=/tmp/zookeeper-log #日志路徑
	quorumListenOnAllIPs=true #在阿里雲的服務器上保證外網可以訪問到  剛開始沒設置這個折騰了好久
3、最后,安裝kafka
	使用 tar -zxvf kafka_2.12-1.1.1.tgz 解壓下載好的kafka
	cd 到解壓后的文件里面去  編輯配置文件  vim config/server.properties
	加上下面幾行
	listeners=PLAINTEXT://:9092
	advertised.host.name=阿里雲服務器公網ip # 
	advertised.port=9092
	
	將zookeeper.connect的值改為阿里雲的公網ip

至此,所有的環境的安裝已經完成,下面使用kafka的命令進行消息的生成和消費

	首先cd到zookeeper的bin目錄下  使用 ./zkServer.sh start 啟動zookeeper
	再cd到kafka的bin目錄下 使用 ./kafka-server-start.sh ../config/server.properties 啟動kafka
	
	新建一個會話或者打開一個新的終端  
	這時候使用jps命令  可以看到 Kafka和QuorumPeerMain表示啟動全部成功,下面創建一個主題
	cd到kafka的bin目錄下面,執行
	./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --		partitions 1 --topic Hello-world
	
	輸出Created topic "Hello-world".  表示topic創建成功
	使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主題的列表
	輸出里面會含有Hello-world
	
	下面進行消息的生產和消費
	先啟動生產者 ./kafka-console-producer.sh --broker-list 阿里雲公網ip:9092 --topic Hello-		world
	會出現一個 >  類似於交互界面 這時候就可以生產消息了
	
	啟動消費者 ./kafka-console-consumer.sh --zookeeper 阿里雲公網ip:2181 --topic Hello-		world --from-beginning 
	
	這時候當生產者生產消息的時候  消費者這邊就可以看到了  

在服務器上面進行消息的生產和消費就完成了 下面介紹怎么使用java代碼進行遠程連接kafka服務器

這個地方真的踩了好多好多坑、有次晚上下班搞到了快兩點 百度、谷歌、維基、Stack Overflow 能找解決問題的地方都找了浪費了好多不必要的時間

	首先、新建一個Maven工程(此處不再多描述),在pom文件中加入kafka的依賴
	<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>
    
    新建一個KafkaProducerDemo和KafkaConsumerDemo類(名字可以自定義):
    話不多說  上代碼
    
    KafkaProducerDemo類:
	
  			public class KafkaProducerDemo {
    			public static void main(String[] args) {
    				//創建properties文件
        			Properties properties = new Properties();
					//設置kafka服務器地址
			       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里雲公網ip:9092");
			       //設置key進行序列化
			       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
			       //設置value進行序列化
			       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
			       //創建消息生產者
			       KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
			       //創建消息實體 制定主題、key、value
			       ProducerRecord<String,String> record = new ProducerRecord<>("Hello-world","haha","from java client");
			       //發送消息
			       producer.send(record);
			       System.out.println("消息發送成功");
			       //關閉生產者
			       producer.close();
			
			    }
			}
		
		
	KafkaConsumerDemo類:
   
	 public class KafkaConsumerDemo {
	
	    public static void main(String[] args) {
	        //新建配置文件
	        Properties properties = new Properties();
	        //設置kafka服務器地址
	        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"阿里雲公網ip:9092");
	        //設置key的反序列化
	        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
	        //設置value的反序列化
	        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
	        //設置groupid
	        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
	
	        //創建消費者對象
	        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
	        //訂閱主題
	        consumer.subscribe(Arrays.asList("Hello-world"));
	
	        while (true) {
	            //消費消息
	            ConsumerRecords<String, String> records = consumer.poll(100);
	            for (ConsumerRecord<String, String> record : records)
	
	               System.out.println("消息的主題是:" + record.topic()+",消息的key是:" + record.key()+",消息的value是:"+record.value());
	        }
	    }
	}
      	
	
	
	
	上面就是連接kafka遠程服務器代碼

但是上述過程做完之后還是不能正確運行、這個地方折騰了好久、最后在哪里看到解決的辦法記不大清了
就是要阿里雲服務器服務安全設置里面加個規則 將2181和9092端口開放就可以,但是我中間也使用命令的方式
關閉了防火牆、沒什么用,不知道什么鬼。 搞得我頭皮發麻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM