Spring Boot + Kafka (deploying a Kafka cluster on CentOS 7)


 

 
1: Kafka introduction
  1.1: Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. It is a high-throughput distributed publish-subscribe messaging system that can handle all of the activity-stream data generated by users on a website. These actions (page views, searches and other user activity) are a key ingredient of many social features on the modern web. Because of the throughput required, this data is usually handled through log processing and log aggregation. For log data and offline analysis systems in the style of Hadoop that nevertheless need real-time processing, Kafka is a viable solution. Kafka aims to unify online and offline message processing through Hadoop's parallel loading mechanism, and to provide real-time messaging through a cluster.

2: Kafka is a high-throughput distributed publish-subscribe messaging system. Its main features are:

  2.1: Message persistence through an O(1) disk data structure that maintains stable performance over long periods, even with terabytes of stored messages.
  2.2: High throughput: even on very modest hardware, Kafka can handle millions of messages per second.
  2.3: Support for partitioning messages across Kafka brokers and consumer machines.
  2.4: Support for Hadoop parallel data loading.

3: Basic concepts
  3.1. Producer: the message producer, a client that publishes messages to a Kafka broker.

  3.2. Consumer: the message consumer, a client that fetches messages from a Kafka broker.

  3.3. Topic: a stream of messages of a particular type; it can be thought of as a queue.
  3.4. Consumer Group (CG): the mechanism Kafka uses to implement both broadcast (deliver a topic's messages to every consumer) and unicast (deliver each message to exactly one consumer). A topic can have multiple CGs. The topic's messages are copied (conceptually, not physically) to every CG, but within each CG a given message is delivered to only one consumer. To broadcast, give every consumer its own CG; to unicast, put all consumers into the same CG. CGs also let you group consumers freely without having to send the same message to multiple topics (see the console-consumer sketch after this list).
  3.5. Broker: published messages are stored on a group of servers called a Kafka cluster. Each Kafka server is a broker; a cluster consists of multiple brokers, and one broker can host multiple topics.
  3.6. Partition: for scalability, a very large topic can be spread across multiple brokers (servers). A topic is divided into one or more partitions, and each partition is an ordered queue. Every message in a partition is assigned an ordered id (the offset). Kafka only guarantees ordering within a single partition; it does not guarantee ordering across the partitions of a topic.
  3.7. Offset: Kafka's storage files are named after the offset, which makes lookups easy. For example, to find the message at position 2049 you only need the file named 2048.kafka; the very first offset file is 00000000000.kafka.
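As a rough illustration of 3.4, the console consumer that ships with Kafka can be used (the commands are covered again in 6.2; this sketch assumes a running broker on localhost:9092 and the topic ywb created later in this article, and the group names g1/g2 are arbitrary):

# Two terminals, SAME group: each message is delivered to only one of them (unicast / load balancing)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb --group g1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb --group g1

# Two terminals, DIFFERENT groups: every message is delivered to both (broadcast)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb --group g1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb --group g2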

4: Environment preparation
  4.1: Prepare three Linux servers

Hostname        IP
hadoop207 192.168.168.207
hadoop208 192.168.168.208
hadoop209 192.168.168.209


  4.2: ZooKeeper environment preparation
    Kafka relies on ZooKeeper to coordinate the distributed system: ZooKeeper coordinates the brokers, producers and consumers. ZooKeeper therefore has to be installed on every machine before installing Kafka (Kafka ships with a bundled ZooKeeper, but an external installation is normally used).

 4.2.1: ZooKeeper download page: https://zookeeper.apache.org/releases.html

 4.2.2: Install a JDK on each machine.
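For example, on CentOS 7 one way to install a JDK is via yum (a sketch; it assumes OpenJDK 8 is acceptable for your environment):

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version    # verify the installation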
    

 4.2.3: Extract ZooKeeper (tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/)

 4.2.4: Edit the configuration file zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/module/zookeeper-3.5.7/zooData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#######################cluster##########################
#server.2=hadoop202:2888:3888
#server.3=hadoop203:2888:3888
#server.4=hadoop204:2888:3888
#server.5=hadoop205:2888:3888
#server.6=hadoop206:2888:3888
server.7=hadoop207:2888:3888
server.8=hadoop208:2888:3888
server.9=hadoop209:2888:3888
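Note that the server.7/8/9 entries above require each node to have a myid file under dataDir whose content matches its server number; a minimal sketch (run the echo that matches the host):

mkdir -p /opt/module/zookeeper-3.5.7/zooData
echo 7 > /opt/module/zookeeper-3.5.7/zooData/myid    # 7 on hadoop207, 8 on hadoop208, 9 on hadoop209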

4.2.5: Once one machine is configured, use rsync to synchronize the files to the other two machines, or write a custom xsync cluster distribution script (chmod 777 xsync -> xsync /opt/module)

#!/bin/bash
#1. check the number of arguments
if [ $# -lt 1 ]
then
    echo Not Enough Arguments!
    exit;
fi
#2. iterate over all machines in the cluster
for host in hadoop207 hadoop208 hadoop209
do
    echo ==================== $host ====================
    #3. iterate over all files/directories and send them one by one
    for file in $@
    do
        #4. check whether the file exists
        if [ -e $file ]
        then
            #5. get the parent directory
            pdir=$(cd -P $(dirname $file); pwd)
            #6. get the file name
            fname=$(basename $file)
            ssh $host "sudo mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo $file does not exist!
        fi
    done
done
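For example, after ZooKeeper has been configured on hadoop207, the script can push the whole directory to the other two machines (a sketch; remember that the myid file must still differ per host):

chmod 777 xsync
./xsync /opt/module/zookeeper-3.5.7    # then fix zooData/myid on hadoop208 and hadoop209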

4.2.6: ZooKeeper cluster start/stop script (vim zk.sh -> zk.sh start)

#!/bin/bash
case $1 in
"start"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo -------------$i starting-----------------------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo --------------$i stopping------------------------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo --------------$i status-------------------------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac
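Possible usage (a sketch): make the script executable, start the ensemble, and check its state; with three nodes a healthy cluster reports one leader and two followers:

chmod 777 zk.sh
./zk.sh start
./zk.sh status    # expect Mode: leader on one node and Mode: follower on the other two
./zk.sh stop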

 

 5: Download Kafka from https://kafka.apache.org/downloads and extract it to /opt/module
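For example, assuming the downloaded archive is kafka_2.13-3.0.0.tgz (the version used in the paths below):

tar -zxvf kafka_2.13-3.0.0.tgz -C /opt/module/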

 

 5.1: Edit the configuration file (cd /opt/module/kafka_2.13-3.0.0/config -> vim server.properties)
  1. Set broker.id: it must be unique for every broker; in this article the three servers use broker.id 1, 2 and 3;

  2. The port defaults to 9092 (in Kafka 3.x it is set through the listeners line) and can be changed as needed; it is usually kept the same on all three servers;

  3. Point the broker at the machine's real IP (Kafka 3.x uses listeners/advertised.listeners for this; the old host.name property no longer exists);
  4. num.partitions defaults to 1 and can be adjusted to the cluster's needs (this article raises it to 4);
  5. Set zookeeper.connect to the IP:port list of all ZooKeeper servers, for example:
    zookeeper.connect=192.168.168.207:2181,192.168.168.208:2181,192.168.168.209:2181
  6. log.dirs= the directory for Kafka's data (log) files

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# Enable the delete-topic feature
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/module/kafka_2.13-3.0.0/logs/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop207:2181,hadoop208:2181,hadoop209:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

6: Kafka cluster start/stop script (vim kafka.sh)

#!/bin/bash
case $1 in
"start"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo -------------$i starting-----------------------------
        # -daemon runs the broker in the background so the ssh call returns
        ssh $i '/opt/module/kafka_2.13-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.13-3.0.0/config/server.properties'
    done
};;
"stop"){
    for i in hadoop207 hadoop208 hadoop209
    do
        echo --------------$i stopping------------------------------
        ssh $i "/opt/module/kafka_2.13-3.0.0/bin/kafka-server-stop.sh stop"
    done
};;
esac
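Possible usage, mirroring zk.sh (a sketch):

chmod 777 kafka.sh
./kafka.sh start    # start the Kafka broker on all three nodes
./kafka.sh stop     # stop the Kafka broker on all three nodes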

 6.1: Use jps to check that the processes are running; each node should show a Kafka process and a QuorumPeerMain (ZooKeeper) process.
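A quick way to check all three nodes at once is a small ssh loop (a sketch; it assumes passwordless ssh, as the scripts above do, and that jps is on the PATH of the remote shell):

for i in hadoop207 hadoop208 hadoop209; do echo "==== $i ===="; ssh $i 'jps'; done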
 

6.2: Common Kafka commands

1. Start the Kafka service

bin/kafka-server-start.sh config/server.properties &

2. Stop the Kafka service

./kafka-server-stop.sh 

3. List all topics (Kafka 3.0 removed the --zookeeper option from kafka-topics.sh, so the topic commands below use --bootstrap-server)

./kafka-topics.sh --list --bootstrap-server localhost:9092

4. Show details of all topics

./kafka-topics.sh --bootstrap-server localhost:9092 --describe

5. Show details of a specific topic

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ywb

6. Delete a topic

./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ywb

7. Create a topic named ywb with two partitions and three replicas per partition

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic ywb --replication-factor 3 --partitions 2

 8. Test sending and receiving messages (open two terminals)

# Produce messages (note: the port is the one configured in server.properties)
./kafka-console-producer.sh --broker-list localhost:9092 --topic ywb
# Consume messages (the port must match the broker configuration, i.e. the same port the producer uses)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb --from-beginning   # with --from-beginning: re-consume all messages from the beginning
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ywb # without --from-beginning: consume only from the latest message onwards

9. Check the number of messages in a topic

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic ywb --time -1

10. List all consumer groups

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

11. Get the offsets of the consumer group (testGroup) that is currently consuming

./kafka-consumer-groups.sh --describe --group testGroup --bootstrap-server localhost:9092

12. Check a consumer group's message backlog (CURRENT-OFFSET = messages consumed; LOG-END-OFFSET = total messages; LAG = messages not yet consumed)

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group  testGroup

 7: Kafka web UI (kafka-eagle/EFAK) download: http://download.kafka-eagle.org/
  7.1: After downloading, extract it: tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /opt/module

  7.2: Edit the configuration file (vim /opt/module/kafka-eagle-bin-2.0.8/conf/system-config.properties)
  

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
#efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=hadoop207:2181,hadoop208:2181,hadoop209:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# EFAK webui port
######################################
efak.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://120.79.35.166:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

7.3: Start EFAK (./ke.sh start)
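Depending on the kafka-eagle/EFAK version, ke.sh may not start until the KE_HOME environment variable is set and its bin directory is on the PATH; a sketch, assuming the extraction path used above:

# append to /etc/profile (or ~/.bashrc), then source it
export KE_HOME=/opt/module/kafka-eagle-bin-2.0.8
export PATH=$PATH:$KE_HOME/bin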

  

 

 7.4: Open port 8048 in a browser; username admin, default password 123456

  

8: A simple Spring Boot + Kafka demo
 8.1: pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring-boot-starter-web is also required for the @RestController in 8.6 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

8.2:application.yml

server:
  servlet:
    context-path: /
  port: 8088
spring:
  kafka:
    bootstrap-servers: 192.168.168.207:9092,192.168.168.208:9092,192.168.168.209:9092
    #Producer configuration. Most defaults are fine; a few important properties are listed here
    producer:
      #Batch size in bytes for records sent to the same partition (note: bytes, not a message count)
      batch-size: 16
      #A value greater than 0 makes the client resend any record whose send failed. These retries are no different from the client resending after receiving a send error. Allowing retries can change record ordering: if two records go to the same partition and the first fails while the second succeeds, the second ends up before the first.
      retries: 0
      #The amount of memory the producer can use to buffer records. If records are produced faster than they can be sent to the broker, the producer blocks or throws an exception (controlled by "block.on.buffer.full"). This roughly corresponds to the total memory the producer uses, but it is not a hard limit: some memory is also used for compression (if enabled) and for in-flight requests.
      buffer-memory: 33554432
      #Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #Consumer configuration
    consumer:
      #What to do when there is no initial offset in Kafka, or the current offset no longer exists on the server. Three options: [latest, earliest, none]
      # latest consumes only newly produced data
      auto-offset-reset: earliest
      #Whether to enable auto commit of offsets
      enable-auto-commit: true
      #Auto-commit interval (ms)
      auto-commit-interval: 100
      #Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #Also configured in /usr/local/etc/kafka/consumer.properties
      group-id: test-consumer-group

8.3:Consumer

/**
 * A consumer that listens for messages on the topic.
 * @author ywb
 * @createdDate 2021/12/27 11:15
 * @updatedDate
 */
@Slf4j
@Component
public class Consumer {

    private static Gson gson = new GsonBuilder().create();

    @KafkaListener(topics = {"ywb"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("consumer get message : {}", gson.toJson(message));
        }
    }
}

8.4:Producer

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    // Send a message to the ywb topic
//    @Transactional(rollbackFor = RuntimeException.class)
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("ywb", gson.toJson(message));
//        int i = 1/0;
//        new RuntimeException("error");
    }

}

8.5:Message

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

8.6:SendController

/**
 * @author ywb
 * @createdDate 2021/12/27 11:16
 * @updatedDate
 */
@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{\"code\":0}";
    }
}

8.7: Send a request to http://localhost:8088/kafka/send (the producer publishes a message) and the consumer starts consuming:
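For example, the endpoint can be triggered from the command line; the response body is the string returned by SendController in 8.6:

curl http://localhost:8088/kafka/send
# expected response: {"code":0}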

2022-01-07 16:24:40.789  INFO 38592 --- [ntainer#0-0-C-1] com.ratel.kafka.consumer.Consumer        : consumer  get  message : "{\"id\":\"KFK_1641543880749\",\"msg\":\"94a89968-40e2-49c3-ac55-4b3b97041e70\",\"sendTime\":\"Jan 7, 2022 4:24:40 PM\"}"
2022-01-07 16:24:44.380  INFO 38592 --- [ntainer#0-0-C-1] com.ratel.kafka.consumer.Consumer        : consumer  get  message : "{\"id\":\"KFK_1641543884372\",\"msg\":\"87f27450-709d-4559-91ad-72d52ee4619f\",\"sendTime\":\"Jan 7, 2022 4:24:44 PM\"}"

 

 

