Environment setup (single-node Kafka on CentOS 7)
1. Download Kafka from the official site (http://kafka.apache.org/), then unpack it:
tar -xzf kafka_2.13-2.6.0.tgz
cd kafka_2.13-2.6.0
2. Edit the configuration (the listeners and advertised.listeners entries in config/server.properties):
# allow connections from outside by listening on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# address advertised to external clients
advertised.listeners=PLAINTEXT://192.168.0.175:9092
3. Start ZooKeeper and Kafka as daemons:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
With the environment ready, the following steps test it.
4. Create a topic:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
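The same topic can also be created programmatically with Kafka's Java AdminClient; a minimal sketch (the class name CreateTopicDemo is just for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "test" with 1 partition and replication factor 1 (single-node broker)
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}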
5. Write events/messages to the topic (start a console producer):
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>this is test!
Press Ctrl+C to exit the input prompt.
6. Consume the messages (start a console consumer):
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
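The console consumer above is roughly what a minimal Java consumer does under the hood. A sketch (the class name ConsoleConsumerDemo and the demo-group group ID are just for illustration; auto.offset.reset=earliest approximates --from-beginning for a group with no committed offsets):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsoleConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // poll the broker and print each record, like the console consumer does
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}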
Integrating Kafka with Spring Boot
1. Create a new project and add the dependency to pom.xml:
<!-- Spring for Apache Kafka integration -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Add the Kafka configuration to application.yml:
spring:
  application:
    name: cloud-kafka
  kafka:
    bootstrap-servers: 192.168.0.175:9092
    producer:
      retries: 0 # number of send retries
      acks: 1 # ack level: how many partition replicas must acknowledge before the producer gets an ack (0, 1, or all/-1)
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer-side buffer memory in bytes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-consumer-group # default consumer group ID
      enable-auto-commit: true # whether to auto-commit offsets
      auto-commit-interval: 100 # auto-commit interval (how long after a message is received the offset is committed)
      auto-offset-reset: latest
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition;
      # latest: reset to the newest offset (consume only data produced after joining);
      # none: throw an exception if any partition has no committed offset.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
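For reference, the producer block above maps onto plain Kafka client properties; Spring Boot builds the producer from them. A standalone sketch of the same configuration with the raw client (the class name RawProducerDemo is just for illustration):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class RawProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.175:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // closing the producer flushes the batch
            producer.send(new ProducerRecord<>("test", "hello"));
        }
    }
}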
3. Producer (sender)
import cn.hutool.json.JSONUtil;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@RestController
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/normal/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setId(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        message.setMessage(msg);
        kafkaTemplate.send("test", JSONUtil.toJsonStr(message));
    }
}
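In spring-kafka 2.x, send() returns a ListenableFuture, so delivery can be confirmed with a callback. A sketch of an alternative endpoint that could be added to the controller above (the /kafka/callback path is just for illustration):

@GetMapping("/kafka/callback/{msg}")
public void sendWithCallback(@PathVariable("msg") String msg) {
    kafkaTemplate.send("test", msg).addCallback(
            // success: log where the record landed
            result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                    + ", offset " + result.getRecordMetadata().offset()),
            // failure: log the exception
            ex -> System.err.println("send failed: " + ex.getMessage()));
}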
4. Consumer (receiver)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;

@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"test"})
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("record:{}", consumerRecord);
            logger.info("message:{}", msg);
        }
    }
}
5. Entity class
import java.util.Date;

public class Message {
    private String id;
    private String message;
    private Date sendTime;
    // getters and setters omitted
}
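Since the producer sends the entity as a JSON string, the consumer can turn it back into a Message. A minimal sketch, assuming the same hutool JSONUtil is on the classpath, of a listener that would replace the generic onMessage in the KafkaConsumer class above:

@KafkaListener(topics = {"test"})
public void onJsonMessage(ConsumerRecord<String, String> record) {
    // convert the JSON payload back into the entity sent by the producer
    Message message = JSONUtil.toBean(record.value(), Message.class);
    logger.info("id={}, sendTime={}, message={}",
            message.getId(), message.getSendTime(), message.getMessage());
}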
The example above creates a producer that sends messages to the test topic and a consumer that listens on test and consumes them. The listener is marked with @KafkaListener, whose topics attribute names the topic(s) to listen to; several topics can be listened to at once, comma-separated, as in the sketch below. Start the project and call the endpoint from Postman to trigger the producer.
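For example, a single listener method can subscribe to several topics at once (other-topic is a hypothetical name):

@KafkaListener(topics = {"test", "other-topic"})
public void onMultiTopicMessage(ConsumerRecord<?, ?> record) {
    logger.info("from {}: {}", record.topic(), record.value());
}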
After calling the endpoint, the consumer log shows:
2020-11-09 17:28:08.530 INFO 15076 --- [ntainer#0-0-C-1] com.example.service.KafkaConsumer : record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1604914088509, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"})
2020-11-09 17:28:08.530 INFO 15076 --- [ntainer#0-0-C-1] com.example.service.KafkaConsumer : message:{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"}
The message was consumed successfully.
For more details, see: https://blog.csdn.net/yuanlong122716/article/details/105160545/