1.概述
大多數情況下,我們使用 Kafka 只是作為消息處理。在有些情況下,我們需要多次讀取 Kafka 集群中的數據。當然,我們可以通過調用 Kafka 的 API 來完成,但是針對不同的業務需求,我們需要去編寫不同的接口,在經過編譯,打包,發布等一系列流程。最后才能看到我們預想的結果。那么,我們能不能有一種簡便的方式去實現這一部分功能,通過編寫 SQL 的方式,來可視化我們的結果。今天,筆者給大家分享一些心得,通過使用 SQL 的形式來完成這些需求。
2.內容
實現這些功能,其架構和思路並不復雜。這里筆者將整個實現流程,通過一個原理圖來呈現。如下圖所示:
這里筆者給大家詳述一下上圖的含義,消息數據源存放與 Kafka 集群當中,開啟低階和高階兩個消費線程,將消費的結果以 RPC 的方式共享出去(即:請求者)。數據共享出去后,回流經到 SQL 引擎處,將內存中的數據翻譯成 SQL Tree,這里使用到了 Apache 的 Calcite 項目來承擔這一部分工作。然后,我們通過 Thrift 協議來響應 Web Console 的 SQL 請求,最后將結果返回給前端,讓其以圖表的實行可視化。
3.插件配置
這里,我們需要遵循 Calcite 的 JSON Models,比如,針對 Kafka 集群,我們需要配置一下內容:
{ version: '1.0', defaultSchema: 'kafka', schemas: [ { name: 'kafka', type: 'custom', factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory', operand: { database: 'kafka_db' } } ] }
另外,這里最好對表也做一個表述,配置內容如下所示:
[ { "table":"Kafka", "schemas":{ "_plat":"varchar", "_uid":"varchar", "_tm":"varchar", "ip":"varchar", "country":"varchar", "city":"varchar", "location":"jsonarray" } } ]
4.操作
下面,筆者給大家演示通過 SQL 來操作相關內容。相關截圖如下所示:
在查詢處,填寫相關 SQL 查詢語句。點擊 Table 按鈕,得到如下所示結果:
我們,可以將獲取的結果以報表的形式進行導出。
當然,我們可以在 Profile 模塊下,瀏覽查詢歷史記錄和當前正在運行的查詢任務。至於其他模塊,都屬於輔助功能(展示集群信息,Topic 的 Partition 信息等)這里就不多贅述了。
5.總結
分析下來,整體架構和實現的思路都不算太復雜,也不存在太大的難點,需要注意一些實現上的細節,比如消費 API 針對集群消息參數的調整,特別是低階消費 API,尤為需要注意,其 fetch_size 的大小,以及 offset 是需要我們自己維護的。在使用 Calcite 作為 SQL 樹時,我們要遵循其 JSON Model 和標准的 SQL 語法來操作數據源。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!