阿里雲kafka部署踩坑系列,連接不上等問題


1.防火牆踩坑

之前看的一些說連接不上,就關閉防火牆,不過其實是一些端口你沒有開放等。
打開防火牆端口2181、9092、3306
`
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --zone=public --add-port=3306/tcp --permanent

端口開啟之后需重新加載,記得!!

firewall-cmd --reload

查看已經開放的端口

firewall-cmd --list-ports
`
打開雲服務器端口,添加安全組規則,流程我在上一篇說有說過,這里不重復
添加3306、9092和2181三個端口,已經有了就不管

2.修改kafka配置文件

2.1 zookeeper文件修改

你自己的zookeeper文件配置目錄下的zoo.cfg(改名后的,一般都改成這個,我也不知道為什么)
server.0=xxx.xx.xx.xx:2888:3888
最后一行添加上面代碼,其中xxx為你的服務器公網ip

2.2 修改kafka配置文件

進入kafka配置目錄打開server.properties那個配置文件
原先的localhost注釋掉,該成 你的公網ip

listeners修改為內網ip,同時添加外部代理地址,為外網ip

`
listeners=PLAINTEXT://172.17.8.172:9092

外部代理地址

advertised.listeners=PLAINTEXT://106.14.250.7:9092

客戶端連接的ip地址,必須要寫成服務器的ip地址!advertised.host.name

advertised.host.name =106.14.250.7
host.name=172.17.8.172
port=9092
`
記得該成自己的公網內網ip。

最后還有一個修改地方:
原先的localhost改為公網ip

測試

創建主題:
bin/kafka-topics.sh --create --zookeeper 192.168.103.10:2181 --topic hw_data --partitions 2 --replication-factor 1
查看主題:
bin/kafka-topics.sh --list --zookeeper 192.168.103.10:2181
查看主題 詳情:
./kafka-topics.sh --describe --topic hw_data --zookeeper 192.168.103.10:2181

開啟消費者監聽
bin/kafka-console-consumer.sh --bootstrap-server 106.14.250.7:9092 --topic topic1

在springboot中生產消息
`
package com.example.demo.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**

  • HKj
    */
    public class Producer {
    private static final String brokerList="106.14.250.7:9092";
    private static final String topic="test";
    private static final String topic1="topic1";
    public static void main(String[] args){
    Properties properties=new Properties();
    //設置key序列化器
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    //設置重試次數
    properties.put(ProducerConfig.RETRIES_CONFIG,10);
    //設置值序列化器
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    //設置集群地址
    properties.put("bootstrap.servers",brokerList);

     KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);
     ProducerRecord<String,String> record=new ProducerRecord<>(topic1,"kafka-demo","hello,kafka");
     try{
         producer.send(record);
     }catch (Exception e){
         e.printStackTrace();
     }
     producer.close();
    

    }
    }
    `


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM