處理大並發量訂單處理的 KafKa部署總結
今天要介紹的是消息中間件KafKa,應該說是一個很牛的中間件吧,背靠Apache 與很多有名的中間件搭配起來用效果更好哦 ,為什么不用RabbitMQ,因為公司需要它。
網上已經有很多怎么用和用到哪的內容,但結果很多人都倒在了入門第一步 環境都搭不起來,可謂是從了解到放棄,所以在此特記錄如何在linux環境搭建,windows中配置一樣,只是啟動運行bat文件。
想要用它就先必須了解它能做什么及能做到什么程度,先看看它是什么吧。
當今社會各種應用系統諸如商業、社交、搜索、瀏覽等像信息工廠一樣不斷的生產出各種信息,在大數據時代,我們面臨如下幾個挑戰:
如何收集這些巨大的信息
如何分析它
如何及時做到如上兩點
以上幾個挑戰形成了一個業務需求模型,即生產者生產(produce)各種信息,消費者消費(consume)(處理分析)這些信息,而在生產者與消費者之間,需要一個溝通兩者的橋梁-消息系統。從一個微觀層面來說,這種需求也可理解為不同的系統之間如何傳遞消息。
kafka 應用場景
日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統:解耦和生產者和消費者、緩存消息等。
用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
流式處理:比如spark streaming和storm
事件源解耦 在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
冗余有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
擴展性 因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
靈活性 & 峰值處理能力 在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
可恢復性 系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
順序保證 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka能保證一個Partition內的消息的有序性。
緩沖 在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助於控制和優化數據流經過系統的速度。
用於數據流 在一個分布式系統里,要得到一個關於用戶操作會用多長時間及其原因的總體印象,是個巨大的挑戰。消息系列通過消息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。
異步通信 很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
Kafka主要特點:
同時為發布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
可進行持久化操作。將消息持久化到磁盤,因此可用於批量消費,例如ETL,以及實時應用程序。通過將數據持久化到硬盤以及replication防止數據丟失。
分布式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。
消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
支持online和offline的場景。
Kafka的架構:
Kafka 的整體架構非常簡單,是顯式分布式架構,producer、broker(kafka)和consumer都可以有多個。 Producer,consumer實現Kafka注冊的接口,數據從producer發送到broker,broker承擔一個中間緩存和分發的作用。 broker分發注冊到系統中的consumer。broker的作用類似於緩存,即活躍的數據和離線處理系統之間的緩存。客戶端和服務器端的通信,是基 於簡單,高性能,且與編程語言無關的TCP協議。
幾個基本概念:
- Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。
- Partition:Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。
- Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發布一些消息。
- Producers:消息和數據生產者,向Kafka的一個topic發布消息的過程叫做producers。
- Consumers:消息和數據消費者,訂閱topics並處理其發布的消息的過程叫做consumers。
- Broker:緩存代理,Kafa集群中的一台或多台服務器統稱為broker。
消息發送的流程:
- Producer根據指定的partition方法(round-robin、hash等),將消息發布到指定topic的partition里面
- kafka集群接收到Producer發過來的消息后,將其持久化到硬盤,並保留消息指定時長(可配置),而不關注消息是否被消費。
- Consumer從kafka集群pull數據,並控制獲取消息的offset
環境
CentOS7.0
kafka_2.11-0.10.1.0版本
用root用戶安裝
Java環境,最好是最新版本的。
安裝Zookeeper
分布式時多機間要確保能正常通訊,關閉防火牆或讓涉及到的端口通過。
下載
去官網下載 :http://kafka.apache.org/downloads.html 選擇最新版本(在此下載編譯好的包,不要下載src源碼包)。
下載后放進CentOS中的/usr/local/ 文件夾中,並解壓到當前文件中 /usr/local/kafka212
安裝配置
由於Kafka集群需要依賴ZooKeeper集群來協同管理,所以需要事先搭建好ZK集群。
安裝之前先要啟動zookeeper,如何安裝可參考之前的 一步到位分布式開發Zookeeper實現集群管理 一文
將壓縮文件夾壓到當前文件夾
tar -zxvf kafka_2.12-0.10.2.0.tgz,產生文件夾kafka_2.12-0.10.2.0 並更改名為kafka212
進入config目錄,修改server.properties文件
vi server.properties
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 port=9092 #當前kafka對外提供服務的端口默認是9092 log.dirs=//usr/local/kafka212/kafka-logs zookeeper.connect=192.168.80.32:2181,192.168.80.33:2181,192.168.80.30:2181
注意:
broker.id=0 值每台服務器上的都不一樣
啟動
首先啟動獨立的ZK集群,三台都要啟動(./zkServer.sh start) 進入到kafka的bin目錄,然后啟動服務 ./kafka-server-start.sh ../config/server.properties (三台服務器都要啟動) 或啟動daemon守護進程后台程序 進入到kafka的bin目錄 ./kafka-server-start.sh -daemon ../config/server.properties
驗證啟動進程
jps
顯示包含
25014 QuorumPeerMain
25778 Kafka
使用客戶端進入zk
[root@H32 bin]# zkCli.sh -server 192.168.80.32:2181
查看目錄情況
[zk: 192.168.80.32:2181(CONNECTED) 0] ls /
顯示
[controller_epoch, controller, brokers, zookeeper, test, admin, isr_change_notification, consumers, config]
創建一個topic:
[root@H32 bin]# ./kafka-topics.sh --create --zookeeper 192.168.80.32:2181,192.168.80.33:2181,192.168.80.30:2181 --replication-factor 3 --partitions 1 --topic hotnews --replication-factor 2 #復制兩份 --partitions 1 #創建1個分區 --topic #主題
查看topic狀態:
[root@H32 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.80.32:2181,192.168.80.33:2181,192.168.80.30:2181 --topic hotnews
發送消息:
[root@H32 bin]# ./kafka-console-producer.sh --broker-list 192.168.80.32:9092,192.168.80.33:9092,192.168.80.30:9092 --topic hotnews
接收消息:
[root@H32 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.80.32:2181,192.168.80.33:2181,192.168.80.30:2181 --topic hotnews --from-beginning
NET-KafKa編程
對於net來說需要相應的插件才能與之通訊,網上比較推薦的是 rdkafka-dotnet 有需要的可以git中下載demo。
擴展
找到為0的leader的進程,並殺死
[root@bin /]# ps -ef | grep kafka kill -9 25285
啟動各服務器上的kafka后,有機器訪問主機時出現:
WARN Fetching topic metadata with correlation id 5 for topics [Set(hotnews)] from broker [BrokerEndPoint(1,H33,9092)] failed (kafka.client.ClientUtils$)
這里需要關閉機器的防火牆或將9092加入防火牆。
Kafka在分布式設計中有着相當重要的作用,算是一個基礎工具,因此需要不斷的學習了解與實踐,如何處理大並發訂單這只是一種場景。
這里留有一個問題:如何確定Kafka的分區數、key和consumer線程數