kafka的安裝及簡單使用方式(一)


介紹:
  kafka是高性能的消息中間件,利用zookeeper做分布式協調,實現集群化擴展
  關鍵詞:topic,partition,replication,offset
安裝使用:
  

下載安裝包但是一定要注意版本問題不然springboot代碼沒有反應,kafka-clients為kafka的broker版本號:https://spring.io/projects/spring-kafka

  低版本使用zookeeper做存儲,高版本使用自身做存儲metadata,命令有所差異。
  啟動zookeeper命令:
    bin/zookeeper-server-start.sh <-daemon 非交互式啟動> config/zookeeper.properties
  修改server.properties配置,復制多個配置文件:

#是否允許刪除topic,默認false不能手動刪除
delete.topic.enable=true
#當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
broker.id=0
#當前kafka服務偵聽的地址和端口,端口默認是9092
listeners = PLAINTEXT://192.168.100.21:9092
#這個是borker進行網絡處理的線程數
num.network.threads=3
#這個是borker進行I/O處理的線程數
num.io.threads=8
#發送緩沖區buffer大小,數據不是一下子就發送的,先會存儲到緩沖區到達一定的大小后在發送,能提高性能
socket.send.buffer.bytes=102400
#kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
socket.receive.buffer.bytes=102400
#這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
socket.request.max.bytes=104857600
#消息日志存放的路徑
log.dirs=/opt/module/kafka_2.11-1.1.0/logs
#默認的分區數,一個topic默認1個分區數
num.partitions=1
#每個數據目錄用來日志恢復的線程數目
num.recovery.threads.per.data.dir=1
#默認消息的最大持久化時間,168小時,7天
log.retention.hours=168
#這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.segment.bytes=1073741824
#每隔300000毫秒去檢查上面配置的log失效時間
log.retention.check.interval.ms=300000
#是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
log.cleaner.enable=false
#設置zookeeper的連接端口
zookeeper.connect=node21:2181,node22:2181,node23:2181
#設置zookeeper的連接超時時間
zookeeper.connection.timeout.ms=6000

    listeners=PLAINTEXT://192.168.121.132:9092 (--暴露服務,否則連接超時)
    broker.id=2
    (以下同一個機器需要修改):
    port=9094
    log.dirs=/tmp/kafka-logs-2

  啟動kafka命令:
    bin/kafka-server-start.sh <-daemon 非交互式啟動> config/server.properties
  停止命令:
    bin/kafka-server-stop.sh config/server.properties
  創建低版本topic:(replication-factor副本值):
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
  查詢topic詳情:
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-name
  查詢所有topic:
    bin/kafka-topics.sh --list --zookeeper localhost:2181
  修改topic參數配置:
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
  刪除topic:
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
  高版本:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-name
  創建生產者:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

  創建消費者(有非必須參數,分區與consumer之間的關系:一個分區不能分給兩個consumer,但是兩個分區可以分給一個consumer):
    bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic topic-name --from-beginning --group testgroup
  查詢組:

springboot中使用kafka的配置:

 1 #kafka集群配置
 2 spring.kafka.bootstrap-servers=192.168.121.132:9092
 3 #重試次數
 4 spring.kafka.producer.retries=0
 5 #應答級別:多少個分區副本備份完成時向生產者發送ack確認(可選0,1,all/-1 6 spring.kafka.producer.acks=0
 7 #批量大小
 8 spring.kafka.producer.batch-size=16384
 9 #提交延時
10 spring.kafka.producer.properties.linger.ms=0
11 #當生產端積累的消息達到batch-size或者接收信息linger.ms后,生產者就會將消息提交個kafka
12 #linger.ms為0表示每接收到一條信息就會提交給kafka,這時候batch-size其實就沒有用了
13 #生產端緩沖區大小
14 spring.kafka.producer.buffer-memory=33554432
15 #kafka提供的序列化和反序列化類
16 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
17 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
18 
19 ###############消費者配置##########
20 #默認的消費組id(同一消息,可以被不同組重復使用)
21 spring.kafka.consumer.properties.group.id=defaultConsumerGroup
22 #是否自動提交offset
23 spring.kafka.consumer.enable-auto-commit=true
24 #提交offset延時(接收到信息后多久提交offset)
25 spring.kafka.consumer.auto-commit-interval=1000
26 #earliest:獲取上一次消費的offset繼續消費或者從頭開始消費(重復消費)28 #latest:獲取上一次消費的offset繼續消費或者最新的offset消費(丟失消費)
29 #none:必須要有上次消費的offset,否則報錯
30 spring.kafka.consumer.auto-offset-reset=latest
31 #消費會話超時時間(超過這個時間consumer沒有發送心跳,就會觸發rebalance操作)
32 spring.kafka.consumer.properties.session.timeout.ms=120000
33 #消費請求超時時間
34 spring.kafka.consumer.properties.request.timeout.ms=180000
35 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
36 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
37 #消費端監聽的topic不存在時,項目啟動會報錯
38 spring.kafka.listener.missing-topics-fatal=false

  消費者:

@Component
@Slf4j
public class KafkaCustomerService {
@KafkaListener(topics = "hello-world")
//定義此消費者接收topic為“hello-world”的消息,監聽服務器上的kafka是否有相關的消息發過來
//record變量代表消息本身,可以通過ConsumerRecord<?,?>類型的record變量來打印接收的消息的各種信息
public void listen (ConsumerRecord<?, ?> record) throws Exception 
  {
         System.out.printf("topic = %s, offset = %d, value = %s \n",     
         record.topic(), record.offset(), record.value());
    }

}

生產者:

@Service
@Slf4j
public class KafkaProductService {
@Autowired
KafkaTemplate kafkaTemplate;
private void sendMsg(){
    kafkaTemplate.send("hello-world","welcom to kafka");
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM