Kafka integration with Spring Boot (or Spring Cloud)




1 Introduction to Apache Kafka

Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated, multi-subscriber log system coordinated by ZooKeeper (it can also be used as an MQ system), commonly used for web/nginx logs, access logs, messaging services, and so on. LinkedIn contributed it to the Apache Foundation in 2010, and it became a top-level open-source project.


2 Installing Apache Kafka

2.1 - Verify that Java is installed

Java should already be installed on your machine; verify it with the following command.

$ java -version

If Java is installed successfully, you will see the installed Java version.

Installing the JDK on Linux

JDK 1.8 or later is required.

Step 1: Run java -version to check whether a JDK is already installed. If the version is suitable, there is no need to install it again.

Step 2: Download the Linux version of the JDK.

The JDK 1.8 installer can be downloaded from the Oracle website:

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

You need to register an Oracle account before downloading.

Step 3: Upload the installer to the Linux server and extract it.

Upload the archive to the Linux server, create the JDK installation directory, and extract the archive into it:

mkdir -p /usr/local/java

tar -zxvf /usr/local/jdk-8u121-linux-x64.tar.gz -C /usr/local/java

For convenience and to make future JDK upgrades easier, create a unified symbolic link /usr/jdk for the JDK:

ln -s /usr/local/java/jdk1.8.0_121/ /usr/jdk

Step 4: Check and, if necessary, upgrade glibc.

Running JDK 1.7 or later on Linux requires glibc version 2.4 or above. Check the installed glibc version with the following command:

rpm -qi glibc

If the version is lower than 2.4, install it with:

yum install glibc.i686

Step 5: Configure the JDK environment variables and reload them.

Edit the Linux system profile:

vi /etc/profile

Append the JDK environment variables and the global classpath settings at the end of the file:

export JAVA_HOME=/usr/local/java/jdk1.8.0_121

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$PATH

After saving, reload the modified profile by running:

source /etc/profile

Step 6: Verify that the JDK was installed successfully.

Run java -version again; output like the following indicates that the JDK installation succeeded:

[root@localhost local]# java -version

java version "1.8.0_121"

Java(TM) SE Runtime Environment (build 1.8.0_121-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

2.2 - Verify that ZooKeeper is installed

  • Apache Kafka depends on ZooKeeper, so check that ZooKeeper is installed before installing Kafka.

  • Verify the ZooKeeper installation with:

/work/zookeeper/zookeeper-1/bin/zkServer.sh  status

The output should look like this:


[root@localhost work]# /work/zookeeper/zookeeper-1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-1/bin/../conf/zoo.cfg
Mode: follower
[root@localhost work]# /work/zookeeper/zookeeper-2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-2/bin/../conf/zoo.cfg
Mode: leader

Download Kafka


The download address is:

http://kafka.apache.org/downloads (a copy is also available on the 瘋狂創客圈 network drive)

A release earlier than 1.1, such as kafka_2.11-1.0.2, is recommended because it causes fewer problems during installation. Then upload the Kafka archive to the virtual machine.


3 Single-node installation

Step 3.1 - Extract the tar file

Now that you have downloaded the Kafka release onto your machine, extract (unpack) the tar file with the following commands:

$ cd /work/
$ tar -zxvf kafka_2.11-1.0.2.tgz
$ cd kafka_2.11-1.0.2
[root@localhost kafka_2.11-1.0.2]# ll
total 52
drwxr-xr-x 3 root root  4096 Apr  7  2020 bin
drwxr-xr-x 2 root root  4096 Apr  7  2020 config
drwxr-xr-x 2 root root  4096 Nov 23 22:23 libs
-rw-r--r-- 1 root root 32216 Apr  7  2020 LICENSE
-rw-r--r-- 1 root root   337 Apr  7  2020 NOTICE
drwxr-xr-x 2 root root  4096 Apr  7  2020 site-docs

Step 3.2 - Create the log directory and environment variable

[root@localhost ~]#  cd /work/kafka_2.11-1.0.2/

[root@localhost kafka_2.11-1.0.2]#  mkdir -p logs/kafka1-logs

Create the environment variable by editing /etc/profile (vi /etc/profile) and appending:

export KAFKA_HOME=/work/kafka_2.11-1.0.2

Modify the configuration file:

In Kafka's config directory there is a server.properties file; the main settings to modify are listed below:

# The broker's globally unique id; it must not be duplicated
broker.id=1
# Listeners
listeners=PLAINTEXT://192.168.233.128:9092
advertised.listeners=PLAINTEXT://192.168.233.128:9092
# Log directory
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
# ZooKeeper connection (change localhost to the ip or host name if ZooKeeper is not on this machine)
zookeeper.connect=localhost:2181

vi /work/kafka_2.11-1.0.2/config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.233.128:9092


# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Start Kafka and test it

$ nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties  2>&1 &

If the printed log shows no errors, you should see output like the following:

[root@localhost ~]#  $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2020-11-25 21:59:42,557] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 1
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name =
        inter.broker.listener.name = null
        inter.broker.protocol.version = 1.0-IV0
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = PLAINTEXT://192.168.233.128:9092
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /work/kafka_2.11-1.0.2/logs/kafka1-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 1.0-IV0
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = null
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-11-25 21:59:42,694] INFO starting (kafka.server.KafkaServer)
[2020-11-25 21:59:42,699] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-11-25 21:59:42,878] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-25 21:59:42,886] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.version=1.8.0_11 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.home=/work/java/jdk1.8.0_11/jre (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.class.path=.:/work/java/jdk1.8.0_11/lib/dt.jar:/work/java/jdk1.8.0_11/lib/tools.jar:/work/kafka_2.11-1.0.2/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/argparse4j-0.7.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/commons-lang3-3.5.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-api-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-file-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-json-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-runtime-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-transforms-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/guava-20.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-api-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-locator-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-utils-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-core-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-databind-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.20.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.21.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.annotation-api-1.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-1.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.servlet-api-3.1.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.ws.rs-api-2.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-client-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-common-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-guava-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-media-jaxb-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-server-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-http-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-io-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-security-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-server-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-util-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jopt-simple-5.0.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-test-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-clients-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-log4j-appender-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-examples-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-tools-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/log4j-1.2.17.jar:/work/kafka_2.11-1.0.2/bin/../libs/lz4-java-1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/maven-artifact-3.5.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/metrics-core-2.2.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/osgi-resource-locator-1.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/p
lexus-utils-3.0.24.jar:/work/kafka_2.11-1.0.2/bin/../libs/reflections-0.9.11.jar:/work/kafka_2.11-1.0.2/bin/../libs/rocksdbjni-5.7.3.jar:/work/kafka_2.11-1.0.2/bin/../libs/scala-library-2.11.12.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-api-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-log4j12-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/snappy-java-1.1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/validation-api-1.1.0.Final.jar:/work/kafka_2.11-1.0.2/bin/../libs/zkclient-0.10.jar:/work/kafka_2.11-1.0.2/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.dir=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,888] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@481a996b (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,991] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:42,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,012] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,086] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1006049103f0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,094] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:44,369] INFO Cluster ID = 4MOhHbbzS42FdvekFfLwTQ (kafka.server.KafkaServer)
[2020-11-25 21:59:44,381] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:44,412] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,429] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,442] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,541] INFO Loading logs. (kafka.log.LogManager)
[2020-11-25 21:59:44,547] INFO Logs loading complete in 6 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,086] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,095] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,394] INFO Awaiting socket connections on 192.168.233.128:9092. (kafka.network.Acceptor)
[2020-11-25 21:59:45,399] INFO [SocketServer brokerId=1] Started 1 acceptor threads (kafka.network.SocketServer)
[2020-11-25 21:59:45,422] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,423] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,427] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,438] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-11-25 21:59:45,646] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,648] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,651] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,658] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,698] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,705] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-25 21:59:45,718] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2020-11-25 21:59:45,741] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,771] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,774] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-11-25 21:59:45,807] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,811] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,812] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(192.168.233.128,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2020-11-25 21:59:45,813] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:45,893] INFO [SocketServer brokerId=1] Started processors for 1 acceptors (kafka.network.SocketServer)
[2020-11-25 21:59:45,894] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,894] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,895] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Test Kafka

However, this alone does not guarantee that Kafka started successfully. Run jps to check the processes; if the Kafka process is listed, the broker started successfully:

[hadoop@Master ~]$ jps
9173 Kafka
9462 Jps
8589 QuorumPeerMain
[hadoop@Master ~]$ jps -m
9472 Jps -m
9173 Kafka /opt/kafka/config/server.properties
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg

Create a topic

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test

Parameter description:
--zookeeper: the ZooKeeper connection URL used by Kafka, the same value as the zookeeper.connect entry in server.properties;

here it is 192.168.233.128:2181

--replication-factor: the number of replicas
--partitions: the number of partitions
--topic: the topic name

[root@localhost ~]# $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
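
For reference, a topic can also be created programmatically with the AdminClient that ships with the kafka-clients library used in this article. The following is a minimal sketch, not part of the original setup; the class name is illustrative, and the broker address and topic settings mirror the CLI command above.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
                Properties props = new Properties();
                // broker address from the single-node setup above (adjust to your environment)
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");

                try (AdminClient admin = AdminClient.create(props)) {
                        // same settings as the CLI command: 1 partition, replication factor 1
                        NewTopic topic = new NewTopic("test", 1, (short) 1);
                        admin.createTopics(Collections.singletonList(topic)).all().get();
                }
        }
}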

List all topics

[hadoop@Master ~]$  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181 

The result is as follows:


[root@localhost ~]#  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181 
test

Start a test producer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test

Note: the port in this command is the Kafka broker port.

After running the command, the console waits for input. Type a message value directly and press Enter; each line (separated by a newline) is sent as one message, as shown below.

>Hello Kafka!
>你好 kafka!

Each Enter triggers a send. When you are done, press Ctrl+C to exit the producer console, then use the kafka-console-consumer.sh script to verify what was produced.

Start a test consumer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning

Note:

  • The port in this command is the ZooKeeper port.

  • With --from-beginning, consumption starts from the very beginning, so both old and new messages are consumed; without it, only newly produced messages are consumed.

Execution results

Output on the producer side:

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test
>aaa bbbb
>ccc fff
>Hello Kafka!
>你好 kafka!
>



Output on the consumer side:

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
aaa bbbb
ccc fff
Hello Kafka!
你好 kafka!

4 Cluster-mode installation

  • Copy config/server.properties three times and name the copies server1.properties, server2.properties, and server3.properties.
  • Modify server1.properties:
  - broker.id=1
  - listeners=PLAINTEXT://:9092
  - advertised.listeners=PLAINTEXT://192.168.233.128:9092 (where 192.168.233.128 is the host's IP)
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Similarly, modify server2.properties:
  - broker.id=2
  - listeners=PLAINTEXT://:9093
  - advertised.listeners=PLAINTEXT://192.168.233.128:9093
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka2-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Similarly, modify server3.properties:
  - broker.id=3
  - listeners=PLAINTEXT://:9094
  - advertised.listeners=PLAINTEXT://192.168.233.128:9094
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka3-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Then start the three brokers with the following commands:
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server3.properties > /work/kafka_2.11-1.0.2/logs/kafka3-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server2.properties > /work/kafka_2.11-1.0.2/logs/kafka2-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server1.properties > /work/kafka_2.11-1.0.2/logs/kafka1-logs/startup.log 2>&1 &
  • Check startup.log, or server.log in the same directory, to make sure there are no errors (see the verification sketch below).
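
Besides reading the logs, one way to confirm that all three brokers joined the cluster is to query the cluster metadata. The following is a minimal sketch using the AdminClient from the kafka-clients library; the class name is illustrative, and the addresses are the three listeners configured above.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class DescribeClusterDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
                Properties props = new Properties();
                // any one of the three brokers is enough to bootstrap the metadata request
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                        "192.168.233.128:9092,192.168.233.128:9093,192.168.233.128:9094");

                try (AdminClient admin = AdminClient.create(props)) {
                        // should list broker ids 1, 2 and 3 if the cluster came up correctly
                        for (Node node : admin.describeCluster().nodes().get()) {
                                System.out.println("broker id=" + node.id() + " at " + node.host() + ":" + node.port());
                        }
                }
        }
}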

5 Types of messaging systems

A messaging system transfers data from one application to another, so that applications can focus on the data itself without worrying about how it is passed between two or more applications. Distributed messaging is based on reliable message queues that pass messages asynchronously between client applications and the messaging system. There are two main messaging patterns: point-to-point and publish-subscribe.

5.1 The point-to-point messaging model

In a point-to-point messaging system, messages are persisted in a queue. One or more consumers then consume data from the queue, but a given message can only be consumed once: once a consumer has consumed a message, it is removed from the queue. Even when multiple consumers consume at the same time, this model guarantees the order in which data is processed. The architecture is illustrated below:

(figure: point-to-point messaging model)

A message sent by a producer to the queue is received by only one consumer.

5.2 The publish-subscribe messaging model (Kafka)

In a publish-subscribe messaging system, messages are persisted in a topic. Unlike a point-to-point system, consumers can subscribe to one or more topics and consume all of the data in those topics; the same message can be consumed by multiple consumers, and data is not deleted immediately after it is consumed. In a publish-subscribe system the producer of messages is called the publisher and the consumer is called the subscriber. The model looks like this:

(figure: publish-subscribe messaging model)

A message published to a topic is received only by subscribers that have subscribed to that topic.

As shown above, publish-subscribe is a push-based messaging model that can serve many different kinds of subscribers. After a producer puts a message into the message queue, the queue pushes the message to every consumer that has subscribed to that kind of message (similar to a WeChat official account).

Most messaging systems use the publish-subscribe model, and Kafka is a publish-subscribe system.

6 Kafka terminology

6.1 Overview

Before going deeper into Kafka, let's first introduce its terminology. The figure below shows the main Kafka terms and the relationships between them:

(figure: Kafka terms and their relationships)

In the figure above, a topic is configured with 3 partitions, and each broker in the cluster stores one or more of those partitions.

Partition 1 has two offsets, 0 and 1; Partition 2 has 4 offsets; Partition 3 has 1 offset. The replica id happens to be the same as the id of the machine hosting it.

If a topic has a replication factor of 3, Kafka creates 3 identical replicas of each partition across the cluster. Multiple producers and consumers can produce and consume data simultaneously.

6.2 Broker

A Kafka cluster consists of one or more servers; each server node is called a broker.

Brokers store topic data. If a topic has N partitions and the cluster has N brokers, each broker stores one partition of that topic.

If a topic has N partitions and the cluster has N+M brokers, then N brokers each store one partition of the topic and the remaining M brokers store no partition data for that topic.

If a topic has N partitions and the cluster has fewer than N brokers, then some brokers store more than one partition of the topic. Avoid this situation in production, as it easily leads to an unbalanced Kafka cluster.

6.3 Topic

Every message published to a Kafka cluster has a category, called a topic. (Physically, messages of different topics are stored separately. Logically, although a topic's messages may be stored on one or more brokers, users only need to specify the topic to produce or consume data, without caring where the data is stored.)

A topic is similar to a table name in a database.

6.4 Partition

The data in a topic is split into one or more partitions; every topic has at least one partition. The data in each partition is stored in multiple segment files. Data within a partition is ordered, but ordering is lost across different partitions, so if a topic has multiple partitions the consumption order cannot be guaranteed. When strict message ordering is required, the number of partitions must be set to 1.

6.5 Producer

The producer is the publisher of data; it publishes messages to a Kafka topic. When a broker receives a message from a producer, it appends the message to the segment file currently used for appends. A message sent by a producer is stored in one partition, and the producer can also specify which partition to store it in.

6.6 Consumer

Consumers read data from brokers. A consumer can consume data from multiple topics.

6.7 Consumer Group

Each consumer belongs to a specific consumer group (a group name can be specified for each consumer; if none is specified, the consumer belongs to the default group).

6.8 Leader

Each partition has multiple replicas, exactly one of which is the leader. The leader is the partition replica currently responsible for all reads and writes.

6.9 Follower

Followers follow the leader: all write requests are routed through the leader, and data changes are broadcast to all followers, which keep their data in sync with the leader. If the leader fails, a new leader is elected from the followers. If a follower crashes, gets stuck, or falls too far behind, the leader removes it from the in-sync replicas (ISR) list, and a new follower is created.

7 Comparison of common message queues

7.1 RabbitMQ

RabbitMQ is an open-source message queue written in Erlang. It natively supports many protocols (AMQP, XMPP, SMTP, STOMP), which makes it quite heavyweight and better suited to enterprise-level development. It implements a broker architecture, meaning messages are queued in a central queue before being delivered to clients, and it has good support for routing, load balancing, and data persistence.

7.2 Redis

Redis is a key-value NoSQL database with very active development and maintenance. Although it is a key-value store, it has built-in MQ functionality, so it can be used as a lightweight queue service. For enqueue and dequeue operations, RabbitMQ and Redis were each tested with one million operations, recording the execution time every 100,000 operations, with payloads of 128 Bytes, 512 Bytes, 1 KB, and 10 KB. The results show that for enqueuing, Redis outperforms RabbitMQ with small payloads but becomes unbearably slow once the payload exceeds 10 KB; for dequeuing, Redis performs very well regardless of payload size, while RabbitMQ's dequeue performance is far lower than Redis's.

7.3 ZeroMQ

ZeroMQ claims to be the fastest message queue system, especially for high-throughput scenarios. It can implement advanced and complex queues that RabbitMQ is not good at, but developers have to assemble multiple technical frameworks themselves, and that complexity is a challenge to using this MQ successfully. ZeroMQ has a unique broker-less model: there is no message server or middleware to install and run, because the application itself plays that role. You simply reference the ZeroMQ library (installable via NuGet, for example) and can then send messages between applications. However, ZeroMQ only provides non-persistent queues, which means data is lost if the process goes down. Twitter's Storm used ZeroMQ as the default transport for data streams in versions before 0.9.0 (from 0.9 onward Storm supports both ZeroMQ and Netty as transport modules).

7.4 ActiveMQ

ActiveMQ is an Apache subproject. Like ZeroMQ, it can implement queues in both broker and point-to-point styles; like RabbitMQ, it can implement advanced scenarios efficiently with little code.

7.5 Kafka/Jafka

Kafka is an Apache subproject: a high-performance, cross-language, distributed publish/subscribe message queue system. Jafka was incubated on top of Kafka, i.e. an enhanced version of Kafka. Its features include: fast persistence, with message persistence at O(1) overhead; high throughput, reaching around 100,000 messages per second on an ordinary server; a fully distributed system in which brokers, producers, and consumers all natively support distribution and automatic load balancing; and support for parallel loading into Hadoop, a viable solution for systems that, like Hadoop, handle log data and offline analysis but also need real-time processing. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Compared with ActiveMQ, Apache Kafka is a very lightweight messaging system, and besides its excellent performance it is also a well-functioning distributed system.

8 Developing with Kafka

8.1 Developing a simple Kafka application

A simple producer example:

package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
        private static Producer<Integer,String> producer;
        private final Properties props=new Properties();
        public SimpleProducer(){
                // the broker list to connect to
                props.put("metadata.broker.list", "192.168.1.216:9092");
                // the serializer class: Java objects must be serialized before transmission
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                producer = new Producer<Integer, String>(new ProducerConfig(props));
        }
        public static void main(String[] args) {
                SimpleProducer sp=new SimpleProducer();
                // the topic to send to
                String topic="mytopic";

                // the message to send to the topic
                String messageStr = "This is a message";

                // build the message object
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);

                // push the message to the broker
                producer.send(data);
                producer.close();
        }
}

In a single-node environment the port is simply the Kafka broker port, 9092. The topic here is defined as mytopic; you can choose any name without worrying about whether it already exists on the server. The code above sends a single message; when the data volume is large, calling send repeatedly for each message wastes time, so it is better to group the data into a List and send the whole ArrayList in one call, which improves throughput considerably.
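
For reference, here is a minimal sketch of the same producer written against the newer org.apache.kafka.clients.producer API shipped with the kafka-clients library, which batches records internally (controlled by batch.size and linger.ms), so individual send() calls stay cheap. The class name is illustrative; the broker address is the one from the installation above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewApiProducer {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                // let the client accumulate records for up to 10 ms before sending a batch
                props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

                try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                        for (int i = 0; i < 100; i++) {
                                // send() is asynchronous; records are grouped into batches per partition
                                producer.send(new ProducerRecord<>("mytopic", "key-" + i, "message-" + i));
                        }
                        producer.flush(); // block until all buffered records have been sent
                }
        }
}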

A simple Kafka consumer example:

package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
        private final ConsumerConnector consumer;
        private final String topic;

        public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
                Properties props = new Properties();
                // the ZooKeeper connection information
                props.put("zookeeper.connect", zookeeper);
                // the group id that this consumer belongs to
                props.put("group.id", groupId);
                props.put("zookeeper.session.timeout.ms", "500");
                props.put("zookeeper.sync.time.ms", "250");
                props.put("auto.commit.interval.ms", "1000");
                consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                this.topic = topic;
        }

        public void testConsumer() {
                Map<String, Integer> topicCount = new HashMap<String, Integer>();
                // the number of consumer streams for this topic
                topicCount.put(topic, new Integer(1));
                // returns a Map containing the streams for all topics
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
                // take the message streams for the topic we need
                List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
                for (final KafkaStream stream : streams) {
                        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                        while (consumerIte.hasNext())
                                System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
                }
                if (consumer != null)
                        consumer.shutdown();
        }

        public static void main(String[] args) {
                String topic = "mytopic";
                SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.233.128:2181/kafka", "testgroup", topic);
                simpleHLConsumer.testConsumer();
        }

}

The consumer's main logic is simply to do some basic processing of the data sent by the producer and print it. Note that the address here is the ZooKeeper address and includes the chroot node /kafka, and the topic name must match the one used by the producer.
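
For reference, the newer consumer API in kafka-clients connects to the brokers directly via bootstrap.servers instead of ZooKeeper. A minimal sketch follows; the class name is illustrative, and the broker address, group id, and topic follow the examples above.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewApiConsumer {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // like --from-beginning for a new group
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                        consumer.subscribe(Collections.singletonList("mytopic"));
                        while (true) {
                                // poll(long) works with kafka-clients 1.0.x; newer clients prefer poll(Duration)
                                ConsumerRecords<String, String> records = consumer.poll(100);
                                for (ConsumerRecord<String, String> record : records) {
                                        System.out.printf("partition=%d offset=%d value=%s%n",
                                                record.partition(), record.offset(), record.value());
                                }
                        }
                }
        }
}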

8.2 Developing a generic Kafka module

We develop a generic Kafka send/receive module; other modules only need to call this module's unified send interface and implement their own receive logic.

The microservice provider, the subscribed topic, and the subscriber group can all be configured through the database. When a message arrives, the service automatically invokes the callbacks of the configured receiver classes.

The configuration tables look like this:
(screenshot: subscription configuration tables)

This Kafka module lives in the 瘋狂創客圈 Crazy-SpringCloud scaffold, under the module name base-kafka. After startup, its Swagger UI looks like this:

(screenshot: Swagger UI of the base-kafka module)

You can use this interface to send a message to a given topic. If a subscription has been configured in the database — for example, if provider-name (a microservice name) subscribes to the test topic and a callback class and method are configured for it — then the message will be consumed.
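
The base-kafka module's source is not reproduced here. As a rough illustration of the same idea — a unified send interface plus a configured callback that consumes a subscribed topic — the following is a minimal Spring Kafka sketch. It is not the scaffold's actual implementation; the class name, topic, and group id are illustrative, and it assumes the spring-kafka dependency with spring.kafka.bootstrap-servers set in the application configuration.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoMessageService {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public DemoMessageService(KafkaTemplate<String, String> kafkaTemplate) {
                this.kafkaTemplate = kafkaTemplate;
        }

        // hypothetical send method, comparable to the module's unified send interface
        public void send(String message) {
                kafkaTemplate.send("test", message);
        }

        // hypothetical callback: consumes the "test" topic for the "provider-name" group
        @KafkaListener(topics = "test", groupId = "provider-name")
        public void onMessage(String message) {
                System.out.println("received: " + message);
        }
}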

The consumption output looks like this:

(screenshot: message consumption output)

9 Kafka internals

Let's look at the relationships between producers, consumers, topics, and groups:

If this seems confusing at first, don't worry; let's go through the related concepts one by one.

  Producer: the producer creates messages and is the entry point for data into Kafka.

  Kafka cluster: one or more brokers together form the Kafka cluster.

  Broker: a broker is a Kafka instance. Each server runs one or more Kafka instances; for simplicity we assume each broker corresponds to one server. Every broker in a Kafka cluster has a unique id, such as broker-0, broker-1, and so on.

  Topic: the subject of a message, which can be understood as a message category; Kafka's data is stored in topics. Multiple topics can be created on each broker.

  Partition: a partition of a topic. Each topic can have multiple partitions, which spread the load and increase Kafka's throughput. The data of the same topic in different partitions does not overlap, and on disk a partition is simply a directory.

  Replication: every partition has multiple replicas, which serve as backups. When the main partition (the leader) fails, a backup (a follower) is promoted to become the new leader. The default maximum number of replicas in Kafka is 10, the number of replicas cannot exceed the number of brokers, a follower and its leader are always on different machines, and a machine holds at most one replica of a given partition.

  Message: the body of each message that is sent.

  Consumer: the consumer of messages, i.e. the exit point for data out of Kafka.

  Consumer Group: several consumers can be combined into a consumer group. In Kafka's design, the data of one partition can only be consumed by a single consumer within a consumer group, while consumers in the same group can consume different partitions of the same topic; this, too, improves Kafka's throughput.

  Zookeeper: the Kafka cluster relies on ZooKeeper to store cluster metadata and to guarantee system availability.

Key point 1: the data of the same topic in different partitions does not overlap.

Key point 2: the data of a single partition can only be consumed by one consumer within a consumer group.

Workflow analysis

  The basic architecture and concepts of Kafka were introduced above. Hopefully you now have a rough picture of Kafka; if it still feels fuzzy, that's fine. Next we will analyze Kafka's workflow against that structure, and then go back over everything; you should get much more out of it on the second pass.

Sending data

  In the architecture diagram above, the producer is the entry point for data. Note the red arrows: when writing data, the producer always goes to the leader; it never writes data directly to a follower. How does it find the leader, and what does the write flow look like? See the figure below:

  (figure: producer write flow via the partition leader)

 

The sending flow is explained in the figure, so it is not repeated in text here. One point to note: after a message is written to the leader, the followers actively pull from the leader to synchronize it. The producer pushes data to the broker; each message is appended to a partition and written to disk sequentially, which guarantees that data within a single partition is ordered. The write pattern is illustrated below:

  (figure: sequential append of messages to a partition)

  As mentioned, data is written to different partitions. Why does Kafka partition at all? The main purposes are:
  1. Scalability: because a topic can have multiple partitions, we can easily cope with growing data volumes by adding machines.
  2. Concurrency: with the partition as the unit of reads and writes, multiple consumers can consume data at the same time, which improves message processing efficiency.

  If you are familiar with load balancing, you know that when requests hit a server, the server may balance the load and spread the traffic across different machines. Similarly in Kafka, if a topic has multiple partitions, how does the producer know which partition to send the data to? Kafka uses the following rules (see the sketch after this list):
  1. If the partition is specified when writing, the data is written to that partition.
  2. If no partition is specified but a key is set on the data, a partition is chosen by hashing the key.
  3. If neither a partition nor a key is specified, a partition is chosen in round-robin fashion.
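
A minimal sketch of how these three rules map onto the ProducerRecord constructors of the Java client (the class name, topic, and values are illustrative):

import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingExamples {
        public static void main(String[] args) {
                // rule 1: partition specified explicitly -> record goes to partition 0
                ProducerRecord<String, String> toPartition0 =
                        new ProducerRecord<>("mytopic", 0, "order-42", "explicit partition");

                // rule 2: no partition, but a key -> partition chosen by hashing the key
                ProducerRecord<String, String> byKey =
                        new ProducerRecord<>("mytopic", "order-42", "routed by key hash");

                // rule 3: neither partition nor key -> partitions chosen in round-robin fashion
                ProducerRecord<String, String> roundRobin =
                        new ProducerRecord<>("mytopic", "no key, round robin");

                System.out.println(toPartition0 + "\n" + byKey + "\n" + roundRobin);
        }
}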

  Not losing messages is a basic guarantee of any message queue middleware. When the producer writes to Kafka, how does it make sure messages are not lost? As shown in the write flow diagram, this is done through the ACK mechanism: when writing data to the queue, the producer can set a parameter that controls whether it waits for Kafka to acknowledge the data. The parameter can be set to 0, 1, or all:
  0 means the producer sends data to the cluster without waiting for any response and does not guarantee that the send succeeded. Lowest safety, highest efficiency.
  1 means the producer only needs the leader's acknowledgement before sending the next message; only delivery to the leader is guaranteed.
  all means the producer waits until all followers have completed synchronization from the leader before sending the next message, guaranteeing both delivery to the leader and replication to all replicas. Highest safety, lowest efficiency.
  (A configuration sketch follows below.)
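
On the producer side this corresponds to the acks setting. A minimal configuration sketch (the class name and values are illustrative; the broker address is the one from the installation above):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerConfig {
        // properties for a producer that waits for all in-sync replicas before a send is considered successful
        public static Properties durableProps(String bootstrapServers) {
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                // "0": don't wait for any ack, "1": wait for the leader only, "all": wait for all in-sync replicas
                props.put(ProducerConfig.ACKS_CONFIG, "all");
                props.put(ProducerConfig.RETRIES_CONFIG, 3); // retry transient failures instead of losing the record
                return props;
        }

        public static void main(String[] args) {
                System.out.println(durableProps("192.168.233.128:9092"));
        }
}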

  Finally, what happens if you write to a topic that does not exist? Kafka creates the topic automatically, and by default both the number of partitions and the number of replicas for it are 1.

Storing data

  After the producer writes data to Kafka, the cluster needs to store it. Kafka saves data on disk, and in our usual thinking, writing to disk is a relatively slow operation, not suitable for such a high-concurrency component. Kafka, however, pre-allocates a dedicated area of disk space and writes data to it sequentially, which is far more efficient than random writes.

Partition structure
  As mentioned earlier, every topic can be divided into one or more partitions. If a topic feels abstract, a partition is quite concrete: on the server, a partition is simply a directory. Each partition directory contains multiple groups of segment files, and each group consists of a .index file, a .log file, and a .timeindex file (absent in early versions). The .log file is where the messages are actually stored, while the .index and .timeindex files are index files used to look messages up.

  (figure: partition directory with three groups of segment files)

  As shown in the figure above, this partition has three groups of segment files. Every .log file has the same size, but the number of messages stored is not necessarily equal (individual message sizes differ). Each file is named after the smallest offset in that segment; for example, 000.index covers the messages with offsets 0 through 368795. Kafka uses this segmentation-plus-index approach to solve the lookup efficiency problem.

Message structure
As mentioned above, the .log file is where messages are actually stored, and what the producer writes to Kafka is a stream of individual messages. What does a message stored in the log look like? It mainly contains the message body, the message size, the offset, the compression type, and so on. The three fields worth remembering are:
  1. offset: an ordered 8-byte id that uniquely determines the position of a message within its partition.
  2. message size: 4 bytes, describing the size of the message.
  3. message body: the actual (compressed) message data; the space it occupies varies from message to message.

Storage strategy
  Kafka keeps all messages regardless of whether they have been consumed. What are the deletion policies for old data?
  1. Time-based: the default is 168 hours (7 days).
  2. Size-based: the default is 1073741824 bytes.
  Note that the time complexity of reading a specific message in Kafka is O(1), so deleting expired files does not improve Kafka's performance. (A sketch of a per-topic retention override follows below.)
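
As an aside, retention can also be overridden per topic instead of only broker-wide. The following is a minimal sketch using AdminClient.alterConfigs from kafka-clients 1.0.x (superseded by incrementalAlterConfigs in newer clients); the class name, topic, and value are illustrative.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionOverride {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");

                try (AdminClient admin = AdminClient.create(props)) {
                        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
                        // keep this topic's messages for 3 days (259200000 ms) instead of the broker default of 168 hours
                        Config retention = new Config(
                                Collections.singletonList(new ConfigEntry("retention.ms", "259200000")));
                        admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
                }
        }
}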

Consuming data

  Once messages are stored in the log files, consumers can consume them. Like producers, consumers pull messages from the leader of a partition.

  Multiple consumers can form a consumer group, and each consumer group has a group id. Consumers in the same group can consume data from different partitions of the same topic, but multiple consumers in the same group never consume the same partition. Sounds a bit convoluted? Look at the figure below:

  (figure: a consumer group with fewer consumers than partitions)

  The figure shows the case where a consumer group has fewer consumers than partitions, so a single consumer ends up consuming several partitions; its consumption speed is therefore lower than that of a consumer handling only one partition.

If a consumer group has more consumers than there are partitions, will several consumers end up consuming the same partition?

As already mentioned, this will not happen: the extra consumers do not consume data from any partition. So in practice it is recommended that the number of consumers in a consumer group be equal to the number of partitions, and at most not exceed it.

 

How is a partition stored on disk?

A partition is divided into multiple groups of segments, and each segment contains .log, .index, and .timeindex files; each stored message consists of its offset, message size, message body, and so on. We have mentioned segments and offsets several times — how are they used together when looking up a message? Suppose we need to find the message with offset 368801; what does that look like? Let's look at the figure below:

(figure: locating offset 368801 using segment files and the sparse index)

  1. First, binary search is used to locate the segment file that contains offset 368801; here it is the second segment.
  2. Open that segment's .index file (368796.index, whose starting offset is 368796+1). The offset we are looking for satisfies 368796+5=368801, so the relative offset to look up inside this index is 5. Because the index is a sparse index mapping relative offsets to the physical positions of messages, an entry for relative offset 5 may not exist; binary search is used again to find the index entry with the largest relative offset that is less than or equal to the requested one, which here is the entry for relative offset 4.
  3. The entry for relative offset 4 gives the physical position 256 where the message is stored. Open the data file and scan sequentially from position 256 until the message with offset 368801 is found.

  This mechanism is built on offsets being ordered; it combines segments, ordered offsets, a sparse index, binary search, and a sequential scan to locate data efficiently. At this point the consumer has the data it needs to process. A simplified sketch of this lookup follows below.
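
The following is a simplified, illustrative sketch of that lookup (not Kafka's actual implementation): a binary search over the segment base offsets, then a floor lookup in that segment's sparse index, then a sequential scan from the returned position. The class name is illustrative; the numbers match the example above.

import java.util.Arrays;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetLookupSketch {
        public static void main(String[] args) {
                // base offsets of the segment files (i.e. their file names), sorted ascending
                long[] segmentBaseOffsets = {0L, 368796L, 737646L};

                // sparse index of one segment: relative offset -> physical position in the .log file
                NavigableMap<Long, Long> sparseIndex = new TreeMap<>();
                sparseIndex.put(1L, 0L);
                sparseIndex.put(4L, 256L);
                sparseIndex.put(7L, 512L);

                long target = 368801L;

                // step 1: binary search for the segment whose base offset is <= target
                int pos = Arrays.binarySearch(segmentBaseOffsets, target);
                int segment = pos >= 0 ? pos : -pos - 2; // insertion point - 1 when not found exactly
                long baseOffset = segmentBaseOffsets[segment];

                // step 2: largest indexed relative offset <= (target - baseOffset)
                long relativeOffset = target - baseOffset; // 368801 - 368796 = 5
                long startPosition = sparseIndex.floorEntry(relativeOffset).getValue();

                // step 3: scan the .log file sequentially from startPosition until offset 368801 is reached
                System.out.println("segment base=" + baseOffset
                        + ", relative offset=" + relativeOffset
                        + ", scan from byte position " + startPosition);
        }
}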

How does each consumer keep track of its own consumption position?

In early versions, consumers kept the offsets they had consumed in ZooKeeper, reporting them at intervals; this easily led to duplicate consumption and did not perform well. In newer versions, the offsets consumed by consumers are maintained directly in the Kafka cluster's internal topic __consumer_offsets.
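
With the new consumer, offsets are written to __consumer_offsets either automatically (enable.auto.commit=true, the default) or manually. A minimal manual-commit sketch follows; the class name and group id are illustrative, and the broker address and topic follow the earlier examples.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                // turn off auto-commit so offsets reach __consumer_offsets only after processing succeeds
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                        consumer.subscribe(Collections.singletonList("test"));
                        while (true) {
                                ConsumerRecords<String, String> records = consumer.poll(100);
                                for (ConsumerRecord<String, String> record : records) {
                                        System.out.println("processing offset " + record.offset());
                                }
                                // synchronously commit the offsets of the records just processed
                                consumer.commitSync();
                        }
                }
        }
}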
