介紹:
kafka是高性能的消息中間件,利用zookeeper做分布式協調,實現集群化擴展
關鍵詞:topic,partition,replication,offset
安裝使用:
下載安裝包但是一定要注意版本問題不然springboot代碼沒有反應,kafka-clients為kafka的broker版本號:https://spring.io/projects/spring-kafka
低版本使用zookeeper做存儲,高版本使用自身做存儲metadata,命令有所差異。
啟動zookeeper命令:
bin/zookeeper-server-start.sh <-daemon 非交互式啟動> config/zookeeper.properties
修改server.properties配置,復制多個配置文件:
#是否允許刪除topic,默認false不能手動刪除 delete.topic.enable=true #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 broker.id=0 #當前kafka服務偵聽的地址和端口,端口默認是9092 listeners = PLAINTEXT://192.168.100.21:9092 #這個是borker進行網絡處理的線程數 num.network.threads=3 #這個是borker進行I/O處理的線程數 num.io.threads=8 #發送緩沖區buffer大小,數據不是一下子就發送的,先會存儲到緩沖區到達一定的大小后在發送,能提高性能 socket.send.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.receive.buffer.bytes=102400 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 socket.request.max.bytes=104857600 #消息日志存放的路徑 log.dirs=/opt/module/kafka_2.11-1.1.0/logs #默認的分區數,一個topic默認1個分區數 num.partitions=1 #每個數據目錄用來日志恢復的線程數目 num.recovery.threads.per.data.dir=1 #默認消息的最大持久化時間,168小時,7天 log.retention.hours=168 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.segment.bytes=1073741824 #每隔300000毫秒去檢查上面配置的log失效時間 log.retention.check.interval.ms=300000 #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能 log.cleaner.enable=false #設置zookeeper的連接端口 zookeeper.connect=node21:2181,node22:2181,node23:2181 #設置zookeeper的連接超時時間 zookeeper.connection.timeout.ms=6000
listeners=PLAINTEXT://192.168.121.132:9092 (--暴露服務,否則連接超時)
broker.id=2
(以下同一個機器需要修改):
port=9094
log.dirs=/tmp/kafka-logs-2
啟動kafka命令:
bin/kafka-server-start.sh <-daemon 非交互式啟動> config/server.properties
停止命令:
bin/kafka-server-stop.sh config/server.properties
創建低版本topic:(replication-factor副本值):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
查詢topic詳情:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-name
查詢所有topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
修改topic參數配置:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
刪除topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
高版本:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-name
創建生產者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
創建消費者(有非必須參數,分區與consumer之間的關系:一個分區不能分給兩個consumer,但是兩個分區可以分給一個consumer):
bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic topic-name --from-beginning --group testgroup
查詢組:
springboot中使用kafka的配置:
1 #kafka集群配置 2 spring.kafka.bootstrap-servers=192.168.121.132:9092 3 #重試次數 4 spring.kafka.producer.retries=0 5 #應答級別:多少個分區副本備份完成時向生產者發送ack確認(可選0,1,all/-1) 6 spring.kafka.producer.acks=0 7 #批量大小 8 spring.kafka.producer.batch-size=16384 9 #提交延時 10 spring.kafka.producer.properties.linger.ms=0 11 #當生產端積累的消息達到batch-size或者接收信息linger.ms后,生產者就會將消息提交個kafka 12 #linger.ms為0表示每接收到一條信息就會提交給kafka,這時候batch-size其實就沒有用了 13 #生產端緩沖區大小 14 spring.kafka.producer.buffer-memory=33554432 15 #kafka提供的序列化和反序列化類 16 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 17 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 18 19 ###############消費者配置########## 20 #默認的消費組id(同一消息,可以被不同組重復使用) 21 spring.kafka.consumer.properties.group.id=defaultConsumerGroup 22 #是否自動提交offset 23 spring.kafka.consumer.enable-auto-commit=true 24 #提交offset延時(接收到信息后多久提交offset) 25 spring.kafka.consumer.auto-commit-interval=1000 26 #earliest:獲取上一次消費的offset繼續消費或者從頭開始消費(重復消費)28 #latest:獲取上一次消費的offset繼續消費或者最新的offset消費(丟失消費) 29 #none:必須要有上次消費的offset,否則報錯 30 spring.kafka.consumer.auto-offset-reset=latest 31 #消費會話超時時間(超過這個時間consumer沒有發送心跳,就會觸發rebalance操作) 32 spring.kafka.consumer.properties.session.timeout.ms=120000 33 #消費請求超時時間 34 spring.kafka.consumer.properties.request.timeout.ms=180000 35 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 36 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 37 #消費端監聽的topic不存在時,項目啟動會報錯 38 spring.kafka.listener.missing-topics-fatal=false
消費者:
@Component @Slf4j public class KafkaCustomerService { @KafkaListener(topics = "hello-world") //定義此消費者接收topic為“hello-world”的消息,監聽服務器上的kafka是否有相關的消息發過來 //record變量代表消息本身,可以通過ConsumerRecord<?,?>類型的record變量來打印接收的消息的各種信息 public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }
生產者:
@Service @Slf4j public class KafkaProductService { @Autowired KafkaTemplate kafkaTemplate; private void sendMsg(){ kafkaTemplate.send("hello-world","welcom to kafka"); } }