Spring Boot 2 + Kafka


I. Software versions

  1. Linux: CentOS 6

  2. ZooKeeper: zookeeper-3.4.14

  3. Kafka: kafka_2.12-2.2.0

  4. JDK: 1.8

  5. IntelliJ IDEA

II. Environment preparation

  1. Stop the firewall: service iptables stop

  2. ZooKeeper: copy the zoo_sample.cfg file in the conf directory and rename the copy to zoo.cfg

  3. Kafka: edit the config/server.properties configuration file (the combined edits are shown after this list):

      ① broker.id=123 (broker.id must be unique for every Kafka broker)

      ② advertised.listeners=PLAINTEXT://192.168.56.101:9092 (use your own Linux host's IP address)

      ③ zookeeper.connect=127.0.0.1:2181 (ZooKeeper's default port is 2181; change it in ZooKeeper's zoo.cfg if needed)
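
    Taken together, the edited lines in config/server.properties look like this (the broker id and IP are the values used in this walkthrough; substitute your own):

      broker.id=123
      advertised.listeners=PLAINTEXT://192.168.56.101:9092
      zookeeper.connect=127.0.0.1:2181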

III. Software setup & start

  1. ZooKeeper: run in the zookeeper-3.4.14/bin directory:

    ①./zkServer.sh start ../conf/zoo.cfg
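
    Optionally, confirm that ZooKeeper is up before starting Kafka:

      ./zkServer.sh status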

  2. Kafka: run in the kafka_2.12-2.2.0/bin directory:
    ①./kafka-server-start.sh ../config/server.properties

IV. Create a topic, send messages, and consume messages

  1. Create the topic "test"; run in the kafka_2.12-2.2.0/bin directory:

    ①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    ② Verify that the topic was created: ./kafka-topics.sh --list --bootstrap-server localhost:9092

      

    ③ Start a console producer (run in the kafka_2.12-2.2.0/bin directory) and send a few messages

      ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

      

    ④ Start a console consumer (run in the kafka_2.12-2.2.0/bin directory)

      ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

      

    If none of the steps above produced errors, the Kafka server side is fully set up.

V. Spring Boot 2 and Kafka communication

  1. Create the topic: TopicConfiguration

package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicConfiguration {
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(new String[]{"192.168.56.101:9092"}));
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        // Note: the replication factor (2 here) cannot exceed the number of running brokers;
        // against the single broker started in section III, use (short) 1 instead.
        return new NewTopic("test", 1, (short) 2);
    }

}
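
  Once the Spring context starts, KafkaAdmin automatically creates any NewTopic beans that do not already exist on the broker. The result can be checked from the kafka_2.12-2.2.0/bin directory:

    ./kafka-topics.sh --describe --bootstrap-server 192.168.56.101:9092 --topic test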

 

  2. Create the Kafka message producer in Spring Boot 2: KafkaProducerConfiguration

 
 
package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Point at the broker's advertised listener (see server.properties in section II)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        // Keys and values are sent as plain Strings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
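
  Alternatively, Spring Boot's auto-configuration can build an equivalent producer from application.properties alone, in which case this @Configuration class is optional. A minimal sketch using the same values:

    spring.kafka.bootstrap-servers=192.168.56.101:9092
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer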
    

  3. Create the test code: KafkaTest

package com.aaron.kafka.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void sendMsg(){
        kafkaTemplate.send("test","hello","world");
    }
}

  Run the test; the send should complete without errors.
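
  Note that kafkaTemplate.send() is asynchronous, so the test method can return before the broker has acknowledged the record. A minimal sketch of checking the result (spring-kafka 2.x API; the method name sendMsgWithCallback is an addition for illustration):

    @Test
    public void sendMsgWithCallback() {
        // Requires: import org.springframework.kafka.support.SendResult;
        //           import org.springframework.util.concurrent.ListenableFuture;
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("test", "hello", "world");
        // Report topic/partition/offset on success, or the error on failure
        future.addCallback(
                result -> System.out.println("Sent: " + result.getRecordMetadata()),
                ex -> ex.printStackTrace());
        // Flush buffered records before the test (and the Spring context) shuts down
        kafkaTemplate.flush();
    }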

  4. Create the consumer and message listener in Spring Boot 2: KafkaConsumerConfiguration and MessageListenerImpl

package com.aaron.kafka.springboot.configuration;

import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    public MessageListenerImpl messageListener(){
        return new MessageListenerImpl();
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties("test");
        containerProps.setMessageListener(messageListener());
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(consumerProps());
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    /**
     * Consumer properties for the listener container.
     * @return the consumer configuration map
     */
    private Map<String,Object> consumerProps(){
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers","192.168.56.101:9092");
        consumerProps.put("group.id","123");
        consumerProps.put("enable.auto.commit","true");
        consumerProps.put("auto.commit.interval.ms","1000");
        consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        return consumerProps;
    }
}
MessageListenerImpl:

package com.aaron.kafka.springboot.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class MessageListenerImpl implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // Print the full record (topic, partition, offset, key, value)
        System.out.println(record.toString());
    }
}

Shut down the console consumer on the server, then start the Spring Boot application.

Type the text Hello World into the console producer on the server.

The IntelliJ IDEA console then prints the received record.

VI. Kafka cluster configuration

  1. Copy the server.properties config file and name the copies server-1.properties and server-2.properties

    In server-1.properties, change:

      broker.id=121

      log.dirs=/tmp/kafka-logs-1

      advertised.listeners=PLAINTEXT://192.168.56.101:9093

      (and in server-2.properties:

        broker.id=122

        log.dirs=/tmp/kafka-logs-2

        advertised.listeners=PLAINTEXT://192.168.56.101:9094)

  2. Start the two new brokers; run in the kafka_2.12-2.2.0/bin directory:

     ./kafka-server-start.sh ../config/server-1.properties

    ./kafka-server-start.sh ../config/server-2.properties

    Or run them in the background:

    nohup ./kafka-server-start.sh ../config/server-1.properties &

    nohup ./kafka-server-start.sh ../config/server-2.properties &

  Note: if the brokers fail to start, uncomment the listeners line in the config files and set it to listeners=PLAINTEXT://192.168.56.101:9093 in server-1.properties (and listeners=PLAINTEXT://192.168.56.101:9094 in server-2.properties).
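
  With the original broker plus these two running (three brokers in total), replication can be verified by creating a topic with a replication factor of 3 and describing it; the topic name test-replicated is only an example:

    ./kafka-topics.sh --create --bootstrap-server 192.168.56.101:9092 --replication-factor 3 --partitions 1 --topic test-replicated

    ./kafka-topics.sh --describe --bootstrap-server 192.168.56.101:9092 --topic test-replicated

  The describe output lists the leader, replicas, and in-sync replicas (Isr) for each partition.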

