背景
kafka早期作為一個日志消息系統,很受運維歡迎的,配合ELK玩起來很happy,在kafka慢慢的轉向流式平台的過程中,開發也慢慢介入了,一些業務系統也開始和kafka對接起來了,也還是很受大家歡迎的,由於業務需要,一部分小白也就免不了接觸kafka了,這些小白總是會安奈不住好奇心,要精確的查看kafka中的某一條數據,作為服務提供方,我也很方啊,該怎么懟?業務方不敢得罪啊,只能寫consumer去消費,然后人肉查詢。
需求
有什么方法能直接查詢kafka中已有的數據呢?那時候presto就映入眼簾了,初步探索后發現presto確實強大,和我們在用的impala有的一拼,支持的數據源也更多,什么redis、mongo、kafka都可以用sql來查詢,真是救星啊,這樣那群小白就可以直接使用presto來查詢里面的數據了。不過presto在不開發插件的情況下,對kafka的數據有格式要求,支持json、avro。關於presto的調研見presto實戰。但是我只是想用sql查詢kafka,而presto功能過於強大,必然整個框架就顯得比較厚重了,功能多嘛。有什么輕量級的工具呢?
介紹
某一天,kafka的親兒子KSQL就誕生了,KSQL是一個用於Apache kafka的流式SQL引擎,KSQL降低了進入流處理的門檻,提供了一個簡單的、完全交互式的SQL接口,用於處理Kafka的數據,可以讓我們在流數據上持續執行 SQL 查詢,KSQL支持廣泛的強大的流處理操作,包括聚合、連接、窗口、會話等等。
KSQL在內部使用Kafka的Streams API,並且它們共享與Kafka流處理相同的核心抽象,KSQL有兩個核心抽象,它們對應於到Kafka Streams中的兩個核心抽象,讓你可以處理kafka的topic數據。關於這兩個核心抽象下章節解讀。
架構
部署架構

由一個KSQL服務器進程執行查詢。一組KSQL進程可以作為集群運行。可以通過啟動更多的KSQL實例來動態添加更多的處理能力。這些KSQL實例是容錯的,如果一個實例失敗了,其他的就會接管它的工作。查詢是使用交互式的KSQL命令行客戶端啟動的,該客戶端通過REST API向集群發送命令。命令行允許檢查可用的stream和table,發出新的查詢,檢查狀態並終止正在運行的查詢。KSQL內部是使用Kafka的stream API構建的,它繼承了它的彈性可伸縮性、先進的狀態管理和容錯功能,並支持Kafka最近引入的一次性處理語義。KSQL服務器將此嵌入到一個分布式SQL引擎中(包括一些用於查詢性能的自動字節代碼生成)和一個用於查詢和控制的REST API。
處理架構

抽象概念
KSQL簡化了流應用程序,它集成了stream和table的概念,允許使用表示現在發生的事件的stream來連接表示當前狀態的table。 Apache Kafka中的一個topic可以表示為KSQL中的STREAM或TABLE,具體取決於topic處理的預期語義。下面看看兩個核心的解讀。
stream:流是無限制的結構化數據序列,stream中的fact是不可變的,這意味着可以將新fact插入到stream中,但是現有fact永遠不會被更新或刪除。 stream可以從Kafka topic創建,或者從現有的stream和table中派生。
table:一個table是一個stream或另一個table的視圖,它代表了一個不斷變化的fact的集合,它相當於傳統的數據庫表,但通過流化等流語義來豐富。表中的事實是可變的,這意味着可以將新的事實插入到表中,現有的事實可以被更新或刪除。可以從Kafka主題中創建表,也可以從現有的流和表中派生表。
部署
ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默認並沒有加入ksql server程序,當然V3和V4是支持ksql的,在V5版本中已經默認加入ksql了,為了方便演示,我們使用confluent kafka V5版本演示,zk和kafka也是單實例啟動。
下載
wget https://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz
tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0
啟動zk
cd /opt/programs/confluent_5.0.0 bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
啟動kafka
cd /opt/programs/confluent_5.0.0 bin/kafka-server-start -daemon etc/kafka/server.properties
創建topic和data
confluent自帶了一個ksql-datagen工具,可以創建和產生相關的topic和數據,ksql-datagen可以指定的參數如下:
[bootstrap-server=<kafka bootstrap server(s)> (defaults to localhost:9092)]
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')] schema=<avro schema file> [schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)] format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited') topic=<kafka topic name> key=<name of key column> [iterations=<number of rows> (defaults to 1,000,000)] [maxInterval=<Max time in ms between rows> (defaults to 500)] [propertiesFile=<file specifying Kafka client properties>]
創建pageviews,數據格式為delimited
cd /opt/programs/confluent_5.0.0/bin ./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
ps:以上命令會源源不斷在stdin上輸出數據,就是工具自己產生的數據,如下樣例
8001 --> ([ 1539063767860 | 'User_6' | 'Page_77' ]) ts:1539063767860 8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981 8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086
不過使用consumer消費出來的數據是如下樣式
1539066430530,User_5,Page_29
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34
創建users,數據格式為json
cd /opt/programs/confluent_5.0.0/bin ./ksql-datagen quickstart=users format=json topic=users maxInterval=100
ps:以上命令會源源不斷在stdin上輸出數據,就是工具自己產生的數據,如下樣例
User_5 --> ([ 1517896551436 | 'User_5' | 'Region_5' | 'MALE' ]) ts:1539063787413 User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430 User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481
不過使用consumer消費出來的數據是如下樣式
{"registertime":1507118206666,"userid":"User_6","regionid":"Region_7","gender":"OTHER"} {"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"} {"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"} {"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"} {"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"} {"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}
啟動ksql
cd /opt/programs/confluent_5.0.0 bin/ksql-server-start -daemon etc/ksql/ksql-server.properties
連接ksql
cd /opt/programs/confluent_5.0.0 bin/ksql http://10.205.151.145:8088

創建stream和table
stream
根據topic pageviews創建一個stream pageviews_original,value_format為DELIMITED
ksql>CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH \
(kafka_topic='pageviews', value_format='DELIMITED');

table
根據topic users創建一個table users_original,value_format為json
ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \
(kafka_topic='users', value_format='JSON', key = 'userid');

查詢數據
ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
ksql> SELECT * FROM pageviews_original LIMIT 3;

ps:ksql默認是從kafka最新的數據查詢消費的,如果你想從開頭查詢,則需要在會話上進行設置:SET 'auto.offset.reset' = 'earliest';
持久化查詢
持久化查詢可以源源不斷的把查詢出的數據發送到你指定的topic中去,查詢的時候在select前面添加create stream關鍵字即可創建持久化查詢。
創建查詢
ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;

查詢新stream
ksql> SHOW STREAMS;

ps:可以看到新創建了stream PAGEVIEWS2,並且創建了topic PAGEVIEWS2
查詢執行任務
ksql> SHOW QUERIES;

ps:可以看到ID為CSAS_PAGEVIEWS2_0的任務在執行,並且有顯示執行的語句
消費新數據
cd /opt/programs/confluent_5.0.0/bin ./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic PAGEVIEWS2

ps:可以看到PAGEVIEWS2 topic里面正是我們通過select篩選出來的數據
終止查詢任務
ksql> TERMINATE CSAS_PAGEVIEWS2_0;

