https://blog.csdn.net/liuxiangke0210/article/details/74010951
https://yq.aliyun.com/articles/166
1. PipelineDB's default user is pipeline, not postgres.
pipeline=# \c
You are now connected to database "pipeline" as user "steven".
Enter the database with the command: pipeline pipeline
[steven@steven1 ~]$ pipeline pipeline
pipeline (9.5.3)
Type "help" for help.
pipeline=#
Create a stream. A stream is implemented as a foreign table (FDW) and does not store any data itself.
pipeline=# create stream stream_test(x integer, y integer, z text);
CREATE FOREIGN TABLE
View the stream's structure
pipeline=# \d stream_test;
Foreign table "public.stream_test"
Column | Type | Modifiers | FDW Options
-------------------+--------------------------+-----------+-------------
x | integer | |
y | integer | |
z | text | |
arrival_timestamp | timestamp with time zone | |
Server: pipelinedb
Create continuous views
pipeline=# create continuous view v_sum as select sum (x + y) from stream_test;
CREATE VIEW
pipeline=# create continuous view v_group as select count(*) as coun,x,y,z from stream_test group by x,y,z;
CREATE VIEW
pipeline=# create continuous view v_single as select x,z from stream_test;
CREATE VIEW
A stream can only be read by continuous queries. Querying it directly raises an error stating that streams can only be read by a continuous view.
View the structure of the continuous views
pipeline=# \d v_group
View "public.v_group"
Column | Type | Modifiers
--------+---------+-----------
coun | bigint |
x | integer |
y | integer |
z | text |
pipeline=# \d v_single
View "public.v_single"
Column | Type | Modifiers
--------+---------+-----------
x | integer |
z | text |
Creating a continuous view also creates several supporting relations alongside it.
pipeline=# \d
List of relations
Schema | Name | Type | Owner
--------+------------------+---------------+--------
public | v | view | steven
public | v_group | view | steven
public | v_group_mrel | table | steven
public | v_group_osrel | foreign table | steven
public | v_group_seq | sequence | steven
public | v_mrel | table | steven
public | v_osrel | foreign table | steven
public | v_seq | sequence | steven
public | v_single | view | steven
public | v_single_mrel | table | steven
public | v_single_osrel | foreign table | steven
public | v_single_seq | sequence | steven
public | v_sum | view | steven
public | v_sum_mrel | table | steven
public | v_sum_osrel | foreign table | steven
public | v_sum_seq | sequence | steven
(34 rows)
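v_group: this looks much like an ordinary database view and stores nothing itself. You can think of it as a very high-throughput, real-time materialized view.
*_mrel: this is where the actual data is stored, and it behaves just like a physical table in PostgreSQL. The continuous view (cv) above is only a shell over this physical table, although the table's contents may be stored in HLL format.
*_seq: a sequence that supplies the primary key for the physical table. Looking at a cv's _mrel table shows that it has a $pk column by default.
*_osrel: an internal relation representing the continuous view's output stream.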
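The output stream behind *_osrel can be read by another continuous query. The following is a minimal sketch, assuming a PipelineDB version that provides output_of() with the (old) and (new) tuples described in the PipelineDB documentation. The view name v_sum_delta and its column names are hypothetical and used only for illustration.

```sql
-- Hypothetical view: records each change made to v_sum.
-- output_of('v_sum') reads the output stream of v_sum (backed by v_sum_osrel).
-- (old) is the row before an update and (new) is the row after it.
create continuous view v_sum_delta as
  select (old).sum as old_sum,
         (new).sum as new_sum
  from output_of('v_sum');
```

After further inserts into stream_test, selecting from v_sum_delta should show one row per change to v_sum.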
Insert data into the stream
pipeline=# insert into stream_test (x,y,z) values(1,2,'a'),(3,4,'b'),(5,6,'c'),(7,8,'d'),(1,2,'a');
INSERT 0 5
Query
pipeline=# select * from v_sum;
sum
-----
39
(1 row)
pipeline=# select * from v_group;
coun | x | y | z
------+---+---+---
1 | 7 | 8 | d
1 | 5 | 6 | c
2 | 1 | 2 | a
1 | 3 | 4 | b
(4 rows)
pipeline=# select * from v_group_mrel;
coun | x | y | z | $pk
------+---+---+---+-----
1 | 7 | 8 | d | 1
1 | 5 | 6 | c | 2
2 | 1 | 2 | a | 3
1 | 3 | 4 | b | 4
(4 rows)
In the ordinary case the cv and its cv_mrel differ only by the extra $pk column. With some aggregates, however, the data may be stored in HLL format.
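One way to see the HLL case is a distinct count. This is a sketch under the assumption, taken from the PipelineDB documentation, that count(distinct ...) in a continuous view is implemented with HyperLogLog. The view name v_distinct_z and the column alias uniq_z are hypothetical.

```sql
-- Hypothetical view: approximate number of distinct z values seen on the stream.
create continuous view v_distinct_z as
  select count(distinct z) as uniq_z
  from stream_test;

-- The view itself returns an ordinary number.
select * from v_distinct_z;

-- The materialization table holds the underlying aggregate state instead.
-- Inspect its column types with the psql meta-command: \d v_distinct_z_mrel
```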
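Sliding windows
Now let's look at sliding windows. Windows are an important concept in stream processing, for example summaries over the last 5 minutes, the last hour, or the last day.
1. Create a stream with one column named time, of type timestamp;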
pipeline=# create stream sliding (time timestamp);
2. Create a sliding-window continuous view
pipeline=# create continuous view cv_sliding with(sw='1 minute') as select time from sliding;
CREATE VIEW
3. Insert one row containing the current time
pipeline=# insert into sliding(time) values(now());
INSERT 0 1
4. Query
pipeline=# select * from cv_sliding;
time
----------------------------
2018-05-18 08:46:58.771057
(1 row)
5. A little later, insert two more rows and query again
pipeline=# insert into sliding(time) values(now());
INSERT 0 1
pipeline=# insert into sliding(time) values(now());
INSERT 0 1
pipeline=# select * from cv_sliding;
time
----------------------------
2018-05-18 08:46:58.771057
2018-05-18 08:47:22.253052
2018-05-18 08:47:29.265144
(3 rows)
All three rows are visible.
6. Query again a little later and one row is gone; a while after that, all rows have disappeared
pipeline=# select * from cv_sliding;
time
----------------------------
2018-05-18 08:47:22.253052
2018-05-18 08:47:29.265144
(2 rows)
pipeline=# select * from cv_sliding;
time
------
(0 rows)
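The sw option used above is, according to the PipelineDB documentation, shorthand for a WHERE clause that compares arrival_timestamp with clock_timestamp(). The sketch below writes the same one-minute window explicitly. The view name cv_sliding_where is hypothetical.

```sql
-- Hypothetical view: the same 1-minute window as cv_sliding,
-- expressed as an explicit predicate on the stream's arrival_timestamp column.
create continuous view cv_sliding_where as
  select time
  from sliding
  where (arrival_timestamp > clock_timestamp() - interval '1 minute');
```

Note that the window is evaluated against arrival_timestamp, the time the row reached the stream, and not against the user-supplied time column.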
TTL feature
pipeline=# create continuous view v_ttl with (ttl = '10 minute',ttl_column= 'minute') as select minute(arrival_timestamp), count(*) from sliding group by minute;
CREATE VIEW
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# select * from v_ttl;
minute | count
------------------------+-------
2018-05-18 09:04:00+00 | 4
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# select * from v_ttl;
minute | count
------------------------+-------
2018-05-18 09:04:00+00 | 4
2018-05-18 09:06:00+00 | 1
(2 rows)
transform
1. Create the streams and their corresponding continuous views
pipeline=# create stream str1(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create stream str2(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create continuous view cv_1 as select x,y,z from str1;
CREATE VIEW
pipeline=# create continuous view cv_2 as select x,y,z from str2;
CREATE VIEW
2. Create the transform
pipeline=# create continuous transform tran_1 as select x,y,z from str1 then execute procedure pipeline_stream_insert('str2');
CREATE VIEW
pipeline=# insert into str1(x,y,z) values(1,'hi,i from str1',now());
INSERT 0 1
pipeline=# select * from cv_1;
x | y | z
---+----------------+---------------------------
1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)
pipeline=# select * from cv_2;
x | y | z
---+----------------+---------------------------
1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)
The pipeline_stream_insert function used when creating the transform is provided by PipelineDB itself. We can also define a function of our own instead.
pipeline=# create table t(x bigint,y text,z timestamp);
CREATE TABLE
pipeline=# CREATE OR REPLACE FUNCTION insert_into_t()
pipeline-# RETURNS trigger AS
pipeline-# $$
pipeline$# BEGIN
pipeline$# INSERT INTO t (x, y,z) VALUES (NEW.x, NEW.y,NEW.z);
pipeline$# RETURN NEW;
pipeline$# END;
pipeline$# $$
pipeline-# LANGUAGE plpgsql;
CREATE FUNCTION
pipeline=# CREATE CONTINUOUS TRANSFORM tran_t AS
pipeline-# SELECT x,y,z FROM str1
pipeline-# THEN EXECUTE PROCEDURE insert_into_t();
CREATE CONTINUOUS TRANSFORM
pipeline=# insert into str1(x,y,z) values(10,'I want insert table t',now());
INSERT 0 1
pipeline=# select * from t;
x | y | z
----+-----------------------+---------------------------
10 | I want insert table t | 2017-05-15 14:01:48.17516
(1 row)
Here we wrote our own trigger function, which inserts each incoming row into table t.
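A transform's SELECT can also filter rows before they are forwarded. This is a minimal sketch that reuses str1, str2 and pipeline_stream_insert from the steps above. The transform name tran_filter and the threshold 5 are hypothetical choices for illustration.

```sql
-- Hypothetical transform: forward only rows with x > 5 from str1 to str2.
create continuous transform tran_filter as
  select x, y, z
  from str1
  where x > 5
  then execute procedure pipeline_stream_insert('str2');

-- This row does not satisfy the predicate, so tran_filter does not forward it.
insert into str1(x,y,z) values(2,'below threshold',now());

-- This row satisfies the predicate and is written to str2 by tran_filter.
insert into str1(x,y,z) values(8,'above threshold',now());
```

If tran_1 from the earlier step still exists, it forwards every row from str1 as well, so cv_2 would show both rows plus a second copy of the x = 8 row. Drop tran_1 first to observe the filter on its own.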
