KafKa(下載,安裝,簡單使用教程)


 

一、安裝zookeeper

1、下載zookeeper

下載路徑:http://mirrors.hust.edu.cn/apache/zookeeper/

版本選擇 apache-zookeeper-3.7.0-binbin里邊的文件內容比較全,否則有可能出現“找不到或無法加載主類 org.apache.zookeeper.server.quorum.QuorumPeerMain”新版本zookeeper易犯錯誤

2、解壓文件
解壓文件至目錄  D:\MyFolder\SoftWare\Zookeeper\Instail

3、進入Zookeeper設置目錄更新配置文件

  •    在D:\MyForder\SoftWare\Zookeeper\Instail\apache-zookeeper-3.7.0-bin目錄下創建data,和log文件夾
  •    conf目錄下復制“zoo_sample.cfg”重命名為“zoo.cfg”,打開文件,編輯:

      dataDir=D:\\MyForder\\SoftWare\\Zookeeper\\Instail\\apache-zookeeper-3.7.0-bin\\data

      dataLogDir=D:\\MyForder\\SoftWare\\Zookeeper\\Instail\\apache-zookeeper-3.7.0-bin\\log

4、修改zookeeper端口(我這邊改成2222),可不修改,默認2181

5、啟動zookeeper

  

 

  測試是否啟動成功:

 

 點擊zkCli.cmd,如果出現以上字樣則表示啟動成功

 

二、安裝kafKa

1、下載安裝包

下載地址:http://kafka.apache.org/downloads.html

版本選擇2.12-2.8.1

2、解壓文件

解壓文件至目錄D:\Kafka\kafka_2.12-2.8.1(kafka安裝目錄不要放的太深,否則有可能在cmd中執行啟動命令時候報指令符太長的錯誤)

在D:\Kafka\kafka_2.12-2.8.1\config目錄下打開打開server.properties

  • 編輯:log.dirs=D:\\Kafka\\kafkalogs(此目錄需要自己創建,此目錄的作用是記錄kafka日志)
  • 編輯:zookeeper.connect=localhost:2222(2222對應zookeeper設置端口)

3)修改Kafka端口(我這邊不做修改),可不修改,默認9092

4、運行Kafka
打開dos窗口,將當前目錄切換至=D:\Kafka\kafka_2.12-2.8.1\bin\windows

執行命令kafka-server-start.bat ../../config/server.properties

執行命令前保證zookeeper已正常運行

 

 

 

三、簡單使用案例

 

1、創建springboot項目:

引入pom依賴:

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

application.yml配置:

 

server:
port: 8081
servlet:
context-path: /framedemo

kafka:
bootstrap-servers: kafka's-ip:9092
#=============== provider =======================
producer:
retries: 0
# 每次批量發送消息的數量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息體的編解碼方式,StringSerializer支持Json對象的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默認消費者group id
consumer:
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息體的編解碼方式,StringDeserializer支持Json對象的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
配置類:
@Configuration
public class KafkaInitialConfiguration {
// 創建一個名為testtopic的Topic並設置分區數為8,分區副本數為2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 1 );
}
// 如果要修改分區數,只需修改配置值重啟項目即可
// 修改分區數並不會導致數據的丟失,但是分區數只能增大不能減小
/**
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
**/
}
消息提供者:
@RestController
@RequestMapping("/KafkaProducer")
public class KafkaProducerController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
// 發送消息
@RequestMapping("/normal")
public void sendMessage1() {
String normalMessage="my test";
kafkaTemplate.send("topic1", normalMessage);
}
}
消息消費者:
@Component
public class KafkaConsumer {
// 消費監聽
@KafkaListener(groupId = "test-consumer-group",topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消費的哪個topic、partition的消息,打印出消息內容
System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM