Kafka kSQL sql查詢


背景

    kafka早期作為一個日志消息系統,很受運維歡迎的,配合ELK玩起來很happy,在kafka慢慢的轉向流式平台的過程中,開發也慢慢介入了,一些業務系統也開始和kafka對接起來了,也還是很受大家歡迎的,由於業務需要,一部分小白也就免不了接觸kafka了,這些小白總是會安奈不住好奇心,要精確的查看kafka中的某一條數據,作為服務提供方,我也很方啊,該怎么懟?業務方不敢得罪啊,只能寫consumer去消費,然后人肉查詢。

需求

    有什么方法能直接查詢kafka中已有的數據呢?那時候presto就映入眼簾了,初步探索后發現presto確實強大,和我們在用的impala有的一拼,支持的數據源也更多,什么redis、mongo、kafka都可以用sql來查詢,真是救星啊,這樣那群小白就可以直接使用presto來查詢里面的數據了。不過presto在不開發插件的情況下,對kafka的數據有格式要求,支持json、avro。關於presto的調研見presto實戰。但是我只是想用sql查詢kafka,而presto功能過於強大,必然整個框架就顯得比較厚重了,功能多嘛。有什么輕量級的工具呢?

介紹

    某一天,kafka的親兒子KSQL就誕生了,KSQL是一個用於Apache kafka的流式SQL引擎,KSQL降低了進入流處理的門檻,提供了一個簡單的、完全交互式的SQL接口,用於處理Kafka的數據,可以讓我們在流數據上持續執行 SQL 查詢,KSQL支持廣泛的強大的流處理操作,包括聚合、連接、窗口、會話等等。

    KSQL在內部使用Kafka的Streams API,並且它們共享與Kafka流處理相同的核心抽象,KSQL有兩個核心抽象,它們對應於到Kafka Streams中的兩個核心抽象,讓你可以處理kafka的topic數據。關於這兩個核心抽象下章節解讀。

架構

    部署架構

        由一個KSQL服務器進程執行查詢。一組KSQL進程可以作為集群運行。可以通過啟動更多的KSQL實例來動態添加更多的處理能力。這些KSQL實例是容錯的,如果一個實例失敗了,其他的就會接管它的工作。查詢是使用交互式的KSQL命令行客戶端啟動的,該客戶端通過REST API向集群發送命令。命令行允許檢查可用的stream和table,發出新的查詢,檢查狀態並終止正在運行的查詢。KSQL內部是使用Kafka的stream API構建的,它繼承了它的彈性可伸縮性、先進的狀態管理和容錯功能,並支持Kafka最近引入的一次性處理語義。KSQL服務器將此嵌入到一個分布式SQL引擎中(包括一些用於查詢性能的自動字節代碼生成)和一個用於查詢和控制的REST API。

    處理架構

抽象概念

    KSQL簡化了流應用程序,它集成了stream和table的概念,允許使用表示現在發生的事件的stream來連接表示當前狀態的table。 Apache Kafka中的一個topic可以表示為KSQL中的STREAM或TABLE,具體取決於topic處理的預期語義。下面看看兩個核心的解讀。

        stream:流是無限制的結構化數據序列,stream中的fact是不可變的,這意味着可以將新fact插入到stream中,但是現有fact永遠不會被更新或刪除。 stream可以從Kafka topic創建,或者從現有的stream和table中派生。

        table:一個table是一個stream或另一個table的視圖,它代表了一個不斷變化的fact的集合,它相當於傳統的數據庫表,但通過流化等流語義來豐富。表中的事實是可變的,這意味着可以將新的事實插入到表中,現有的事實可以被更新或刪除。可以從Kafka主題中創建表,也可以從現有的流和表中派生表。

部署

    ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默認並沒有加入ksql server程序,當然V3和V4是支持ksql的,在V5版本中已經默認加入ksql了,為了方便演示,我們使用confluent kafka V5版本演示,zk和kafka也是單實例啟動。

    下載

wget https://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz
tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0

    啟動zk

cd /opt/programs/confluent_5.0.0 bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

    啟動kafka

cd /opt/programs/confluent_5.0.0 bin/kafka-server-start -daemon etc/kafka/server.properties

    創建topic和data

        confluent自帶了一個ksql-datagen工具,可以創建和產生相關的topic和數據,ksql-datagen可以指定的參數如下:

[bootstrap-server=<kafka bootstrap server(s)> (defaults to localhost:9092)] 
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')] schema=<avro schema file> [schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)] format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited') topic=<kafka topic name> key=<name of key column> [iterations=<number of rows> (defaults to 1,000,000)] [maxInterval=<Max time in ms between rows> (defaults to 500)] [propertiesFile=<file specifying Kafka client properties>]

        創建pageviews,數據格式為delimited

cd /opt/programs/confluent_5.0.0/bin ./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500

            ps:以上命令會源源不斷在stdin上輸出數據,就是工具自己產生的數據,如下樣例

8001 --> ([ 1539063767860 | 'User_6' | 'Page_77' ]) ts:1539063767860 8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981 8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086

            不過使用consumer消費出來的數據是如下樣式

1539066430530,User_5,Page_29
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34

        創建users,數據格式為json

cd /opt/programs/confluent_5.0.0/bin ./ksql-datagen quickstart=users format=json topic=users maxInterval=100

        ps:以上命令會源源不斷在stdin上輸出數據,就是工具自己產生的數據,如下樣例

User_5 --> ([ 1517896551436 | 'User_5' | 'Region_5' | 'MALE' ]) ts:1539063787413 User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430 User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481

        不過使用consumer消費出來的數據是如下樣式

{"registertime":1507118206666,"userid":"User_6","regionid":"Region_7","gender":"OTHER"} {"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"} {"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"} {"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"} {"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"} {"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}

    啟動ksql

cd /opt/programs/confluent_5.0.0 bin/ksql-server-start -daemon etc/ksql/ksql-server.properties

    連接ksql

cd /opt/programs/confluent_5.0.0 bin/ksql http://10.205.151.145:8088

   

    創建stream和table

        stream

            根據topic pageviews創建一個stream pageviews_original,value_format為DELIMITED

ksql>CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH \
(kafka_topic='pageviews', value_format='DELIMITED');

        table

            根據topic users創建一個table users_original,value_format為json

ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \
(kafka_topic='users', value_format='JSON', key = 'userid');

    查詢數據

ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
ksql> SELECT * FROM pageviews_original LIMIT 3;

        ps:ksql默認是從kafka最新的數據查詢消費的,如果你想從開頭查詢,則需要在會話上進行設置:SET 'auto.offset.reset' = 'earliest';

    持久化查詢

        持久化查詢可以源源不斷的把查詢出的數據發送到你指定的topic中去,查詢的時候在select前面添加create stream關鍵字即可創建持久化查詢。

        創建查詢

ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;

        查詢新stream

ksql> SHOW STREAMS;

            ps:可以看到新創建了stream PAGEVIEWS2,並且創建了topic PAGEVIEWS2

        查詢執行任務

ksql> SHOW QUERIES;

            ps:可以看到ID為CSAS_PAGEVIEWS2_0的任務在執行,並且有顯示執行的語句

        消費新數據

cd /opt/programs/confluent_5.0.0/bin ./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic PAGEVIEWS2

            ps:可以看到PAGEVIEWS2 topic里面正是我們通過select篩選出來的數據

        終止查詢任務

ksql> TERMINATE CSAS_PAGEVIEWS2_0;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM