Kafka - SQL 引擎分享


1.概述

  大多數情況下,我們使用 Kafka 只是作為消息處理。在有些情況下,我們需要多次讀取 Kafka 集群中的數據。當然,我們可以通過調用 Kafka 的 API 來完成,但是針對不同的業務需求,我們需要去編寫不同的接口,在經過編譯,打包,發布等一系列流程。最后才能看到我們預想的結果。那么,我們能不能有一種簡便的方式去實現這一部分功能,通過編寫 SQL 的方式,來可視化我們的結果。今天,筆者給大家分享一些心得,通過使用 SQL 的形式來完成這些需求。

2.內容

  實現這些功能,其架構和思路並不復雜。這里筆者將整個實現流程,通過一個原理圖來呈現。如下圖所示:

  這里筆者給大家詳述一下上圖的含義,消息數據源存放與 Kafka 集群當中,開啟低階和高階兩個消費線程,將消費的結果以 RPC 的方式共享出去(即:請求者)。數據共享出去后,回流經到 SQL 引擎處,將內存中的數據翻譯成 SQL Tree,這里使用到了 Apache 的 Calcite 項目來承擔這一部分工作。然后,我們通過 Thrift 協議來響應 Web Console 的 SQL 請求,最后將結果返回給前端,讓其以圖表的實行可視化。

3.插件配置

  這里,我們需要遵循 Calcite 的 JSON Models,比如,針對 Kafka 集群,我們需要配置一下內容:

{
    version: '1.0',
    defaultSchema: 'kafka',  
    schemas: [  
        {
            name: 'kafka',  
            type: 'custom',
            factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
            operand: {
                database: 'kafka_db'
            }  
        } 
    ]
}

  另外,這里最好對表也做一個表述,配置內容如下所示:

[
    {
        "table":"Kafka",
        "schemas":{
            "_plat":"varchar",
            "_uid":"varchar",
            "_tm":"varchar",
            "ip":"varchar",
            "country":"varchar",
            "city":"varchar",
            "location":"jsonarray"
        }
    }
]

4.操作

  下面,筆者給大家演示通過 SQL 來操作相關內容。相關截圖如下所示:

  在查詢處,填寫相關 SQL 查詢語句。點擊 Table 按鈕,得到如下所示結果:

  我們,可以將獲取的結果以報表的形式進行導出。

  當然,我們可以在 Profile 模塊下,瀏覽查詢歷史記錄和當前正在運行的查詢任務。至於其他模塊,都屬於輔助功能(展示集群信息,Topic 的 Partition 信息等)這里就不多贅述了。

5.總結

  分析下來,整體架構和實現的思路都不算太復雜,也不存在太大的難點,需要注意一些實現上的細節,比如消費 API 針對集群消息參數的調整,特別是低階消費 API,尤為需要注意,其 fetch_size 的大小,以及 offset 是需要我們自己維護的。在使用 Calcite 作為 SQL 樹時,我們要遵循其 JSON Model 和標准的 SQL 語法來操作數據源。

6.結束語

這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM