spring boot2 kafka


一.軟件版本

  1.linux:centos6

  2.zookeeper:zookeeper-3.4.1

  3.kafka:kafka_2.12-2.2.0

  4.jdk:1.8
  5.instelliJ Idea

二.環境准備

  1.關閉防火牆:service iptables stop

  2.zookeeper:復制conf目錄下的zoo_sample.cfg配置文件並改名為zoo.cfg

  3.kafka:修改配置文件conf/server.properties中

      ①broker.id=123(broker.id每個kafka的都不一樣,必須唯一)

      ②advertised.listeners=PLAINTEXT://192.168.56.101:9092(對應自己的linux的ip地址)

      ③zookeeper.connect=127.0.0.1:2181(本機默認端口是2181,有需要到zookeeper的zoo.cfg文件中修改)

三.軟件setup&start

  1.zookeeper:zookeeper-3.4.14/bin目錄下執行

    ①./zkServer.sh start ../conf/zoo.cfg

  2.kafka:kafka_2.12-2.2.0/bin目錄下執行
    ①./kafka-server-start.sh ../config/server.properties

四.創建topic&發送消息&消費消息

  1.創建topic “test”,kafka_2.12-2.2.0/bin目錄下執行

    ①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    ②驗證是否創建成功:./kafka-topics.sh --list --bootstrap-server localhost:9092

      

    ③開啟消息生產者的console,kafka_2.12-2.2.0/bin目錄下執行,並發送幾條信息

      ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

      

    ④開啟消息消費者的console,kafka_2.12-2.2.0/bin目錄下執行

      ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

      

    以上步驟沒出現問題表示kafka服務器端以及搭建完畢。

五.Spring boot2 kafka通信

  1.創建topic:TopicConfiguration

package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicConfiguration {
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(new String[]{"192.168.56.101:9092"}));
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("test", 1, (short) 2);
    }

}

 

  2.spring boot2中建立kafka的消息生產者:KafkaProducerConfiguration

 
         
package com.aaron.kafka.springboot.configuration;

import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.KafkaHeaders;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfiguration {

@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// props.put(KafkaHeaders.TOPIC, "test");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}

}
    

  3.建立測試代碼:KafkaTest 

package com.aaron.kafka.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    @Resource
    private KafkaTemplate kafkaTemplate;

    @Test
    public void sendMsg(){
        kafkaTemplate.send("test","hello","world");
    }
}

  運行測試結果如下:

  4.Spring boot2中建立消費者和消息監聽器:kafkaConsumerConfiguration和MessageListenerImpl  

package com.aaron.kafka.springboot.configuration;

import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class kafkaConsumerConfiguration {

    @Bean
    public MessageListenerImpl messageListener(){
        return new MessageListenerImpl();
    }

    @Bean
    public KafkaMessageListenerContainer kafkaMessageListenerContainer(){
        ContainerProperties containerProps = new ContainerProperties("test");
        containerProps.setMessageListener(messageListener());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }


    /**
     * 設置consumer的properties
     * @return
     */
    private Map<String,Object> consumerProps(){
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers","192.168.56.101:9092");
        consumerProps.put("group.id","123");
        consumerProps.put("enable.auto.commit","true");
        consumerProps.put("auto.commit.interval.ms","1000");
        consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        return consumerProps;
    }
}
package com.aaron.kafka.springboot.listener;
import org.springframework.kafka.listener.MessageListener;

public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Object o) {
        System.out.println(o.toString());
    }
}

關閉服務器上的消費者然后啟動springBoot

在服務器上的producer中輸入文本 Hello World

Idea 中的console顯示如下:

 六.kafka的集群配置

  1.復制配置文件server.properties改名為server-1.properties、server-2.properties

    修改配置文件中

      broker.id=121

      log.dirs=/tmp/kafka-logs-1

      advertised.listeners=PLAINTEXT://192.168.56.101:9093

      (server-2.properties中的則為:

        broker.id=122

        log.dirs=/tmp/kafka-logs-2

        advertised.listeners=PLAINTEXT://192.168.56.101:9094

  2.運行方式啟動kafka,kafka_2.12-2.2.0/bin下執行

     ./kafka-server-start.sh ../config/server-1.properties

    ./kafka-server-start.sh ../config/server-2.properties

    后台方式運行命令如下:

    nohup ./kafka-server-start.sh ../config/server-1.properties &

    nohup ./kafka-server-start.sh ../config/server-2.properties &

  注意:如果出現

  則配置文件(server-1.properties和server-2.properties)中打開注釋並修改為:listeners=PLAINTEXT://192.168.56.101:9093(server-2.properties的為:listeners=PLAINTEXT://192.168.56.101:9094)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM