隨着Flink 1.10的發布,對SQL的支持也非常強大。Flink 還提供了 MySql, Hive,ES, Kafka等連接器Connector,所以使用起來非常方便。
接下來咱們針對構建流式SQL應用文章的梗概如下:
1. 搭建流式SQL應用所需要的環境准備。
2. 構建一個按每小時進行統計購買量的應用。
3. 構建每天以10分鍾的粒度進行統計應用。
4. 構建按分類進行排行,取出想要的結果應用。
1. 搭建流式應用所需要的環境准備
Kafka,用於將數據寫入到Kafka中,然后Flink通過讀取Kafka的數據然后再進行處理。版本號:2.11。
MySQL, 用於保存數據的分類。Flink從中讀取分類進行處理和計算 。版本號:8.0.15。
ElasticSearch, 用於保存結果數據和進行索引存儲。下載的時候可以在搜索引擎里邊搜索“elasticsearch 國內”,這樣就可以從國內快速下載,要不然下載的太慢了。版本號:7.6.0。
Kibana, 用於ES的結果展示,圖形化的界面美觀。 下載的時候也需要搜索“Kibana 國內”,比較快速。版本號:7.6.0。
Flink, 核心的流處理程序,版本號:1.10。Flink支持國內鏡像下載,這個到時候可以自行找一下。
Zookeeper, Kafka依賴這個應用,所以也會用到的,這個什么版本都是可以的。我的版本號:3.4.12。
當然我的是mac電腦,如果是mac電腦的話,下載ES和Kibana的時候要下載文件中帶“darwin”字樣的,可以在Mac中使用其他的不能執行。應該是程序里邊的編譯不同,這個也是一個小坑。
因為Flink需要連接Mysql, Elasticseratch , Kafka,所以也需要提前下載Flink所需要的Connector jar包到Flink的lib里邊。
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
環境都准備好了,那么需要把環境都啟動起來,進行檢查。
Elasticsearch啟動好了之后需要訪問這個網址沒有問題,說明成功了:http://localhost:9200/_cluster/health?pretty。
Flink啟動好之后需要訪問 http://localhost:8081 會有界面展示。
Kibana 啟動好了之后訪問:http://127.0.0.1:5601/ 會有界面展示。當然Kibana在目錄conf/kibana.yml里邊需要把ES的地址給打開。
Zookeeper 這個相信很多同學都會配置了,如果有不會配置的,可以自己搜索一下。
我們先看一下最后的效果圖,可能不是特別好,是這么個意思。
2. 構建一個按每個小時統計購買量應用。
我們寫一個程序,往Kafka里邊寫數據,模擬一些連續的數據源頭。
首先定義一個Pojo類。
package myflink.pojo; public class UserBehavior { //用戶ID public long userId; //商品ID public long itemId; //商品類目ID public int categoryId; //用戶行為,包括{"pv","buy","cart", "fav"} public String behavior; //行為發生的時間戳,單位秒 public String ts; public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public long getItemId() { return itemId; } public void setItemId(long itemId) { this.itemId = itemId; } public int getCategoryId() { return categoryId; } public void setCategoryId(int categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public String getTimestamp() { return ts; } public void setTimestamp(String ts) { this.ts = ts; } }
接着寫一個往Kafka寫數據的類。隨機生成用於的行為,里邊包括用戶的id,類目id等。讓程序運行起來。
package myflink.kafka; import com.alibaba.fastjson.JSON; import myflink.pojo.UserBehavior; import org.apache.commons.lang3.RandomUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2020-03-15 */ public class KafkaWriter { //本地的kafka機器列表 public static final String BROKER_LIST = "localhost:9092"; //kafka的topic public static final String TOPIC_USER_BEHAVIOR = "user_behaviors"; //key序列化的方式,采用字符串的形式 public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; //value的序列化的方式 public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private static final String[] BEHAVIORS = {"pv","buy","cart", "fav"}; private static KafkaProducer<String, String> producer; public static void writeToKafka() throws Exception{ //構建userBehavior, 數據都是隨機產生的 int randomInt = RandomUtils.nextInt(0, 4); UserBehavior userBehavior = new UserBehavior(); userBehavior.setBehavior(BEHAVIORS[randomInt]); Long ranUserId = RandomUtils.nextLong(1, 10000); userBehavior.setUserId(ranUserId); int ranCate = RandomUtils.nextInt(1, 100); userBehavior.setCategoryId(ranCate); Long ranItemId = RandomUtils.nextLong(1, 100000); userBehavior.setItemId(ranItemId); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); userBehavior.setTimestamp(sdf.format(new Date())); //轉換為json String userBehaviorStr = JSON.toJSONString(userBehavior); //包裝成kafka發送的記錄 ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_BEHAVIOR, null, null, userBehaviorStr); //發送到緩存 producer.send(record); System.out.println("向kafka發送數據:" + userBehaviorStr); //立即發送 producer.flush(); } public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", BROKER_LIST); props.put("key.serializer", KEY_SERIALIZER); props.put("value.serializer", VALUE_SERIALIZER); producer = new KafkaProducer<>(props); while(true) { try { //每一秒寫一條數據 TimeUnit.SECONDS.sleep(1); writeToKafka(); } catch (Exception e) { e.printStackTrace(); } } } }
本地idea Console 輸出的結果是這樣的:
向kafka發送數據:{"behavior":"buy","categoryId":7,"itemId":75902,"timestamp":"2020-03-15T11:35:11Z","ts":"2020-03-15T11:35:11Z","userId":4737}
我們將Flink的任務數調整成10個,也就是同時執行的任務數。 位置在 conf/flink-conf.yaml,taskmanager.numberOfTaskSlots: 10,然后重啟下。我的已經啟動並且運行了3個任務,看下圖:
我們接下來運行Flink 內置的客戶端。命令: bin/sql-client.sh embedded,這樣我們就開始了Flink SQL之旅了。我們使用Flink的DDL,從Kafka里邊讀取數據,采用ProcessingTime的時間事件進行處理,為ts設置水位線,允許5秒延遲。更多參考 時間屬性 和 Flink DDL。里邊的Kafka 連接以及相關的配置,相信大家都不是很陌生。
CREATE TABLE user_behavior ( userId BIGINT, itemId BIGINT, categoryId BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通過計算列產生一個處理時間列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成為事件時間列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behaviors', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json' -- 數據源格式為 json );
接下來我們使用select來看一下Flink的數據,執行語句:select * from user_behavior,會出現如下圖。同時SQL上面還支持 show tables、describe user_behavior 等操作。
我們需要將結果放入Elasticsearch,這樣也比較簡單,我們還通過DDL來創建一個表。我們只需要一個語句,就可以實現連接Elasticsearch(后邊簡稱ES)並且創建相應的Type和Index了。不需要自己再去創建一次,是不是很簡單,哈。里邊有兩個字段,一個是每天的小時數,一個是購買的統計量。當有數據寫入這個表的時候,那么就會將數據寫入到ES上,非常方便。
CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector 'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本 'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址 'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相當於數據庫的表名 'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當於數據庫的庫名 'connector.bulk-flush.max-actions' = '1', -- 每條數據都刷新 'format.type' = 'json', -- 輸出數據格式 json 'update-mode' = 'append' );
每個小時的購買量,那么我們需要的是使用滾動窗口,Tumbling Window,那么使用TUMBLE_START函數,另外我們還需要獲取ts中的小時數,那么需要使用HOUR函數。將所有behavior為buy的寫入到這個表中。
INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
這個時候看Flink里邊的任務中會出現這個任務,因為是持續不斷的進行處理的。執行過程中如果有數據的話,那么會將數據寫到表 buy_cnt_per_hour,同時也會將數據寫到ES里邊。
下面我們來配置一下Kinbana來將結果進行展示,訪問 http://localhost:5601, 然后選擇左邊菜單的“Management”,然后選擇 “Index Patterns” -> “Create Index Pattern”, 輸入我們剛才創建的Index: “buy_cnt_per_hour”。可以通過左側的“Discover”按鈕就可以看到我們的數據了。
我們繼續點擊左側的“Dashboard”按鈕,創建一個“用戶行為日志分析”的Dashboard。 進入左側的 “Visualize” - “Create Visualization" 選擇“Area”圖,Bucket的按我下邊截圖左下進行配置和選擇。
保存后添加到Dashboard即可。這樣就從數據源頭到數據展示就構建完成了,是不是很快~
3. 構建每天以10分鍾的粒度進行統計獨立用戶數應用。
我們繼續使用DDL創建Flink的表以及對應的ES的Index。
CREATE TABLE cumulative_uv ( time_str STRING, uv BIGINT ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'cumulative_uv', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
創建好了需要將ts進行分解出來小時和分鍾,通過一個視圖,這個視圖和數據庫的視圖類似,不存儲數據,也不占用Flink的執行Task。首先將ts格式化,然后轉換成時間:小時:分鍾,分鍾后邊沒有0,結尾需要補個0。然后統計不同的用戶數需要使用DISTINCT函數和COUNT函數。還有使用Over Window功能,也就是從之前的數據到現在,以處理時間升序把數據按Window的功能來進行統計。直白的將就是有一條數據的話就會將數據處理, 然后有一條數據比當前最大值大的話會保留最大值。當前窗口是以每10分鍾為一個窗口。
CREATE VIEW uv_per_10min AS SELECT MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, COUNT(DISTINCT userId) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
這個視圖主要是數據比較多,只需要每10分鍾一個點其實就滿足要求了,那么現在我們需要做的就是再將數據處理一下即可寫入ES。
INSERT INTO cumulative_uv SELECT time_str, MAX(uv) FROM uv_per_10min GROUP BY time_str;
這樣ES里邊就會有新的index產生,下一步我們在kibana里邊創建一個 index pattern, 輸入index “cumulative_uv”,接下來到 “Visualize”里邊創建一個 Visualization ,名為“累計獨立用戶數”,表選擇“Line”類型的圖標,其他指標和我下圖配置的一樣即可。
累計獨立用戶數也創建好了。
4. 構建按分類進行排行,取出想要的結果應用。
接下來我們需要按主類目進行統計和排序。因為子類目非常多。
首先我們需要准備一個mysql, 然后創建好表。簡單些幾條對應的類目關系,當然可以根據自己所生成的數據進行自行寫入一些對應的關系表。
create table category ( sub_category_id bigint(20), parent_category_id bigint(20) );
insert into category(sub_category_id, parent_category_id) values(1038, 1);
insert into category(sub_category_id, parent_category_id) values(91244, 1);
insert into category(sub_category_id, parent_category_id) values(44712, 1);
insert into category(sub_category_id, parent_category_id) values(2,2);
insert into category(sub_category_id, parent_category_id) values(3,3);
insert into category(sub_category_id, parent_category_id) values(4,4);
insert into category(sub_category_id, parent_category_id) values(5,5);
insert into category(sub_category_id, parent_category_id) values(6,6);
insert into category(sub_category_id, parent_category_id) values(7,7);
insert into category(sub_category_id, parent_category_id) values(8,8);
insert into category(sub_category_id, parent_category_id) values(9,9);
定義一個Flink表,數據從Mysql獲取,用於進行類目關系關聯。
CREATE TABLE category_dim ( sub_category_id BIGINT, -- 子類目 parent_category_id BIGINT -- 頂級類目 ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink', 'connector.table' = 'category', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
創建ES的index,用於存儲統計后的結果。
CREATE TABLE top_category ( category_name STRING, -- 類目名稱 buy_cnt BIGINT -- 銷量 ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'top_category', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
接下來還是創建一個視圖,將表和類目關聯起來,方便后邊的統計結果。使用的是 Temporal Join。
CREATE VIEW rich_user_behavior AS SELECT U.userId, U.itemId, U.behavior, CASE C.parent_category_id WHEN 1 THEN '服飾鞋包' WHEN 2 THEN '家裝家飾' WHEN 3 THEN '家電' WHEN 4 THEN '美妝' WHEN 5 THEN '母嬰' WHEN 6 THEN '3C數碼' WHEN 7 THEN '運動戶外' WHEN 8 THEN '食品' ELSE '其他' END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.categoryId = C.sub_category_id;
將類型為“buy”的寫入到表,同時也就是寫入了ES里邊,然后ES里邊的index-top_category 也就有了數據了。
INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;
我們繼續在Kibana里邊創建一個index pattern,輸入“top_category”,然后visualize里邊創建一個visualization 名為類目排行榜。詳細的配置可參考如下。
好了整個的過程計算創建完了。
通過使用Flink 1.10以及對應的Connector, 實現了對Mysql, Kafka, Elasticsearch 的快速連接,更快的達到的我們想要實現的效果。
里邊涉及到往kafka里邊寫數據可參考工程:https://github.com/stonehqs/flink-demo