I. Software Versions
1. Linux: CentOS 6
2. ZooKeeper: zookeeper-3.4.14
3. Kafka: kafka_2.12-2.2.0
4. JDK: 1.8
5. IntelliJ IDEA
II. Environment Preparation
1. Disable the firewall: service iptables stop
2. ZooKeeper: copy the zoo_sample.cfg file in the conf directory and rename the copy to zoo.cfg.
3. Kafka: edit the configuration file config/server.properties (a combined excerpt is shown below):
① broker.id=123 (broker.id must be unique for every Kafka broker)
② advertised.listeners=PLAINTEXT://192.168.56.101:9092 (use the IP address of your own Linux host)
③ zookeeper.connect=127.0.0.1:2181 (2181 is the default ZooKeeper port; change it in ZooKeeper's zoo.cfg if needed)
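For reference, after these edits the relevant lines of config/server.properties read as follows (the broker.id and IP address are the example values used throughout this post):
broker.id=123
advertised.listeners=PLAINTEXT://192.168.56.101:9092
zookeeper.connect=127.0.0.1:2181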
III. Software Setup & Start
1. ZooKeeper: run in the zookeeper-3.4.14/bin directory:
① ./zkServer.sh start ../conf/zoo.cfg
2. Kafka: run in the kafka_2.12-2.2.0/bin directory:
① ./kafka-server-start.sh ../config/server.properties
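To confirm that both services are up, jps should list a QuorumPeerMain process (ZooKeeper) and a Kafka process (the broker); alternatively, ./zkServer.sh status in the zookeeper-3.4.14/bin directory reports whether ZooKeeper is running:
jps
./zkServer.sh status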
IV. Creating a Topic, Sending Messages, Consuming Messages
1. Create the topic "test"; run in the kafka_2.12-2.2.0/bin directory:
①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
② Verify that the topic was created: ./kafka-topics.sh --list --bootstrap-server localhost:9092
③ Open a console message producer (run in the kafka_2.12-2.2.0/bin directory) and send a few messages:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
④ Open a console message consumer; run in the kafka_2.12-2.2.0/bin directory:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
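By default the console consumer only prints messages produced after it starts; to replay everything already stored in the topic, the --from-beginning flag can be added:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning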
If all of the above steps work without problems, the Kafka server side is fully set up.
V. Spring Boot 2 and Kafka Communication
1. Create the topic: TopicConfiguration
package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicConfiguration {

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(new String[]{"192.168.56.101:9092"}));
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("test", 1, (short) 2);
    }
}
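Note that the third argument of NewTopic is the replication factor. Kafka refuses to create a topic whose replication factor exceeds the number of running brokers, so against the single broker from section III this value should be (short) 1; (short) 2 assumes at least the additional brokers set up in section VI.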
2. Create the Kafka message producer in Spring Boot 2: KafkaProducerConfiguration
package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Address of the Kafka broker (matches the advertised.listeners value configured above)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        // Keys and values are sent as plain strings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
3. Create the test code: KafkaTest
package com.aaron.kafka.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void sendMsg() {
        // send(topic, key, value)
        kafkaTemplate.send("test", "hello", "world");
    }
}
Running the test gives the following result:
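KafkaTemplate.send() is asynchronous and returns a ListenableFuture, so the test above finishes before the broker acknowledges the record. A minimal sketch of attaching a success/failure callback in the same test class (the method name and log messages are illustrative additions, not part of the original post):

@Test
public void sendMsgWithCallback() {
    kafkaTemplate.send("test", "hello", "world").addCallback(
            // invoked once the broker has acknowledged the record
            result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset()),
            // invoked if the send fails, e.g. because the broker is unreachable
            ex -> System.err.println("send failed: " + ex.getMessage()));
}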
4. Create the consumer and the message listener in Spring Boot 2: KafkaConsumerConfiguration and MessageListenerImpl
package com.aaron.kafka.springboot.configuration;

import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    public MessageListenerImpl messageListener() {
        return new MessageListenerImpl();
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
        // Listen on the "test" topic and hand records to the listener bean above
        ContainerProperties containerProps = new ContainerProperties("test");
        containerProps.setMessageListener(messageListener());
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(consumerProps());
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }

    /**
     * Consumer properties.
     */
    private Map<String, Object> consumerProps() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", "192.168.56.101:9092");
        consumerProps.put("group.id", "123");
        consumerProps.put("enable.auto.commit", "true");
        consumerProps.put("auto.commit.interval.ms", "1000");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return consumerProps;
    }
}
package com.aaron.kafka.springboot.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class MessageListenerImpl implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // Print every record received from the "test" topic
        System.out.println(record.toString());
    }
}
Stop the console consumer on the server, then start the Spring Boot application.
Type some text, e.g. Hello World, into the console producer on the server.
The console in IntelliJ IDEA then shows the following:
VI. Kafka Cluster Configuration
1. Copy the configuration file server.properties twice, naming the copies server-1.properties and server-2.properties, and edit them.
In server-1.properties set:
broker.id=121
log.dirs=/tmp/kafka-logs-1
advertised.listeners=PLAINTEXT://192.168.56.101:9093
In server-2.properties set:
broker.id=122
log.dirs=/tmp/kafka-logs-2
advertised.listeners=PLAINTEXT://192.168.56.101:9094
2. Start the two extra brokers in the foreground; run in the kafka_2.12-2.2.0/bin directory:
./kafka-server-start.sh ../config/server-1.properties
./kafka-server-start.sh ../config/server-2.properties
To run them in the background instead:
nohup ./kafka-server-start.sh ../config/server-1.properties &
nohup ./kafka-server-start.sh ../config/server-2.properties &
Note: if the new brokers fail to start, uncomment the listeners line in the configuration files and set it explicitly: listeners=PLAINTEXT://192.168.56.101:9093 in server-1.properties and listeners=PLAINTEXT://192.168.56.101:9094 in server-2.properties.
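Once all three brokers (broker.id 123, 121 and 122) are running, one way to check that the cluster is healthy is to create a topic with replication factor 3 and inspect its replica assignment; the topic name below is only an example:
./kafka-topics.sh --create --bootstrap-server 192.168.56.101:9092 --replication-factor 3 --partitions 1 --topic test-replicated
./kafka-topics.sh --describe --bootstrap-server 192.168.56.101:9092 --topic test-replicated
The --describe output should show one leader and replicas on all three broker ids, confirming that the brokers have found each other through ZooKeeper.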