一、安裝zookeeper
1、下載zookeeper
下載路徑:http://mirrors.hust.edu.cn/apache/zookeeper/
版本選擇 apache-zookeeper-3.7.0-bin,bin里邊的文件內容比較全,否則有可能出現“找不到或無法加載主類 org.apache.zookeeper.server.quorum.QuorumPeerMain”新版本zookeeper易犯錯誤
2、解壓文件
解壓文件至目錄 D:\MyFolder\SoftWare\Zookeeper\Instail
3、進入Zookeeper設置目錄更新配置文件
- 在D:\MyForder\SoftWare\Zookeeper\Instail\apache-zookeeper-3.7.0-bin目錄下創建data,和log文件夾
- conf目錄下復制“zoo_sample.cfg”重命名為“zoo.cfg”,打開文件,編輯:
dataDir=D:\\MyForder\\SoftWare\\Zookeeper\\Instail\\apache-zookeeper-3.7.0-bin\\data
dataLogDir=D:\\MyForder\\SoftWare\\Zookeeper\\Instail\\apache-zookeeper-3.7.0-bin\\log
4、修改zookeeper端口(我這邊改成2222),可不修改,默認2181
5、啟動zookeeper
測試是否啟動成功:
點擊zkCli.cmd,如果出現以上字樣則表示啟動成功
二、安裝kafKa
1、下載安裝包
下載地址:http://kafka.apache.org/downloads.html
版本選擇2.12-2.8.1
2、解壓文件
解壓文件至目錄D:\Kafka\kafka_2.12-2.8.1(kafka安裝目錄不要放的太深,否則有可能在cmd中執行啟動命令時候報指令符太長的錯誤)
在D:\Kafka\kafka_2.12-2.8.1\config目錄下打開打開server.properties
- 編輯:log.dirs=D:\\Kafka\\kafkalogs(此目錄需要自己創建,此目錄的作用是記錄kafka日志)
- 編輯:zookeeper.connect=localhost:2222(2222對應zookeeper設置端口)
3)修改Kafka端口(我這邊不做修改),可不修改,默認9092
4、運行Kafka
打開dos窗口,將當前目錄切換至=D:\Kafka\kafka_2.12-2.8.1\bin\windows
執行命令kafka-server-start.bat ../../config/server.properties
執行命令前保證zookeeper已正常運行
三、簡單使用案例
1、創建springboot項目:
引入pom依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置:
server:
port: 8081
servlet:
context-path: /framedemo
kafka:
bootstrap-servers: kafka's-ip:9092
#=============== provider =======================
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式,StringSerializer支持Json對象的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默認消費者group id
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息體的編解碼方式,StringDeserializer支持Json對象的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
配置類:
@Configuration
public class KafkaInitialConfiguration {
// 創建一個名為testtopic的Topic並設置分區數為8,分區副本數為2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 1 );
}
// 如果要修改分區數,只需修改配置值重啟項目即可
// 修改分區數並不會導致數據的丟失,但是分區數只能增大不能減小
/**
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
**/
}
消息提供者:
@RestController
@RequestMapping("/KafkaProducer")
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
// 發送消息
@RequestMapping("/normal")
public void sendMessage1() {
String normalMessage="my test";
kafkaTemplate.send("topic1", normalMessage);
}
}
消息消費者:
@Component
public class KafkaConsumer {
// 消費監聽
@KafkaListener(groupId = "test-consumer-group",topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消費的哪個topic、partition的消息,打印出消息內容
System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}