1.防火牆踩坑
之前看的一些說連接不上,就關閉防火牆,不過其實是一些端口你沒有開放等。
打開防火牆端口2181、9092、3306
`
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --zone=public --add-port=3306/tcp --permanent
端口開啟之后需重新加載,記得!!
firewall-cmd --reload
查看已經開放的端口
firewall-cmd --list-ports
`
打開雲服務器端口,添加安全組規則,流程我在上一篇說有說過,這里不重復
添加3306、9092和2181三個端口,已經有了就不管
2.修改kafka配置文件
2.1 zookeeper文件修改
你自己的zookeeper文件配置目錄下的zoo.cfg(改名后的,一般都改成這個,我也不知道為什么)
server.0=xxx.xx.xx.xx:2888:3888
最后一行添加上面代碼,其中xxx為你的服務器公網ip
2.2 修改kafka配置文件
進入kafka配置目錄打開server.properties那個配置文件
原先的localhost注釋掉,該成 你的公網ip
listeners修改為內網ip,同時添加外部代理地址,為外網ip
`
listeners=PLAINTEXT://172.17.8.172:9092
外部代理地址
advertised.listeners=PLAINTEXT://106.14.250.7:9092
客戶端連接的ip地址,必須要寫成服務器的ip地址!advertised.host.name
advertised.host.name =106.14.250.7
host.name=172.17.8.172
port=9092
`
記得該成自己的公網內網ip。
最后還有一個修改地方:
原先的localhost改為公網ip
測試
創建主題:
bin/kafka-topics.sh --create --zookeeper 192.168.103.10:2181 --topic hw_data --partitions 2 --replication-factor 1
查看主題:
bin/kafka-topics.sh --list --zookeeper 192.168.103.10:2181
查看主題 詳情:
./kafka-topics.sh --describe --topic hw_data --zookeeper 192.168.103.10:2181
開啟消費者監聽
bin/kafka-console-consumer.sh --bootstrap-server 106.14.250.7:9092 --topic topic1
在springboot中生產消息
`
package com.example.demo.Kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
-
HKj
*/
public class Producer {
private static final String brokerList="106.14.250.7:9092";
private static final String topic="test";
private static final String topic1="topic1";
public static void main(String[] args){
Properties properties=new Properties();
//設置key序列化器
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//設置重試次數
properties.put(ProducerConfig.RETRIES_CONFIG,10);
//設置值序列化器
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//設置集群地址
properties.put("bootstrap.servers",brokerList);KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties); ProducerRecord<String,String> record=new ProducerRecord<>(topic1,"kafka-demo","hello,kafka"); try{ producer.send(record); }catch (Exception e){ e.printStackTrace(); } producer.close();
}
}
`