flink table sql兩種寫法


table & sql api

1 maven依賴

     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-planner_2.11</artifactId>
       <version>1.8.0</version>
       <scope>provided</scope>
     </dependency>
     
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
     </dependency>

flink-table-api-java-bridge_2.11-1.8.0.jar:使用java編程語言對於DataStream/DataSet table & sql API的支持
flink-table-planner_2.11-1.8.0.jar:封裝了關於table和sql的操作,將算子轉換成可執行的flink job。pom文件中默認引入了flink-table-api-java-bridge_2.11-1.8.0.jar以及flink基礎的jar。

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-jackson</artifactId>
      <version>2.7.9-6.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils_2.11</artifactId>
      <version>1.8.0</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <artifactId>flink-test-utils-junit</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>curator-test</artifactId>
          <groupId>org.apache.curator</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-tests_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>1.8.0</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <artifactId>frocksdbjni</artifactId>
          <groupId>com.data-artisans</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

2 代碼示例

  • 數據源采用每隔1秒鍾生成一條隨機數據,數據結構為:

    name sex age score createTime processingTime
    name-1 male 17 100 1578377656793 1578377656793
  • 每隔10分鍾根據性別統計一次90分以上學生人數。

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Tumble;

public class StreamTableDemo {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
         
         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         
         DataStreamSource<Tuple5<String, String, Integer, Integer, Long>>  dataStreamSource = env.addSource(new SourceFunction<Tuple5<String, String, Integer,Integer,Long>>() {
                Random random = new Random();
                int i=0;
                @Override
                public void run(SourceContext<Tuple5<String, String, Integer, Integer, Long>> ctx) throws Exception {
                     while (true) {
                           i++;
                           String name = "name-" + i;
                           String sex = random.nextBoolean() == true ? "male" : "female";
                           int age = random.nextInt(3) + 15;
                           int score = random.nextInt(100) + 1;
                           Long currentTime = System.currentTimeMillis();
                           ctx.collect(new Tuple5(name, sex, age,score,currentTime));
                           TimeUnit.SECONDS.sleep(1);
                     }
                }
                @Override
                public void cancel() {                   
                }
         }, "source");
         tEnv.registerFunction("utc2local", new ScalarFunctionImpl());

         tEnv.registerDataStream("students", dataStreamSource, "name, sex, age, score, createTime, processingTime.proctime");

//       tEnv.registerTableSource("students", new StreamSourceTableTest());

//	 Table table = tEnv.sqlQuery("select sex," +
	    			     "utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
	    			     "utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," + 
	    			     "count(sex) FROM students WHERE score > 90 " + 
	    			     "GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE) , sex");


         Table table = tEnv.scan("students")
                           .where("score > 90")
                           .window(Tumble.over("10.minutes").on("processingTme").as("w"))
                           .window(Over.partitionBy("school").orderBy("proctime").preceding("1.minutes").following("current_range").as("w"))
                           .groupBy("w, sex")
                           .select("sex, utc2local(w.start), utc2local(w.end), sex.count");
//       table.writeToSink(new CsvTableSink(sinkPath, ",", 1, WriteMode.NO_OVERWRITE));

         TypeInformation[] types = new TypeInformation[] {Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG};
         tEnv.toAppendStream(table, Types.ROW(types)).print();
         env.execute("table-stream-window-demo");
     }

}

2.1 sql查詢語言

詳細示例:
sql語法

Table table = tEnv.sqlQuery(
                            "select sex," +
                            "utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
                            "utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," + 
                            "count(sex) FROM students WHERE score > 90 " + 
                            "GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE) , sex");

  • 2.1.1 數據查詢語句DQL

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
  
selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'
  • 2.1.2 數據操縱語句DML
insert:
  INSERT INTO tableReference
  query
  
query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]
2.2 調用table api
Table  table = tEnv.scan("students")
                   .where("score > 90")
                   .window(Tumble.over("10.minutes").on("processingTime").as("w"))
                   .groupBy("w, sex")
                   .select("sex, utc2local(w.start), utc2local(w.end), sex.count");
2.3 實例最終graph:

2.4 實例打印結果:

根據sex字段和窗口分區打印統計結果:

male,2020-01-07 13:40:00.0,2020-01-07 13:50:00.0,45
female,2020-01-07 13:40:00.0,2020-01-07 13:50:00.0,36
male,2020-01-07 13:50:00.0,2020-01-07 14:00:00.0,20
female,2020-01-07 13:50:00.0,2020-01-07 14:00:00.0,24
2.5 sql算子語法:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM