kafka和zookeeper


1.Kafka入門教程

1.1 消息隊列(Message Queue)

Message Queue消息傳送系統提供傳送服務。消息傳送依賴於大量支持組件,這些組件負責處理連接服務、消息的路由和傳送、持久性、安全性以及日志記錄。消息服務器可以使用一個或多個代理實例。

JMS(Java Messaging Service)是Java平台上有關面向消息中間件(MOM)的技術規范,它便於消息系統中的Java應用程序進行消息交換,並且通過提供標准的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。

1.2 MQ消息模型

img

KafkaMQ消息模型圖1-1

1.3 MQ消息隊列分類

消息隊列分類:點對點和發布/訂閱兩種:

1、點對點:

消息生產者生產消息發送到queue中,然后消息消費者從queue中取出並且消費消息。

消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。

2、發布/訂閱:

消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。

1.4 MQ消息隊列對比

1、RabbitMQ:支持的協議多,非常重量級消息隊列,對路由(Routing),負載均衡(Loadbalance)或者數據持久化都有很好的支持。

2、ZeroMQ:號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景,擅長的高級/復雜的隊列,但是技術也復雜,並且只提供非持久性的隊列。

3、ActiveMQ:Apache下的一個子項,類似ZeroMQ,能夠以代理人和點對點的技術實現隊列。

4、Redis:是一個key-Value的NOSql數據庫,但也支持MQ功能,數據量較小,性能優於RabbitMQ,數據超過10K就慢的無法忍受。

1.5 Kafka簡介

Kafka是分布式發布-訂閱消息系統,它最初由 LinkedIn 公司開發,使用 Scala語言編寫,之后成為 Apache 項目的一部分。在Kafka集群中,沒有“中心主節點”的概念,集群中所有的服務器都是對等的,因此,可以在不做任何配置的更改的情況下實現服務器的的添加與刪除,同樣的消息的生產者和消費者也能夠做到隨意重啟和機器的上下線。

img

Kafka消息系統生產者和消費者部署關系圖1-2

img

Kafka消息系統架構圖1-3

1.6 Kafka術語介紹

1、消息生產者:即:Producer,是消息的產生的源頭,負責生成消息並發送到Kafka

服務器上。

2、消息消費者:即:Consumer,是消息的使用方,負責消費Kafka服務器上的消息。

3、主題:即:Topic,由用戶定義並配置在Kafka服務器,用於建立生產者和消息者之間的訂閱關系:生產者發送消息到指定的Topic下,消息者從這個Topic下消費消息。

4、消息分區:即:Partition,一個Topic下面會分為很多分區,例如:“kafka-test”這個Topic下可以分為6個分區,分別由兩台服務器提供,那么通常可以配置為讓每台服務器提供3個分區,假如服務器ID分別為0、1,則所有的分區為0-0、0-1、0-2和1-0、1-1、1-2。Topic物理上的分組,一個 topic可以分為多個 partition,每個 partition 是一個有序的隊列。partition中的每條消息都會被分配一個有序的 id(offset)。

5、Broker:即Kafka的服務器,用戶存儲消息,Kafa集群中的一台或多台服務器統稱為 broker。

6、消費者分組:Group,用於歸組同類消費者,在Kafka中,多個消費者可以共同消息一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者就組成了一個分組,擁有同一個分組名稱,通常也被稱為消費者集群。

7、Offset:消息存儲在Kafka的Broker上,消費者拉取消息數據的過程中需要知道消息在文件中的偏移量,這個偏移量就是所謂的Offset。

1.7 Kafka中Broker

1、Broker:即Kafka的服務器,用戶存儲消息,Kafa集群中的一台或多台服務器統稱為 broker。

2、Message在Broker中通Log追加的方式進行持久化存儲。並進行分區(patitions)。

3、為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數。

4、Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。Message消息是有多份的。

5、Broker不保存訂閱者的狀態,由訂閱者自己保存。

6、無狀態導致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka采用基於時間的SLA(服務水平保證),消息保存一定時間(通常為7天)后會被刪除。

7、消息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset(id)進行重新讀取消費消息。

1.8 Kafka的Message組成

1、Message消息:是通信的基本單位,每個 producer 可以向一個 topic(主題)發布一些消息。

2、Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在創建topic時指定的),每個partition存儲一部分Message。

3、partition中的每條Message包含了以下三個屬性:

offset 即:消息唯一標識:對應類型:long

MessageSize 對應類型:int32

data 是message的具體內容。

1.9 Kafka的Partitions分區

1、Kafka基於文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存。

2、可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。

3、越多的partitions意味着可以容納更多的consumer,有效提升並發消費的能力。

1.10 Kafka的Consumers

1、消息和數據消費者,訂閱 topics並處理其發布的消息的過程叫做 consumers。

2、在 kafka中,我們可以認為一個group是一個“訂閱者”,一個Topic中的每個partions,只會被一個“訂閱者”中的一個consumer消費,不過一個 consumer可以消費多個partitions中的消息(消費者數據小於Partions的數量時)。注意:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味着某些consumer將無法得到消息。

3、一個partition中的消息只會被group中的一個consumer消息。每個group中consumer消息消費互相獨立。

1.11 Kafka的持久化

1、一個Topic可以認為是一類消息,每個topic將被分成多partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),partition是以文件的形式存儲在文件系統中。

2、Logs文件根據broker中的配置要求,保留一定時間后刪除來釋放磁盤空間。

img

Kafka消息分區Partition圖1-4

Partition:

Topic物理上的分組,一個 topic可以分為多個 partition,每個 partition 是一個有序的隊列。partition中的每條消息都會被分配一個有序的 id(offset)。

3、為數據文件建索引:稀疏存儲,每隔一定字節的數據建立一條索引。下圖為一個partition的索引示意圖:

img

Kafka消息分區Partition索引圖1-5

1.12 Kafka的分布式實現:

img

Kafka分布式關系圖1-6

img

Kafka生產環境關系圖1-7

1.13 Kafka的通訊協議:

1、Kafka的Producer、Broker和Consumer之間采用的是一套自行設計基於TCP層的協議,根據業務需求定制,而非實現一套類似ProtocolBuffer的通用協議。

2、基本數據類型:(Kafka是基於Scala語言實現的,類型也是Scala中的數據類型)

定長數據類型:int8,int16,int32和int64,對應到Java中就是byte, short, int和long。

變長數據類型:bytes和string。變長的數據類型由兩部分組成,分別是一個有符號整數N(表示內容的長度)和N個字節的內容。其中,N為-1表示內容為null。bytes的長度由int32表示,string的長度由int16表示。

數組:數組由兩部分組成,分別是一個由int32類型的數字表示的數組長度N和N個元素。

3、Kafka通訊的基本單位是Request/Response。

4、基本結構:

RequestOrResponse => MessageSize(RequestMessage | ResponseMessage)

名稱 類型 描術
MessageSize int32 表示RequestMessage或者ResponseMessage的長度
RequestMessageResponseMessage

5、通訊過程:

客戶端打開與服務器端的Socket

往Socket寫入一個int32的數字(數字表示這次發送的Request有多少字節)

服務器端先讀出一個int32的整數從而獲取這次Request的大小

然后讀取對應字節數的數據從而得到Request的具體內容

服務器端處理了請求后,也用同樣的方式來發送響應。

6、RequestMessage結構:

RequestMessage => ApiKey ApiVersionCorrelationId ClientId Request

名稱 類型 描術
ApiKey int16 表示這次請求的API編號
ApiVersion int16 表示請求的API的版本,有了版本后就可以做到后向兼容
CorrelationId int32 由客戶端指定的一個數字唯一標示這次請求的id,服務器端在處理完請求后也會把同樣的CorrelationId寫到Response中,這樣客戶端就能把某個請求和響應對應起來了。
ClientId string 客戶端指定的用來描述客戶端的字符串,會被用來記錄日志和監控,它唯一標示一個客戶端。
Request Request的具體內容。

7、ResponseMessage結構:

ResponseMessage => CorrelationId Response

名稱 類型 描術
CorrelationId int32 對應Request的CorrelationId。
Response 對應Request的Response,不同的Request的Response的字段是不一樣的。

Kafka采用是經典的Reactor(同步IO)模式,也就是1個Acceptor響應客戶端的連接請求,N個Processor來讀取數據,這種模式可以構建出高性能的服務器。

8、Message結構:

Message:Producer生產的消息,鍵-值對

Message => Crc MagicByte Attributes KeyValue

名稱 類型 描術
CRC int32 表示這條消息(不包括CRC字段本身)的校驗碼。
MagicByte int8 表示消息格式的版本,用來做后向兼容,目前值為0。
Attributes int8 表示這條消息的元數據,目前最低兩位用來表示壓縮格式。
Key bytes 表示這條消息的Key,可以為null。
Value bytes 表示這條消息的Value。Kafka支持消息嵌套,也就是把一條消息作為Value放到另外一條消息里面。

9、MessageSet結構:

MessageSet:用來組合多條Message,它在每條Message的基礎上加上了Offset和MessageSize

MessageSet => [Offset MessageSize Message]

名稱 類型 描術
Offset int64 它用來作為log中的序列號,Producer在生產消息的時候還不知道具體的值是什么,可以隨便填個數字進去。
MessageSize int32 表示這條Message的大小。
Message - 表示這條Message的具體內容,其格式見上一小節。

10、 Request/Respone和Message/MessageSet的關系:

Request/Response是通訊層的結構,和網絡的7層模型對比的話,它類似於TCP層。

Message/MessageSet定義的是業務層的結構,類似於網絡7層模型中的HTTP層。Message/MessageSet只是Request/Response的payload中的一種數據結構。

備注:Kafka的通訊協議中不含Schema,格式也比較簡單,這樣設計的好處是協議自身的Overhead小,再加上把多條Message放在一起做壓縮,提高壓縮比率,從而在網絡上傳輸的數據量會少一些。

1.14 數據傳輸的事務定義:

1、at most once:最多一次,這個和JMS中”非持久化”消息類似.發送一次,無論成敗,將不會重發。

at most once:消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理.那么此后”未處理”的消息將不能被fetch到,這就是“atmost once”。

2、at least once:消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功。

at least once:消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是“atleast once”,原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。

3、exactly once:消息只會發送一次。

exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的。

注:通常情況下“at-least-once”是我們首選。(相比at most once而言,重復接收數據總比丟失數據要好)。

2.簡單命令介紹

啟動命令:./kafka-server-start.sh -daemon /Users/huangweixing/software/kafka_2.12-2.1.0/config/server.properties

創建topic命令:./kafka-topics.sh --create --topic test0 --replication-factor 1 --partitions 1 --zookeeper localhost:2181

查看kafka中topic情況:./kafka-topics.sh --list --zookeeper 127.0.0.1:2181

查看對應topic詳細描述信息:./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test0

topic信息修改:kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test0 --partitions 3 ## Kafka分區數量只允許增加,不允許減少
————————————————
版權聲明:本文為CSDN博主「沒那個條件」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/hweixing123/article/details/86536641

3.kafka配置

1. 生產端的配置文件 producer.properties

	# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

#broker必須在該時間范圍之內給出反饋,否則失敗。
#在向producer發送ack之前,broker允許等待的最大時間 ,如果超時,
#broker將會向producer發送一個error ACK.意味着上一次消息因為某種原因
#未能成功(比如follower未能同步成功)
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

2. 消費端的配置文件 consumer.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

#指定消費
group.id=test-consumer-group

# 如果zookeeper沒有offset值或offset值超出范圍。
#那么就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。默認larges
#auto.offset.reset=

3.服務端的配置文件 server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

#broker的全局唯一編號,不能重復
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

#處理網絡請求的線程數量,也就是接收消息的線程數。
#接收線程會將接收到的消息放到內存中,然后再從內存中寫入磁盤。
num.network.threads=3

#消息從內存中寫入磁盤是時候使用的線程數量。
#用來處理磁盤IO的線程數量
num.io.threads=8

#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400

#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩沖區大小
socket.request.max.bytes=104857600


############################# Log Basics #############################

#kafka運行日志存放的路徑
log.dirs=/tmp/kafka-logs

#topic在當前broker上的分片個數
num.partitions=1

#我們知道segment文件默認會被保留7天的時間,超時的話就
#會被清理,那么清理這件事情就需要有一些線程來做。這里就是
#用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

#segment文件保留的最長時間,默認保留7天(168小時),
#超時將被刪除,也就是說7天之前的數據將被清理掉。
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824

#上面的參數設置了每一個segment文件的大小是1G,那么
#就需要有一個東西去定期檢查segment文件有沒有達到1G,
#多長時間去檢查一次,就需要設置一個周期性檢查文件大小
#的時間(單位是毫秒)。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

4.java kafka

引入依賴

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

bootstrap.yml

spring:
  cloud:
    config:
      label: paas-service-message-converged-service
      profile: DEV
      discovery:
        enabled: true
        service-id: baseServices-config-server
#    stream:
#        kafka:
#          binder:
#            brokers: 10.17.173.199:9092
#            zkNodes:  10.17.173.199:2181
  kafka:
    # 指定kafka server的地址,集群配多個,中間,逗號隔開
    bootstrap-servers: 10.17.162.121:9092
    producer:
      # 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
      # 當retris為0時,produce不會重復。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
      retries: 0
      # 每次批量發送消息的數量,produce積累到一定數據,一次發送
#      batch-size: 16384
      # produce積累數據一次發送,緩存大小達到buffer.memory就發送數據
#      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默認消費者group id --> 由於在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名
      group-id: group${random.uuid}
      # smallest和latest才有效,如果smallest重新0開始讀取,如果是latest從logfile的offset讀取。消息推送只需要實時的消息,因此使用latest
      auto-offset-reset: latest
      # enable.auto.commit:true --> 設置自動提交offset
      enable-auto-commit: true
      #如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
      auto-commit-interval: 100
      # 指定消息key和消息體的編解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  application:
    name: paas-service-message-converged-service
  main:
    allow-bean-definition-overriding: true
eureka:
  instance:
    hostname: ${spring.cloud.client.ip-address}
    prefer-ip-address: true
    instanceId: ${eureka.instance.hostname}:${spring.application.name}:${server.port}
  client:
    serviceUrl:
      defaultZone: http://portal-dev:portal-dev@10.17.162.120:8761/eureka/

kafka發送消息

package com.midea.mideacloud.paasservice.rest;

import com.alibaba.fastjson.JSONArray;
import com.midea.management.util.UserUtils;
import com.midea.mideacloud.paascommon.utils.TemplateUtil;
import com.midea.mideacloud.paasservice.common.utils.NumberUtil;
import com.midea.mideacloud.paasservice.domain.MessageAtomicRouters;
import com.midea.mideacloud.paasservice.domain.api.MessageReceiverFeign;
import com.midea.mideacloud.paasservice.domain.api.UserInfoFeign;
import com.midea.mideacloud.paasservice.domain.dto.*;
import com.midea.mideacloud.paasservice.service.IMessageInfoConvergedService;
import com.midea.mideacloud.paasservice.service.websocket.helper.WebSocketMessageRedisHelper;
import com.mideaframework.core.web.JsonResponse;
import com.mideaframework.core.web.RestDoing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.midea.mideacloud.paasservice.common.constants.WebSocketConstants.Kafka.WEBSOCKET_TO_CLIENT_KAFKA_TOPIC;
import static com.midea.mideacloud.paasservice.common.constants.WebSocketConstants.Redis.REDIS_WEBSOCKET_MSG_PREFIX;

@RestController
public class MessageInfoRest {

    private Logger logger = LoggerFactory.getLogger(MessageInfoRest.class);
    @Autowired
    private IMessageInfoConvergedService iMessageInfoConvergedService;
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @Autowired
    private UserInfoFeign userInfoFeign;
    @Autowired
    private WebSocketMessageRedisHelper webSocketMessageRedisHelper;
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private MessageReceiverFeign messageReceiverFeign;


    @RequestMapping(value = MessageAtomicRouters.ROUTER_MESSAGES_SEND_MESSAGE,method = RequestMethod.POST)
    public JsonResponse send(@RequestBody Body body, HttpServletRequest request){
        if(body == null){
            logger.info("body參數為空");
            return JsonResponse.fail("500","body參數為空");
        }
        RestDoing doing = jsonResponse -> {
            //通過傳過來的類型,去獲取模板對象,拼接對應的回復內容
            int type = body.getType();
            List<String> data = body.getData();
            String[] strings = data.toArray(new String[data.size()]);
            JsonResponse<MessageTemplateDto> messageTemplateDtoJsonResponse = iMessageInfoConvergedService.getTemplate(type);
            if(null == messageTemplateDtoJsonResponse || !messageTemplateDtoJsonResponse.judgeSuccess()){
                logger.error("獲取信息模板失敗",messageTemplateDtoJsonResponse);
            }
            MessageTemplateDto template = messageTemplateDtoJsonResponse.getData();
            logger.info("模板對象"+template);
            //根據類型拼接內容模板
            MessageInfoDto messageInfoDto = new MessageInfoDto();
            //工單或者應用部署類型,要拼接標題模板
            if(0 == type || 3 == type || 4 == type){
                messageInfoDto.setTitle(TemplateUtil.getTitle(template.getName(),body.getTitle()));
            }else {
                messageInfoDto.setTitle(template.getName());
            }
            messageInfoDto.setContent(TemplateUtil.getContent(type,template.getTypeTemplate(), strings));
            messageInfoDto.setType(type);
            messageInfoDto.setUrl(TemplateUtil.getUrl(type,template.getUrlTemplate(),body.getId()));
            Integer messageId = (Integer) iMessageInfoConvergedService.saveMessageInfo(messageInfoDto).getData();
            messageInfoDto.setId(messageId);
            //把id插入中間表
            List<Integer> userIds = body.getUserIds();
            List<MessageReceiverDto> MessageReceiverDtos = new ArrayList<>();
            for (int i = 0; i < userIds.size(); i++) {
                MessageReceiverDto messageReceiverDto = new MessageReceiverDto();
                messageReceiverDto.setMessageId(messageId);
                Integer userId = NumberUtil.valueOf(userIds.get(i));
                messageReceiverDto.setUserId(userId);
                messageReceiverDto.setStatus(0);
                MessageReceiverDtos.add(messageReceiverDto);
            }
            //通過用戶id集合獲取用戶對象集合
            List<UserInfoDto> userInfos = userInfoFeign.findByUserCodeList((ArrayList<Integer>) userIds).getData();
            logger.info("用戶對象集合打印"+userInfos);
            //插入接收表,返回對應的對象集合
            List<MessageReceiverDto> messageReceiverDtoList = messageReceiverFeign.insertRelation(MessageReceiverDtos).getData();
            //封裝推送消息
            List<Object> list = new ArrayList<>();
            for (int i = 0; i < userIds.size(); i++) {
                MessageInfoDto temp = new MessageInfoDto();
                BeanUtils.copyProperties(messageInfoDto, temp);
                Integer userId = NumberUtil.valueOf(userIds.get(i));
                //根據userId獲取用戶user對象
                temp.setUserId(userId);
                UserInfoDto userInfo = null;
                try {
                    userInfo = userInfos.get(i);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.info("消息發送,用戶獲取失敗");
                }
                temp.setUserCode(userInfo.getUserCode());
                temp.setUserName(userInfo.getUserName());
                Integer messageReceiverId = messageReceiverDtoList.get(i).getId();
                temp.setMessageReceiverId(messageReceiverId.toString());
                list.add(temp);
            }
            logger.info("推送消息集合打印"+list);
            JSONArray kafkaMsgArray = new JSONArray(list);
            //將當前消息存入redis中,key為messageId,標記狀態為未發送
            Map mapToRedis = webSocketMessageRedisHelper.buildMapForBatchSaveToRedis(kafkaMsgArray);
            redisTemplate.opsForHash().putAll(REDIS_WEBSOCKET_MSG_PREFIX, mapToRedis);
            //將消息推送至kafka
            kafkaTemplate.send(WEBSOCKET_TO_CLIENT_KAFKA_TOPIC, kafkaMsgArray.toJSONString());
        };
        return doing.go(request, logger);

    }


}

kafka監聽

package com.midea.mideacloud.paasservice.mq.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.midea.mideacloud.paasservice.cache.websocket.SocketManager;
import com.midea.mideacloud.paasservice.service.websocket.WebSocketService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;

import static com.midea.mideacloud.paasservice.common.constants.WebSocketConstants.Kafka.WEBSOCKET_CLOSE_KAFKA_TOPIC;
import static com.midea.mideacloud.paasservice.common.constants.WebSocketConstants.Kafka.WEBSOCKET_TO_CLIENT_KAFKA_TOPIC;

/**
 * <p>Kafka消費者:監聽WebSocket推送消息</p>
 * @author ex_shibb1
 * @date 2019/11/13 14:58
 */
@Slf4j
@Component
public class WebSocketMessageKafkaConsumer {

    @Autowired
    private WebSocketService webSocketService;

    @KafkaListener(topics = WEBSOCKET_TO_CLIENT_KAFKA_TOPIC)
    public void onMessage(String message){
        log.info("監聽到一條{}主題的消息 : {}", WEBSOCKET_TO_CLIENT_KAFKA_TOPIC, message);
        if (StringUtils.isBlank(message)) {
            return;
        }
        JSONArray kafkaMsgArray = JSON.parseArray(message);
        webSocketService.batchSendMsg(kafkaMsgArray);
    }

    @Deprecated
    @KafkaListener(topics = WEBSOCKET_CLOSE_KAFKA_TOPIC)
    public void onClose(String userCode){
        log.info("監聽到一條{}主題的消息 : {}", WEBSOCKET_CLOSE_KAFKA_TOPIC, userCode);
        if (StringUtils.isBlank(userCode)) {
            return;
        }
        WebSocketSession webSocketSession = SocketManager.get(userCode);
        if (null == webSocketSession) {
            //嘗試remove一下
            SocketManager.remove(userCode);
            log.warn(userCode + "連接不存在,無需斷開");
            return;
        }
        try {
            webSocketSession.close();
            SocketManager.remove(userCode);
        } catch (IOException e) {
            log.error(userCode + "的WebSocket斷開連接異常", e);
            return;
        }
        log.info(userCode + "連接斷開成功");
    }


}

2.kafka manage

一.kafka-manager簡介

kafka-manager是目前最受歡迎的kafka集群管理工具,最早由雅虎開源,用戶可以在Web界面執行一些簡單的集群管理操作。具體支持以下內容:

  • 管理多個集群
  • 輕松檢查群集狀態(主題,消費者,偏移,代理,副本分發,分區分發)
  • 運行首選副本選舉
  • 使用選項生成分區分配以選擇要使用的代理
  • 運行分區重新分配(基於生成的分配)
  • 使用可選主題配置創建主題(0.8.1.1具有與0.8.2+不同的配置)
  • 刪除主題(僅支持0.8.2+並記住在代理配置中設置delete.topic.enable = true)
  • 主題列表現在指示標記為刪除的主題(僅支持0.8.2+)
  • 批量生成多個主題的分區分配,並可選擇要使用的代理
  • 批量運行重新分配多個主題的分區
  • 將分區添加到現有主題
  • 更新現有主題的配置

kafka-manager 項目地址:https://github.com/yahoo/kafka-manager

二.kafka-manager安裝

1.下載安裝包

使用Git或者直接從Releases中下載,這里我們下載 1.3.3.18 版本:https://github.com/yahoo/kafka-manager/releases

img

[admin@node21 software]$ wget https://github.com/yahoo/kafka-manager/archive/1.3.3.18.zip

2.解壓安裝包

[復制代碼](javascript:void(0)😉

[admin@node21 software]$ mv 1.3.3.18.zip kafka-manager-1.3.3.18.zip
[admin@node21 software]$ unzip kafka-manager-1.3.3.18.zip -d /opt/module/
[admin@node21 software]$ cd /opt/module/
[admin@node21 module]$ ll
drwxr-xr-x  9 admin admin   268 May 27 00:33 jdk1.8
drwxr-xr-x  7 admin admin   122 Jun 14 11:44 kafka_2.11-1.1.0
drwxrwxr-x  9 admin admin   189 Jul  7 04:44 kafka-manager-1.3.3.18
drwxr-xr-x 11 admin admin  4096 May 29 10:14 zookeeper-3.4.12
[admin@node21 module]$ ls kafka-manager-1.3.3.18/
app build.sbt conf img LICENCE project public README.md sbt src target test

[復制代碼](javascript:void(0)😉

3.sbt編譯

1)yum安裝sbt(因為kafka-manager需要sbt編譯)

[admin@node21 ~]$ curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
[admin@node21 ~]$ sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
[admin@node21 ~]$ sudo yum install sbt

修改倉庫地址:(sbt 默認下載庫文件很慢, 還時不時被打斷),我們可以在用戶目錄下創建 touch ~/.sbt/repositories, 填上阿里雲的鏡像 # vi ~/.sbt/repositories

[復制代碼](javascript:void(0)😉

[repositories] 
local 
aliyun: http://maven.aliyun.com/nexus/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly 
sonatype-oss-releases 
maven-central 
sonatype-oss-snapshots

[復制代碼](javascript:void(0)😉

驗證:檢查sbt是否安裝成功,查看命令輸出,發現已經成功可以從maven.aliyun.com/nexus下載到依賴即表示成功

[admin@node21 ~]$ sbt-version

2)編譯kafka-manager

[admin@node21 kafka-manager-1.3.3.18]$ ./sbt clean dist

看到打印這個消息 Getting org.scala-sbt sbt 0.13.9 (this may take some time)... 就慢慢等吧,可以到~/.sbt/boot/update.log 查看sbt更新日志。sbt更新好,就開始下載各種jar包,最后看到:Your package is ready in /opt/module/kafka-manager-1.3.3.18/target/universal/kafka-manager-1.3.3.18.zip 證明編譯好了。

img

4.安裝

環境准備:Java 8+ kafka集群搭建參考:CentOS7.5搭建Kafka2.11-1.1.0集群

重新解壓編譯好的kafka-manager-1.3.3.18.zip

[admin@node21 kafka-manager-1.3.3.18]$ ls
bin  conf  lib  README.md  share

修改配置文件

[復制代碼](javascript:void(0)😉

[admin@node21 kafka-manager-1.3.3.18]$ pwd
/opt/module/kafka-manager-1.3.3.18
[admin@node21 kafka-manager-1.3.3.18]$ ls conf/
application.conf  consumer.properties  logback.xml  logger.xml  routes
[admin@node21 kafka-manager-1.3.3.18]$ sudo vi conf/application.conf 
修改kafka-manager.zkhosts列表為自己的zk節點
kafka-manager.zkhosts="node21:2181,node22:2181,node23:2181"

[復制代碼](javascript:void(0)😉

img

5.啟動服務

啟動zk集群,kafka集群,再啟動kafka-manager服務。

bin/kafka-manager 默認的端口是9000,可通過 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

[admin@node21 kafka-manager-1.3.3.18]$ nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

jps查看進程

img

6.編寫服務啟動腳本

chmod +x kafka-manager.sh

nohup /opt/module/kafka-manager-1.3.3.18/bin/kafka-manager -Dconfig.file=/opt/module/kafka-manager-1.3.3.18/conf/application.conf -Dhttp.port=8888 >/opt/module/kafka-ma
nager-1.3.3.18/kafka-manager.log 2>&1 &

WebUI查看:http://node21:8888/ 出現如下界面則啟動成功。

img

三.kafka-manager配置

1.新建Cluster

點擊【Cluster】>【Add Cluster】打開如下添加集群配置界面:輸入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服務器地址(如localhost:2181),選擇最接近的Kafka版本

img

其他broker的配置可以根據自己需要進行配置,默認情況下,點擊【保存】時,會提示幾個默認值為1的配置錯誤,需要配置為>=2的值。提示如下。

img

新建完成后,保存運行界面如下:

img

img

img

四.kafka-manager管理

1.新建主題

Topic---Create

img

2.查看主題

Topic---list

img

3.kafka-manage配置

3.1.application.conf

# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.

# This is the main configuration file for the application.
# ~~~~~

# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}
play.http.session.maxAge="1h"

# The application languages
# ~~~~~
play.i18n.langs=["en"]

play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

# 修改kafka-manager.zkhosts列表為自己的zk節點
kafka-manager.zkhosts="10.17.162.229:2181,10.17.162.230:2181,10.17.162.231:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

akka.logger-startup-timeout = 60s

basicAuthentication.enabled=true
basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED}

basicAuthentication.ldap.enabled=false
basicAuthentication.ldap.enabled=${?KAFKA_MANAGER_LDAP_ENABLED}
basicAuthentication.ldap.server=""
basicAuthentication.ldap.server=${?KAFKA_MANAGER_LDAP_SERVER}
basicAuthentication.ldap.port=389
basicAuthentication.ldap.port=${?KAFKA_MANAGER_LDAP_PORT}
basicAuthentication.ldap.username=""
basicAuthentication.ldap.username=${?KAFKA_MANAGER_LDAP_USERNAME}
basicAuthentication.ldap.password=""
basicAuthentication.ldap.password=${?KAFKA_MANAGER_LDAP_PASSWORD}
basicAuthentication.ldap.search-base-dn=""
basicAuthentication.ldap.search-base-dn=${?KAFKA_MANAGER_LDAP_SEARCH_BASE_DN}
basicAuthentication.ldap.search-filter="(uid=$capturedLogin$)"
basicAuthentication.ldap.search-filter=${?KAFKA_MANAGER_LDAP_SEARCH_FILTER}
basicAuthentication.ldap.connection-pool-size=10
basicAuthentication.ldap.connection-pool-size=${?KAFKA_MANAGER_LDAP_CONNECTION_POOL_SIZE}
basicAuthentication.ldap.ssl=false
basicAuthentication.ldap.ssl=${?KAFKA_MANAGER_LDAP_SSL}

basicAuthentication.username="admin"
basicAuthentication.username=${?KAFKA_MANAGER_USERNAME}
basicAuthentication.password="password"
basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD}

basicAuthentication.realm="Kafka-Manager"
basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification


kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}

3.2.consumer.propertis

security.protocol=PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

3.zookeeper

下載地址:http://mirrors.hust.edu.cn/apache/zookeeper/ ,↓
下載最新版:

選擇版本

點擊下載:

下載

下載好后解壓到自己想存放的位置,當前我的文件解壓到了D盤(D:\zookeeper-3.5.4-beta),選擇解壓到當前文件即可。。。

解壓后目錄結構:

  • bin目錄
    zk的可執行腳本目錄,包括zk服務進程,zk客戶端,等腳本。其中,.sh是Linux環境下的腳本,.cmd是Windows環境下的腳本。
  • conf目錄
    配置文件目錄。zoo_sample.cfg為樣例配置文件,需要修改為自己的名稱,一般為zoo.cfg。log4j.properties為日志配置文件。
  • lib
    zk依賴的包。
  • contrib目錄
    一些用於操作zk的工具包。
  • recipes目錄
    zk某些用法的代碼示例
    zookeeper 支持的運行平台:
  • 支持的運行平台

ZooKeeper的安裝包括單機模式安裝,以及集群模式安裝。

單機模式較簡單,是指只部署一個zk進程,客戶端直接與該zk進程進行通信。
在開發測試環境下,通過來說沒有較多的物理資源,因此我們常使用單機模式。當然在單台物理機上也可以部署集群模式,但這會增加單台物理機的資源消耗。故在開發環境中,我們一般使用單機模式。
但是要注意,生產環境下不可用單機模式,這是由於無論從系統可靠性還是讀寫性能,單機模式都不能滿足生產的需求。

1、單機模式:

然后找到文件夾下面的 conf 配置文件中的 zoo_sample.cfg 設置一下配置文件就可以啟動:
but 這里需要更改一下 .cfg 文件名 zookeeper 啟動腳本默認是尋找 zoo.cfg 文件。。。。之所以 得修改文件名

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:\\zookeeper-3.5.4-beta\\data
dataLogDir=D:\\zookeeper-3.5.4-beta\\log
# the port at which the clients will connect
admin.serverPort=8082
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

修改內容:

增加了:admin.serverPort=8082  #不然會出現端口被占用的情況,因為默認是和Apache.Tomcat使用的8080端口
修改了:dataDir=D:\\zookeeper-3.5.4-beta\\data  #保存數據的目錄
       dataLogDir=D:\\zookeeper-3.5.4-beta\\log #保存日志的目錄
  • tickTime:這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

  • dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。

  • dataLogDir:顧名思義就是 Zookeeper 保存日志文件的目錄

  • clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。

    使用文本編輯器打開zkServer.cmd或者zkServer.sh文件,可以看到其會調用zkEnv.cmd或者zkEnv.sh腳本。zkEnv腳本的作用是設置zk運行的一些環境變量,例如配置文件的位置和名稱等。

當這些配置項配置好后,你現在就可以啟動 Zookeeper 了,啟動后要檢查 Zookeeper 是否已經在服務,可以通過 netstat – ano 命令查看是否有你配置的 clientPort 端口號在監聽服務
通過 cmd 命令模式進入 Zookeeper 的 bin 目錄:# D:\zookeeper-3.5.4-beta\bin>zkServer.cmd

現在 zookeeper server 端已經啟動:然后我們來連接一下這個服務端
通過 cmd 命令模式進入 Zookeeper 的 bin 目錄運行:# D:\zookeeper-3.5.4-beta\bin>zkCli.cmd

2 、集群模式:

單機模式的zk進程雖然便於開發與測試,但並不適合在生產環境使用。在生產環境下,我們需要使用集群模式來對zk進行部署。

注意:
在集群模式下,建議至少部署3個zk進程,或者部署奇數個zk進程。如果只部署2個zk進程,當其中一個zk進程掛掉后,剩下的一個進程並不能構成一個quorum的大多數。因此,部署2個進程甚至比單機模式更不可靠,因為2個進程其中一個不可用的可能性比一個進程不可用的可能性還大。
  • 在集群模式下,所有的zk進程可以使用相同的配置文件(是指各個zk進程部署在不同的機器上面),例如如下配置:
    只需要在我們配置好的zoo.cfg文件中進行更改:
tickTime=2000
dataDir=/home/myname/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.229.160:2888:3888
server.2=192.168.229.161:2888:3888
server.3=192.168.229.162:2888:3888
  • initLimit
    ZooKeeper集群模式下包含多個zk進程,其中一個進程為leader,余下的進程為follower。
    當follower最初與leader建立連接時,它們之間會傳輸相當多的數據,尤其是follower的數據落后leader很多。initLimit配置follower與leader之間建立連接后進行同步的最長時間。

  • syncLimit
    配置follower和leader之間發送消息,請求和應答的最大時間長度。

  • tickTime
    tickTime則是上述兩個超時配置的基本單位,例如對於initLimit,其配置值為5,說明其超時時間為 2000ms * 5 = 10秒。

  • server.id=host:port1:port2
    其中id為一個數字,表示zk進程的id,這個id也是dataDir目錄下myid文件的內容。
    host是該zk進程所在的IP地址,port1表示follower和leader交換消息所使用的端口,port2表示選舉leader所使用的端口。

  • dataDir
    其配置的含義跟單機模式下的含義類似,不同的是集群模式下還有一個myid文件。myid文件的內容只有一行,且內容只能為1 - 255之間的數字,這個數字亦即上面介紹server.id中的id,表示zk進程的id。

  • initLimit
    ZooKeeper集群模式下包含多個zk進程,其中一個進程為leader,余下的進程為follower。
    當follower最初與leader建立連接時,它們之間會傳輸相當多的數據,尤其是follower的數據落后leader很多。initLimit配置follower與leader之間建立連接后進行同步的最長時間。

  • syncLimit
    配置follower和leader之間發送消息,請求和應答的最大時間長度。

  • tickTime
    tickTime則是上述兩個超時配置的基本單位,例如對於initLimit,其配置值為5,說明其超時時間為 2000ms * 5 = 10秒。

  • server.id=host:port1:port2
    其中id為一個數字,表示zk進程的id,這個id也是dataDir目錄下myid文件的內容。
    host是該zk進程所在的IP地址,port1表示follower和leader交換消息所使用的端口,port2表示選舉leader所使用的端口。

  • dataDir
    其配置的含義跟單機模式下的含義類似,不同的是集群模式下還有一個myid文件。myid文件的內容只有一行,且內容只能為1 - 255之間的數字,這個數字亦即上面介紹server.id中的id,表示zk進程的id。

注意
如果僅為了測試部署集群模式而在同一台機器上部署zk進程,server.id=host:port1:port2配置中的port參數必須不同。但是,為了減少機器宕機的風險,強烈建議在部署集群模式時,將zk進程部署不同的物理機器上面。

我們打算在三台不同的機器 192.168.229.160,192.168.229.161,192.168.229.162上各部署一個zk進程,以構成一個zk集群。 三個zk進程均使用相同的 zoo.cfg 配置:

tickTime=2000
dataDir=/home/myname/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.229.160:2888:3888
server.2=192.168.229.161:2888:3888
server.3=192.168.229.162:2888:3888

在三台機器dataDir目錄( /home/myname/zookeeper 目錄)下,分別生成一個myid文件,其內容分別為1,2,3。然后分別在這三台機器上啟動zk進程,這樣我們便將zk集群啟動了起來:
可以使用以下命令來連接一個zk集群:
zkCli -server IP:端口,IP:端口,IP:端口
客戶端連接進程(連接上哪台機器的zk進程是隨機的),客戶端已成功連接上zk集群。

3.zookeeper配置

3.1 zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/apps/data/zookeeper
# the port at which the clients will connect
clientPort=2181
server.1=10.17.162.229:2888:3888
server.2=10.17.162.230:2888:3888
server.3=10.17.162.231:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1


本文教程參考:kafka:https://blog.csdn.net/dapeng1995/article/details/81536862,

kafka-manage:https://www.cnblogs.com/frankdeng/p/9584870.html,

zookeeper:https://blog.csdn.net/weixin_41558061/article/details/80597174

本文配置參考:https://www.jianshu.com/p/b9a8f20b0af6,


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM