Table & SQL API
1 Maven dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
flink-table-api-java-bridge_2.11-1.8.0.jar: support for the DataStream/DataSet Table & SQL API using the Java programming language.
flink-table-planner_2.11-1.8.0.jar: encapsulates the Table and SQL operations and translates the operators into an executable Flink job. Its pom pulls in flink-table-api-java-bridge_2.11-1.8.0.jar and the basic Flink jars by default.
1.1 pom of flink-table-api-java-bridge_2.11-1.8.0.jar
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
1.2 pom of flink-table-planner_2.11-1.8.0.jar
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-compiler</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-compiler</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.7.9-6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>1.8.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>flink-test-utils-junit</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>curator-test</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.8.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>frocksdbjni</artifactId>
<groupId>com.data-artisans</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
2 Code example
- The data source generates one random record per second, with the following structure:
name | sex | age | score | createTime | processingTime
name-1 | male | 17 | 100 | 1578377656793 | 1578377656793
- Every 10 minutes, count the number of students with a score above 90, grouped by sex.
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Tumble;
public class StreamTableDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStreamSource<Tuple5<String, String, Integer, Integer, Long>> dataStreamSource = env.addSource(new SourceFunction<Tuple5<String, String, Integer,Integer,Long>>() {
Random random = new Random();
int i=0;
@Override
public void run(SourceContext<Tuple5<String, String, Integer, Integer, Long>> ctx) throws Exception {
while (true) {
i++;
String name = "name-" + i;
String sex = random.nextBoolean() ? "male" : "female";
int age = random.nextInt(3) + 15;
int score = random.nextInt(100) + 1;
Long currentTime = System.currentTimeMillis();
ctx.collect(new Tuple5<>(name, sex, age, score, currentTime));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
}
}, "source");
// ScalarFunctionImpl is a user-defined ScalarFunction (source not shown) that shifts the UTC window timestamps to local time
tEnv.registerFunction("utc2local", new ScalarFunctionImpl());
tEnv.registerDataStream("students", dataStreamSource, "name, sex, age, score, createTime, processingTime.proctime");
// tEnv.registerTableSource("students", new StreamSourceTableTest());
// Table table = tEnv.sqlQuery("select sex," +
//     "utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
//     "utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," +
//     "count(sex) FROM students WHERE score > 90 " +
//     "GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE), sex");
Table table = tEnv.scan("students")
.where("score > 90")
.window(Tumble.over("10.minutes").on("processingTime").as("w"))
.groupBy("w, sex")
.select("sex, utc2local(w.start), utc2local(w.end), sex.count");
// table.writeToSink(new CsvTableSink(sinkPath, ",", 1, WriteMode.NO_OVERWRITE));
TypeInformation[] types = new TypeInformation[] {Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG};
tEnv.toAppendStream(table, Types.ROW(types)).print();
env.execute("table-stream-window-demo");
}
}
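The utc2local function registered above is backed by ScalarFunctionImpl, whose source is not shown. Since TUMBLE_START/TUMBLE_END report window boundaries in UTC, the conversion it presumably performs is a time-zone offset shift. A minimal sketch of that logic (the class and method names here are assumptions; in Flink the method body would live in a public eval(...) method of a class extending org.apache.flink.table.functions.ScalarFunction):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical sketch of ScalarFunctionImpl's core logic: shift a UTC
// timestamp into the JVM's default time zone by adding the zone offset.
public class Utc2LocalSketch {
    public static Timestamp utc2local(Timestamp utc) {
        long millis = utc.getTime();
        int offsetMillis = TimeZone.getDefault().getOffset(millis);
        return new Timestamp(millis + offsetMillis);
    }

    public static void main(String[] args) {
        // Output depends on the JVM's default time zone.
        System.out.println(utc2local(new Timestamp(1578377656793L)));
    }
}
```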
2.1 SQL queries
A detailed example of the SQL syntax:
Table table = tEnv.sqlQuery(
"select sex," +
"utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
"utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," +
"count(sex) FROM students WHERE score > 90 " +
"GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE), sex");
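TUMBLE(processingTime, INTERVAL '10' MINUTE) assigns each row to a fixed, non-overlapping 10-minute bucket; the boundaries that TUMBLE_START/TUMBLE_END report can be sketched with simple alignment arithmetic (plain Java for illustration, not the Flink API):

```java
// Align an epoch-millisecond timestamp to its tumbling window:
// windowStart = ts - (ts % size); windowEnd = windowStart + size.
public class TumbleAlign {
    public static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    public static long windowEnd(long ts, long sizeMillis) {
        return windowStart(ts, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long ts = 1578377656793L; // sample createTime from the source above
        System.out.println(windowStart(ts, tenMinutes)); // 1578377400000
        System.out.println(windowEnd(ts, tenMinutes));   // 1578378000000
    }
}
```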
2.1.1 Data query language (DQL)
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
2.1.2 Data manipulation language (DML)
insert:
INSERT INTO tableReference
query
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
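The UNION [ ALL ], EXCEPT and INTERSECT operators in the query rule above combine result sets with the usual bag/set semantics. A plain-Java illustration of those semantics (not the Flink API; names here are only for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class QuerySetOps {
    // UNION ALL: concatenation, duplicates kept (UNION without ALL deduplicates).
    static List<String> unionAll(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // EXCEPT: rows of a with every row of b removed, deduplicated.
    static List<String> except(List<String> a, List<String> b) {
        LinkedHashSet<String> out = new LinkedHashSet<>(a);
        out.removeAll(b);
        return new ArrayList<>(out);
    }

    // INTERSECT: rows present in both inputs, deduplicated.
    static List<String> intersect(List<String> a, List<String> b) {
        LinkedHashSet<String> out = new LinkedHashSet<>(a);
        out.retainAll(b);
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("name-1", "name-2", "name-3");
        List<String> b = Arrays.asList("name-2", "name-4");
        System.out.println(unionAll(a, b));  // [name-1, name-2, name-3, name-2, name-4]
        System.out.println(except(a, b));    // [name-1, name-3]
        System.out.println(intersect(a, b)); // [name-2]
    }
}
```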
2.2 Calling the Table API
Table table = tEnv.scan("students")
.where("score > 90")
.window(Tumble.over("10.minutes").on("processingTime").as("w"))
.groupBy("w, sex")
.select("sex, utc2local(w.start), utc2local(w.end), sex.count");
2.3 Final job graph of the example:
2.4 Printed results of the example:
Aggregated results printed per sex and window:
male,2020-01-07 13:40:00.0,2020-01-07 13:50:00.0,45
female,2020-01-07 13:40:00.0,2020-01-07 13:50:00.0,36
male,2020-01-07 13:50:00.0,2020-01-07 14:00:00.0,20
female,2020-01-07 13:50:00.0,2020-01-07 14:00:00.0,24