Apache Kafka Connect - 2019完整指南


今天,我們將討論Apache Kafka Connect。此Kafka Connect文章包含有關Kafka Connector類型的信息,Kafka Connect的功能和限制。此外,我們將了解Kafka Connect及其配置的必要性。與此同時,我們將討論不同的模式和Rest API。
在本Kafka Connect教程中,我們將研究如何將數據從外部系統導入Apache Kafka主題,以及如何將數據從Kafka主題導出到外部系統,我們還有另一個Apache Kafka項目組件,即Kafka Connect。但是,有關Kafka Connect的更多信息。
那么,讓我們開始Kafka Connect。

Apache Kafka Connect

Apache Kafka Connect - 完整指南

1.什么是Kafka Connect?

我們使用Apache Kafka Connect在Apache Kafka和其他系統之間流式傳輸數據,既可靠又可靠。此外,connect使得快速定義Kafka連接器變得非常簡單,這些連接器可以將大量數據移入和移出Kafka。
Kafka Connect收集指標或將整個數據庫從應用程序服務器收集到Kafka主題中。它可以為流處理提供低延遲的可用數據。

卡夫卡連接

工作 - Apache Kafka Connect

2. Kafka Connect功能

Kafka Connect有以下功能:

卡夫卡連接

Kafka Connect - 功能

一個。Kafka連接器的通用框架
它標准化了其他數據系統與Kafka的集成。此外,還簡化了連接器的開發,部署和管理。
灣 分布式和獨立模式
擴展到支持整個組織的大型集中管理服務,或者擴展到開發,測試和小型生產部署。
C。REST界面
通過易於使用的REST API,我們可以提交和管理Kafka Connect集群的連接器。
d。自動偏移管理
但是,即使只有連接器的一些信息,Kafka Connect也可以自動管理偏移提交過程。因此,連接器開發人員無需擔心連接器開發中容易出錯的部分。
即 默認情況下分布式和可伸縮性
它基於現有的組管理協議。為了擴展Kafka Connect集群,我們可以添加更多工作人員。
F。流媒體/批處理集成
我們可以說,為了橋接流媒體和批處理數據系統,Kafka Connect是一個理想的解決方案。
Apache Kafka工作流程| Kafka Pub-Sub Messaging

3.為什么選擇Kafka Connect?

正如我們所知,像F lume一樣,有許多工具能夠寫入Kafka或從Kafka讀取,或者也可以導入和導出數據。所以,問題出現了,為什么我們需要Kafka Connect。因此,我們列出了主要優勢:

卡夫卡連接

為什么Kafka Connect-需要Kafka

一個。失敗后自動恢復

對於每個記錄,“源”連接器可以附加它傳遞給Kafka Connect的任意“源位置”信息。因此,在發生故障時,Kafka Connect會自動將此信息提供給連接器。通過這種方式,它可以在失敗的地方恢復。此外,“接收器”連接器的自動恢復更加容易。

灣 自動故障轉移

可以進行自動故障轉移,因為Kafka Connect節點構建了一個Kafka群集。這意味着如果假設一個節點發生故障,它正在進行的工作將重新分配給其他節點。

C。簡單並行

連接器可以定義數據導入或導出任務,尤其是並行執行的任務。

4. Kafka Connect Concepts

  • 在子線程中執行連接器及其相關任務的操作系統進程(基於Java)就是我們所說的Kafka Connect 工作者
  • 此外,有一個對象定義了一個或多個任務的參數,這些任務實際上應該執行導入或導出數據的工作,我們稱之為連接器
  • 要從某些任意輸入讀取並寫入Kafka, 連接器將生成任務。
  • 為了從Kafka讀取並寫入一些任意輸出,接收連接器生成任務。

測試你對卡夫卡的了解程度

但是,我們可以說Kafka Connect不是重要數據轉換的選擇。盡管如此,為了定義基本數據轉換,最新版本的Kafka Connect允許連接器的配置參數。然而,對於“源”連接器,此函數認為任務將其輸入轉換為AVRO或JSON格式; 在將記錄寫入Kafka主題之前應用轉換。而且,雖然它涉及“接收器”連接器,但此函數認為輸入Kafka主題上的數據已經是AVRO或JSON格式。

5. Kafka Connect的依賴性

Kafka Connect節點需要連接到Kafka消息代理群集,無論是以獨立模式還是分布式模式運行。
基本上,對於分布式模式,沒有其他依賴項。即使連接器配置設置存儲在Kafka消息主題中,Kafka Connect節點也完全無狀態。由於這個,Kafka Connect節點,它變得非常適合通過技術運行。
雖然要存儲“當前位置”和連接器配置,但我們需要少量的本地磁盤存儲,用於獨立模式。
你必須閱讀有關Kafka排隊的信息

6.分布式模式

通過使用Kafka Broker地址,我們可以啟動Kafka Connect工作器實例(即java進程),“內部使用”的幾個Kafka主題的名稱和“組ID”參數。通過“內部使用”Kafka主題,每個工作者實例與屬於同一組ID的其他工作者實例進行協調。這里,一切都是通過Kafka消息代理完成的,不需要其他外部協調機制(沒有Zookeeper等)。
工作人員之間(通過主題)就如何在可用的工作人員組中分發連接器和任務集進行協商。如果工作進程死亡,則會重新平衡群集,以便在剩余的工作人員上公平地分配工作。如果新員工開始工作,則重新平衡可確保其接管現有員工的一些工作。

7.獨立模式

我們可以說,它只是分布式模式,其中工作者實例在Kafka消息代理中不使用內部主題此進程運行所有指定的連接器及其生成的任務本身(作為線程)。
由於獨立模式將當前源偏移存儲在本地文件中,因此它不會使用Kafka Connect“內部主題”進行存儲。作為命令行選項,以獨立模式提供有關要執行的連接器的信息。
此外,在此模式下,運行連接器可以對生產系統有效; 通過這種方式,我們傳統上執行大多數ETL樣式的工作負載。
因此,我們再次以傳統方式管理故障轉移 - 例如,通過腳本啟動備用實例。
一個。啟動工人
工作者實例只是一個Java進程。通常,它是通過提供的shell腳本啟動的。然后,從其CLASSPATH中,worker實例將加載連接器配置指定的任何自定義連接器。對於獨立模式,配置在命令行上提供,對於從Kafka主題讀取的分布式模式。
對於啟動Kafka Connect工作程序,還有一個標准的Docker容器映像。因此,可以啟動此映像的任意數量的實例,並且只要它們配置了相同的Kafka消息代理群集和group-id,它們也將自動聯合在一起。

8. REST API

基本上,每個工作器實例都啟動一個嵌入式Web服務器。因此,通過它,它為狀態查詢和配置公開了一個REST API。此外,對於處於分布式模式的工作人員,通過此REST API上載的配置將保存在內部Kafka消息代理主題中。但是,對於獨立模式的工作程序,配置REST API不相關。
通過包裝工作者REST API,Confluent控制中心提供了大量的Kafka-connect-management UI。
要定期獲取系統狀態,Nagios或REST調用可能會執行Kafka Connect守護程序的監視。
我們來討論Apache Kafka + Spark Streaming Integration

9.卡夫卡連接器類型

通過實現特定的Java接口,可以創建連接器。我們有一套現有的連接器,或者我們可以為我們編寫自定義連接器的設施。
它的工作者只是希望它執行的任何連接器和任務類的實現都存在於其類路徑中。但是,如果沒有子類加載器的好處,此代碼將直接加載到應用程序,OSGi框架或類似代碼中。 
“Confluent開源版”下載包中有幾個連接器,它們是:

  • JDBC
  • HDFS
  • S3
  • Elasticsearch

但是,沒有辦法單獨下載這些連接器,但我們可以從Confluent Open Source中提取它們,因為它們是開源的,我們也可以下載並將其復制到標准的Kafka安裝中。 

10.配置Kafka Connect

通常,使用指向包含工作程序實例選項的config-file的命令行選項,每個工作程序實例都會啟動。例如,Kafka消息代理詳細信息,group-id。
但是,還會在獨立模式下為工作者提供指向定義要執行的連接器的配置文件的命令行選項。然而,每個工作人員都以分布式模式從Kafka主題(在工作器配置文件中指定)中檢索連接器/任務配置。 
此外,工作進程在獨立模式下為狀態檢查等提供REST API。
此外,要暫停和恢復連接器,我們可以使用REST API。
請務必注意,配置選項“key.converter”和“value.converter”選項不是特定於連接器的,它們是特定於工作者的。

11.從Kafka Connect Workers到Kafka Brokers的聯系

從Kafka Connect Workers到Kafka Brokers的聯系

出於管理目的,每個工作程序以分布式模式建立與Kafka消息代理群集的連接。但是,在worker配置文件中,我們將這些設置定義為“頂級”設置。
此外,為每個連接器建立了與Kafka消息代理群集的單獨連接(套接字集)。許多設置都是從​​“頂級”Kafka設置繼承的,但是可以使用配置前綴“consumer。”(由sink使用)或“producer。”(由源使用)覆蓋它們以便使用不同的Kafka消息代理承載生產數據的連接與攜帶管理消息的連接的網絡設置。 

12.標准JDBC源連接器

連接器中心站點列出了JDBC源連接器,此連接器是Confluent Open Source下載的一部分。此外,請確保我們無法單獨下載,因此對於已安裝Apache的“純”Kafka軟件包而非Confluent軟件包的用戶,必須從Confluent軟件包中提取此連接器並將其復制。
它有各種配置選項:

  • 要掃描的數據庫,指定為JDBC URL。
  • 輪詢間隔。
  • 一個正則表達式,指定要監視的表; 對於每個表,都有一個單獨的Kafka主題。 
  • 具有“遞增id” SQL列,在這種情況下,連接器可以檢測新記錄(選擇id> last-known-id)。
  • 具有更新時間戳的SQL列,在這種情況下,連接器可以檢測新的/已修改的記錄(選擇時間戳> last-known-timestamp的位置)。

奇怪的是,盡管連接器顯然具有復制多個表的能力,但是“遞增id”和“時間戳”列名是全局的 - 即,當復制多個表時,它們必須遵循相同的命名約定。列。

13. Kafka Connect Security

基本上,使用Kerberos安全的Kafka消息代理,Kafka Connect(v0.10.1.0)工作得非常好。通過與這些代理的SSL加密連接也可以正常工作。
但是,通過Kerberos或SSL,無法保護Kafka Connect節點公開的REST API; 但是,有一個功能請求。因此,在配置安全集群時,必須配置外部代理(例如Apache HTTP)以充當REST服務的安全網關。
看看Apache Kafka Security | Kafka的需求和組成部分

14. Kafka Connect的限制

除此之外,Kafka Connect也有一些限制:

  • 目前,連接器的選擇非常少。
  • 商業和開源功能的分離非常差。
  • 此外,它缺乏配置工具。
  • 要部署自定義連接器(插件),有一種糟糕/原始的方法。
  • 它非常以Java / Scala為中心。

因此,目前,它感覺更像是一個“工具包”,而不是當前的打包解決方案 - 至少在沒有購買商業工具的情況下。
所以,這就是Apache Kafka Connect。希望你喜歡我們的解釋。

15.總結

因此,我們已經看到了Kafka Connect 的整個概念此外,我們已經了解了Kafka connect的好處。但是,如果有任何疑問,請隨時在評論部分詢問。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM