pipelinedb -- Streams and Sliding Window Tests


https://blog.csdn.net/liuxiangke0210/article/details/74010951

https://yq.aliyun.com/articles/166

1. PipelineDB's default database is pipeline rather than postgres (the connected user in the session below is steven).

pipeline=# \c
You are now connected to database "pipeline" as user "steven".

Connect to the database with the command pipeline pipeline:

[steven@steven1 ~]$ pipeline pipeline
pipeline (9.5.3)
Type "help" for help.

pipeline=#

Create a stream. A stream is a foreign table backed by the pipelinedb FDW server, and it does not actually store any data.

pipeline=# create stream stream_test(x integer, y integer, z text);
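CREATE FOREIGN TABLE

Inspect the stream's structure. Note the arrival_timestamp column, which was not declared but is added automatically: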

pipeline=# \d stream_test;
                   Foreign table "public.stream_test"
      Column       |           Type           | Modifiers | FDW Options
-------------------+--------------------------+-----------+-------------
 x                 | integer                  |           |
 y                 | integer                  |           |
 z                 | text                     |           |
 arrival_timestamp | timestamp with time zone |           |
Server: pipelinedb

Create continuous views:

pipeline=# create continuous view v_sum as select sum (x + y) from stream_test;
CREATE VIEW
pipeline=# create continuous view v_group as select count(*) as coun,x,y,z from stream_test group by x,y,z;
CREATE VIEW
pipeline=# create continuous view v_single as select x,z from stream_test;
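CREATE VIEW

A stream can only be read by continuous queries. Querying it directly raises an error stating that it can only be read by a continuous view.

Inspect the structure of the continuous views: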

pipeline=# \d v_group
    View "public.v_group"
 Column |  Type   | Modifiers
--------+---------+-----------
 coun   | bigint  |
 x      | integer |
 y      | integer |
 z      | text    |
pipeline=# \d v_single
    View "public.v_single"
 Column |  Type   | Modifiers
--------+---------+-----------
 x      | integer |
 z      | text    |
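To make the point about streams concrete, here is a toy Python model. It is not PipelineDB code.

- The hypothetical `Stream` class forwards each inserted row to the continuous views subscribed to it and keeps nothing itself.
- A direct `select()` raises an error.
- `ContinuousSelectView` loosely mimics v_single.

The error message text is illustrative, not PipelineDB's exact wording.

```python
class StreamReadError(Exception):
    """Raised when something other than a continuous view tries to read a stream."""


class Stream:
    """Toy model of a PipelineDB stream: rows pass through, nothing is stored."""

    def __init__(self, name):
        self.name = name
        self._subscribers = []  # callbacks of the continuous views reading this stream

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def insert(self, *rows):
        # Each row is pushed to every subscriber and then discarded.
        for row in rows:
            for callback in self._subscribers:
                callback(row)
        return len(rows)

    def select(self):
        # Mirrors (loosely) the error you get when querying a stream directly.
        raise StreamReadError(f'"{self.name}" is a stream and can only be read by continuous views')


class ContinuousSelectView:
    """Toy non-aggregating continuous view, like v_single (select x, z)."""

    def __init__(self, stream, columns):
        self.columns = columns
        self.rows = []  # stands in for the view's materialization table
        stream.subscribe(self._on_row)

    def _on_row(self, row):
        # Keep only the projected columns of each arriving row.
        self.rows.append(tuple(row[c] for c in self.columns))


stream_test = Stream("stream_test")
v_single = ContinuousSelectView(stream_test, ["x", "z"])
stream_test.insert({"x": 1, "y": 2, "z": "a"}, {"x": 3, "y": 4, "z": "b"})
print(v_single.rows)  # [(1, 'a'), (3, 'b')]
```

The data ends up only in the view's own storage. The stream object holds nothing you could query afterwards.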

  

Creating a continuous view also creates several auxiliary relations:

pipeline=# \d
                 List of relations
 Schema |       Name       |     Type      | Owner
--------+------------------+---------------+--------
 public | v                | view          | steven
 public | v_group          | view          | steven
 public | v_group_mrel     | table         | steven
 public | v_group_osrel    | foreign table | steven
 public | v_group_seq      | sequence      | steven
 public | v_mrel           | table         | steven
 public | v_osrel          | foreign table | steven
 public | v_seq            | sequence      | steven
 public | v_single         | view          | steven
 public | v_single_mrel    | table         | steven
 public | v_single_osrel   | foreign table | steven
 public | v_single_seq     | sequence      | steven
 public | v_sum            | view          | steven
 public | v_sum_mrel       | table         | steven
 public | v_sum_osrel      | foreign table | steven
 public | v_sum_seq        | sequence      | steven
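(34 rows)

v_group: much like an ordinary view in the database, it stores nothing itself. You can think of it as a materialized view, and a very high-throughput, real-time one at that.

*_mrel: this table holds the actual data and is an ordinary physical table, exactly like any table in PostgreSQL. The continuous view above is just a shell over it, although what the table stores may be in HLL (HyperLogLog) format.

*_seq: a sequence that supplies the primary key of the physical table. Look at any *_mrel table and you will find a $pk column added by default.

*_osrel: an internal relation representing the view's output stream.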
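Here is a rough sketch of the "shell over a physical table" idea, under one simplifying assumption. Instead of HLL, it uses avg(), whose state can be kept as a running (total, count) pair.

- The mrel row would hold that state.
- Batches of stream rows are folded into partial states and merged.
- The view computes the final value only when queried.

AvgState is a hypothetical name, and this is not how PipelineDB actually lays out its tables.

```python
from dataclasses import dataclass


@dataclass
class AvgState:
    """Transition state for avg(): roughly what a *_mrel row could hold (toy model)."""
    total: float = 0.0
    count: int = 0

    def add(self, value):
        # Called once per incoming stream row.
        self.total += value
        self.count += 1

    def combine(self, other):
        # Merge two partial states, e.g. from two batches of rows.
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self):
        # What the continuous view (the "shell") returns when queried.
        return self.total / self.count if self.count else None


batch1, batch2 = AvgState(), AvgState()
for v in (3, 7):
    batch1.add(v)
batch2.add(11)
mrel_row = batch1.combine(batch2)
print(mrel_row)             # AvgState(total=21.0, count=3)
print(mrel_row.finalize())  # 7.0
```

The design point is that the stored state can be updated and merged without keeping the original rows. The same property is what makes HLL suitable for count(distinct).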

 

Insert data into the stream:

pipeline=# insert into stream_test (x,y,z) values(1,2,'a'),(3,4,'b'),(5,6,'c'),(7,8,'d'),(1,2,'a');
INSERT 0 5

Query the views:

pipeline=# select * from v_sum;
 sum
-----
  39
(1 row)
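The result can be checked by hand: (1+2) + (3+4) + (5+6) + (7+8) + (1+2) = 3 + 7 + 11 + 15 + 3 = 39. Below is a minimal sketch of maintaining such a sum incrementally. It assumes the view only needs to keep the running total, not the rows.

```python
# Rows from the INSERT above: (x, y, z)
rows = [(1, 2, "a"), (3, 4, "b"), (5, 6, "c"), (7, 8, "d"), (1, 2, "a")]

v_sum = 0  # the only stored state: the running total of x + y
for x, y, _z in rows:
    v_sum += x + y  # fold each arriving row into the total, then forget the row

print(v_sum)  # 39
```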


pipeline=# select * from v_group;
 coun | x | y | z
------+---+---+---
    1 | 7 | 8 | d
    1 | 5 | 6 | c
    2 | 1 | 2 | a
    1 | 3 | 4 | b
(4 rows)
pipeline=# select * from v_group_mrel;
 coun | x | y | z | $pk
------+---+---+---+-----
    1 | 7 | 8 | d |   1
    1 | 5 | 6 | c |   2
    2 | 1 | 2 | a |   3
    1 | 3 | 4 | b |   4
(4 rows)

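Compared with the view, v_group_mrel has only one extra column, $pk. That is how the data looks in the simple case; for some aggregates the stored data may be in HLL format.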
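Here is a toy Python model of v_group and its v_group_mrel, under these assumptions:

- Each group (x, y, z) becomes one row.
- A counter stands in for v_group_seq and hands out $pk values to new groups.
- Existing groups are updated in place.

In the real output above, (7, 8, d) received $pk 1, which reflects PipelineDB's internal batch processing. This sketch numbers groups in first-seen order, so its $pk values differ.

```python
from collections import Counter
from itertools import count

rows = [(1, 2, "a"), (3, 4, "b"), (5, 6, "c"), (7, 8, "d"), (1, 2, "a")]

pk_seq = count(1)  # stands in for v_group_seq
mrel = {}          # group key (x, y, z) -> {"coun": ..., "$pk": ...}


def apply_batch(batch):
    """Fold a batch of stream rows into the toy v_group_mrel."""
    for key, n in Counter(batch).items():
        if key in mrel:
            mrel[key]["coun"] += n  # existing group: update its row in place
        else:
            mrel[key] = {"coun": n, "$pk": next(pk_seq)}  # new group: new row, new $pk


apply_batch(rows)
for (x, y, z), r in mrel.items():
    print(r["coun"], x, y, z, r["$pk"])
```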

 

 

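Sliding windows

Let's look at sliding windows. Windows are central to stream processing, e.g. aggregates over the last 5 minutes, the last hour, or the last day.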
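As a plain illustration of what "the last 5 minutes / 1 hour / 1 day" means, the hypothetical helper below counts events in the half-open interval (now - window, now]. It recomputes from raw timestamps for clarity and says nothing about how PipelineDB evaluates windows internally. The event times are made up.

```python
from datetime import datetime, timedelta


def count_in_window(event_times, now, window):
    """Count events whose time falls in the half-open interval (now - window, now]."""
    start = now - window
    return sum(1 for t in event_times if start < t <= now)


# Hypothetical events and query time.
events = [
    datetime(2018, 5, 18, 7, 30),
    datetime(2018, 5, 18, 8, 50),
    datetime(2018, 5, 18, 8, 58),
]
now = datetime(2018, 5, 18, 9, 0)

for label, window in [
    ("last 5 minutes", timedelta(minutes=5)),  # only 08:58 -> 1
    ("last 1 hour", timedelta(hours=1)),       # 08:50 and 08:58 -> 2
    ("last 1 day", timedelta(days=1)),         # all three -> 3
]:
    print(label, count_in_window(events, now, window))
```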

1. Create a stream with a single column named time of type timestamp:

pipeline=# create stream sliding (time timestamp);

  

2. Create a sliding-window continuous view over the last minute (sw = '1 minute'):

pipeline=# create continuous view cv_sliding with(sw='1 minute') as select time from sliding;
CREATE VIEW

  

3. Insert one row holding the current time:

pipeline=# insert into sliding(time) values(now());
INSERT 0 1

  

4. Query:

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:46:58.771057     
(1 row)

  

5. After a while, insert two more timestamps and query again:

pipeline=# insert into sliding(time) values(now());
INSERT 0 1
pipeline=# insert into sliding(time) values(now());
INSERT 0 1

  

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:46:58.771057
 2018-05-18 08:47:22.253052
 2018-05-18 08:47:29.265144
(3 rows)

All three rows are visible.

 

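6. Query again a little later: one row is gone. Wait a little longer and all rows have disappeared, since each row drops out of the view once it is more than one minute old.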

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:47:22.253052
 2018-05-18 08:47:29.265144
(2 rows)

  

pipeline=# select * from cv_sliding;
 time
------
(0 rows)
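The disappearing rows can be replayed with the three timestamps from steps 3 and 5. This sketch assumes a row stays visible while its timestamp is newer than query time minus one minute. Here the inserted time equals now() at insert, so it matches the arrival time. The three query times are hypothetical, since the original session does not record them.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)  # sw = '1 minute'

# The three timestamps inserted in steps 3 and 5.
rows = [
    datetime(2018, 5, 18, 8, 46, 58, 771057),
    datetime(2018, 5, 18, 8, 47, 22, 253052),
    datetime(2018, 5, 18, 8, 47, 29, 265144),
]


def visible(rows, now):
    """Rows still inside the one-minute window at query time `now`."""
    return [t for t in rows if t > now - WINDOW]


# Hypothetical query times.
for now in (
    datetime(2018, 5, 18, 8, 47, 40),  # 3 rows
    datetime(2018, 5, 18, 8, 48, 0),   # 2 rows: the 08:46:58 row has aged out
    datetime(2018, 5, 18, 8, 48, 30),  # 0 rows
):
    print(now.time(), len(visible(rows, now)))
```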

  

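TTL. With ttl and ttl_column, rows of a continuous view are deleted once the value in ttl_column is older than the ttl (here, per-minute rows more than 10 minutes old):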

pipeline=# create continuous view v_ttl with (ttl = '10 minute',ttl_column= 'minute') as select minute(arrival_timestamp), count(*) from sliding group by minute;
CREATE VIEW

  

 

pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1

pipeline=# select * from v_ttl;
         minute         | count
------------------------+-------
 2018-05-18 09:04:00+00 |     4

  

pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# select * from v_ttl;
         minute         | count
------------------------+-------
 2018-05-18 09:04:00+00 |     4
 2018-05-18 09:06:00+00 |     1
(2 rows)
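Here is a toy model of v_ttl, under these assumptions:

- Rows are bucketed by truncating the timestamp to the minute, which is what minute(arrival_timestamp) does in the output above.
- A bucket is deleted once it is older than now minus the 10-minute TTL.

The seconds of the inserts and the expiry time are hypothetical; the session only shows the minute buckets and counts. `expire()` is called explicitly here, whereas PipelineDB removes expired rows on its own.

```python
from collections import Counter
from datetime import datetime, timedelta

TTL = timedelta(minutes=10)  # ttl = '10 minute'


def minute_bucket(ts):
    """Truncate a timestamp to the minute, like minute(arrival_timestamp)."""
    return ts.replace(second=0, microsecond=0)


v_ttl = Counter()  # minute bucket (the ttl_column) -> count(*)


def insert(ts):
    v_ttl[minute_bucket(ts)] += 1


def expire(now):
    """Delete rows whose ttl_column value is older than now - TTL."""
    for bucket in [b for b in v_ttl if b < now - TTL]:
        del v_ttl[bucket]


for s in (5, 12, 30, 48):  # four inserts during 09:04 (hypothetical seconds)
    insert(datetime(2018, 5, 18, 9, 4, s))
insert(datetime(2018, 5, 18, 9, 6, 10))
print(sorted(v_ttl.items()))  # 09:04 -> 4, 09:06 -> 1

expire(datetime(2018, 5, 18, 9, 15, 0))  # the 09:04 bucket is now older than 10 minutes
print(sorted(v_ttl.items()))  # only 09:06 -> 1 remains
```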

  

 

Transforms

1. Create two streams and a continuous view on each:

pipeline=# create stream str1(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create stream str2(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create continuous view cv_1 as select x,y,z from str1;
CREATE VIEW
pipeline=# create continuous view cv_2 as select x,y,z from str2;
CREATE VIEW
pipeline=#

  

2. Create a transform that reads str1 and forwards each row into str2 via pipeline_stream_insert:

pipeline=# create continuous transform tran_1 as select x,y,z from str1 then execute procedure pipeline_stream_insert('str2');
CREATE VIEW
pipeline=# insert into str1(x,y,z) values(1,'hi,i from str1',now());
INSERT 0 1
pipeline=# select * from cv_1;
 x |       y        |             z
---+----------------+---------------------------
 1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)

pipeline=# select * from cv_2;
 x |       y        |             z
---+----------------+---------------------------
 1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)
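The data flow of tran_1 can be modelled in a few lines of Python. Everything here is hypothetical:

- `Stream` forwards rows to its readers.
- `continuous_view()` collects rows into a list.
- `stream_insert()` plays the role of pipeline_stream_insert('str2') by re-inserting each row into another stream.

The z timestamp column is dropped for brevity.

```python
class Stream:
    """Toy stream: forwards each inserted row to its readers, stores nothing."""

    def __init__(self):
        self.readers = []

    def insert(self, row):
        for reader in self.readers:
            reader(row)


def continuous_view(stream):
    """Toy non-aggregating continuous view; returns the list it fills."""
    rows = []
    stream.readers.append(rows.append)
    return rows


def continuous_transform(source, procedure):
    """Toy transform: run `procedure` on every row arriving on `source`."""
    source.readers.append(procedure)


def stream_insert(target):
    """Stand-in for pipeline_stream_insert('str2'): re-insert rows into `target`."""
    return target.insert


str1, str2 = Stream(), Stream()
cv_1 = continuous_view(str1)
cv_2 = continuous_view(str2)
continuous_transform(str1, stream_insert(str2))  # like tran_1

str1.insert((1, "hi,i from str1"))
print(cv_1)  # [(1, 'hi,i from str1')]
print(cv_2)  # [(1, 'hi,i from str1')]
```

The transform itself keeps no rows. It only passes each row on, which is why the row shows up in cv_2 even though nothing was inserted into str2 directly.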

  

The pipeline_stream_insert used when creating the transform is a function provided by PipelineDB. We can also write our own trigger function and use it instead:

pipeline=# create table t(x bigint,y text,z timestamp);
CREATE TABLE

pipeline=# CREATE OR REPLACE FUNCTION insert_into_t()
pipeline-#   RETURNS trigger AS
pipeline-#   $$
pipeline$#   BEGIN
pipeline$#     INSERT INTO t (x, y,z) VALUES (NEW.x, NEW.y,NEW.z);
pipeline$#     RETURN NEW;
pipeline$#   END;
pipeline$#   $$
pipeline-#   LANGUAGE plpgsql;
CREATE FUNCTION

pipeline=# CREATE CONTINUOUS TRANSFORM tran_t AS
pipeline-#   SELECT x,y,z FROM str1
pipeline-#   THEN EXECUTE PROCEDURE insert_into_t();
CREATE CONTINUOUS TRANSFORM

pipeline=# insert into str1(x,y,z) values(10,'I want insert table t',now());
INSERT 0 1
pipeline=# select * from t;
 x  |           y           |             z
----+-----------------------+---------------------------
 10 | I want insert table t | 2017-05-15 14:01:48.17516
(1 row)

 

Here we wrote our own trigger function, and the transform used it to insert the data into table t.
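Here is the same idea with a user-defined procedure. The hypothetical Python function insert_into_t mirrors the plpgsql trigger function above, copying NEW into a list that stands in for table t. It only sketches the data flow, not how PipelineDB actually invokes trigger functions.

```python
t = []  # stands in for table t(x bigint, y text, z timestamp)


def insert_into_t(new):
    """Counterpart of the plpgsql insert_into_t(): copy NEW into table t."""
    t.append((new["x"], new["y"], new["z"]))
    return new  # mirrors RETURN NEW


tran_t_procedures = [insert_into_t]  # what tran_t attaches to str1


def insert_into_str1(row):
    """Toy str1: hand the row to every attached transform procedure, keep nothing."""
    for proc in tran_t_procedures:
        proc(row)


insert_into_str1({"x": 10, "y": "I want insert table t", "z": "2017-05-15 14:01:48.17516"})
print(t)  # [(10, 'I want insert table t', '2017-05-15 14:01:48.17516')]
```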

  

 

