(5) Flink Table API Programming


First of all, Flink provides three levels of API ordered by ease of use; from bottom to top they are the stateful stream processing API (ProcessFunction), the DataStream/DataSet API, and, at the top:

Table API & SQL

1、Declarative: users describe what to do, not how to do it

2、High performance: the optimizer can rewrite queries for better execution performance

3、Unified stream and batch: the same logic can run in streaming mode or in batch mode

4、Stable semantics: the semantics follow the SQL standard and rarely change

5、Easy to understand: the semantics are explicit; what you see is what you get

Table API: tab.groupBy("word").select("word, count(1) as cnt")

SQL: SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word

Characteristics of the Table API:

1、The Table API makes multi-statement data processing easy

  For example, suppose we want to write rows with a > 10 to external table xxx and, at the same time, insert rows with a < 10 into external table yyy. The Table API makes this straightforward:

    table.filter("a > 10").insertInto("xxx")

    table.filter("a < 10").insertInto("yyy")

2、The Table API makes it easier to extend standard SQL (when, and only when, you need to)

Table API programming:

1、WordCount example (streaming):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaStreamWordCount {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment and its table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // register a one-column CSV file as table "fileSource"
        String path = JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .inAppendMode()
            .registerTableSource("fileSource");

        // count occurrences per word
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as cnt");  // alias "cnt": count is a reserved word

        // a grouped aggregate continuously updates its result, hence a retract stream
        tEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }
}
And the same WordCount in batch mode:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");

        // same counting logic as the streaming version, run in batch mode
        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as cnt");  // alias "cnt": count is a reserved word

        tEnv.toDataSet(result, Row.class).print();
    }
}

Reference: https://github.com/hequn8128/TableApiDemo

2、Table API operations

  (1)how to get a Table

    Table myTable = tableEnvironment.scan("MyTable"); // a Table is obtained by scanning it from the tableEnvironment. That raises the question of how MyTable gets registered in the first place, i.e. how to register a table? There are roughly three ways:
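The three registration paths can be sketched as follows with the 1.8-era Java API; the table names, the sample elements, and the field name word are illustrative assumptions, not from the original post.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 1. register a DataStream (a DataSet in batch mode) under a name
        DataStream<String> lines = env.fromElements("hello", "world");
        tEnv.registerDataStream("MyTable", lines, "word");

        // 2. register an existing Table object under a new name
        Table filtered = tEnv.scan("MyTable").filter("word === 'hello'");
        tEnv.registerTable("Filtered", filtered);

        // 3. register a TableSource, e.g. via the connect()/descriptor API,
        //    as done for "fileSource" in the WordCount example above
    }
}
```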

    

 

  (2)how to emit a Table  
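No example survived here, so the following is a sketch of the two common ways to emit a Table in the 1.8-era API: converting it back to a DataStream, or inserting it into a registered sink. The sink name, output path, and sample data are assumptions.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class EmitTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.registerDataStream("MyTable", env.fromElements("hello", "world"), "word");

        Table result = tEnv.scan("MyTable");

        // option 1: convert back to a DataStream; a retract stream carries a
        // Boolean flag per record (true = add, false = delete/update)
        tEnv.toRetractStream(result, Row.class).print();

        // option 2: insert into a sink registered on the table environment
        tEnv.registerTableSink("csvSink",
            new String[]{"word"},
            new TypeInformation[]{Types.STRING},
            new CsvTableSink("/tmp/out.csv", ","));
        result.insertInto("csvSink");

        env.execute();
    }
}
```

For an append-only query, toAppendStream works as well; a grouped aggregate forces the retract variant.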

    

  (3)how to query a Table
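A query sketch in the 1.8-era API, showing the same aggregation written once as a Table API call chain and once as SQL; the table name and sample rows are assumptions.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class QueryTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.registerDataStream("MyTable",
            env.fromElements("hello", "world", "hello"), "word");

        // Table API: method chaining with string expressions
        Table apiResult = tEnv.scan("MyTable")
            .groupBy("word")
            .select("word, count(1) as cnt");

        // SQL: the equivalent continuous query
        Table sqlResult = tEnv.sqlQuery(
            "SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word");

        tEnv.toRetractStream(apiResult, Row.class).print();
        tEnv.toRetractStream(sqlResult, Row.class).print();
        env.execute();
    }
}
```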

    

3、 Columns Operation & Function
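The material for this section did not survive extraction; as a hedged sketch, the column operations addColumns, renameColumns, and dropColumns arrived in the Table API around Flink 1.9, and built-in functions can be used directly inside expressions. Table and column names below are assumptions.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ColumnOpsDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.registerDataStream("MyTable", env.fromElements("hello", "world"), "word");

        Table t = tEnv.scan("MyTable");
        Table changed = t
            .addColumns("concat(word, '!') as excited")  // append a derived column
            .renameColumns("excited as shout")           // rename it
            .dropColumns("shout");                       // and drop it again

        // built-in functions can also be applied directly in select()
        Table upper = t.select("upper(word) as upper_word");
    }
}
```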

 

 

4、 Row-based Operation
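This section's content is also missing; as a hedged sketch, row-based operations (map/flatMap over whole rows, driven by user-defined functions) were added around Flink 1.9. Below is a user-defined table function that splits a line into words; the registration and call shown in the trailing comments are assumptions, since the exact call shape depends on the Flink version.

```java
import org.apache.flink.table.functions.TableFunction;

// A user-defined table function: turns one input row (a line of text)
// into many output rows (one per word).
public class SplitFunction extends TableFunction<String> {
    public void eval(String line) {
        for (String word : line.split("\\s+")) {
            collect(word);
        }
    }
}
// Assumed usage:
//   tEnv.registerFunction("split", new SplitFunction());
//   Table words = tEnv.scan("Lines").flatMap("split(line)").as("word");
```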

 

 

 

 

Table SQL programming:

1、How to run SQL queries on a stream

  Reference: https://github.com/ververica/sql-training
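The key idea: on a stream-backed table, a SQL query is not computed once but runs continuously, its result maintained and updated as new rows arrive. A minimal sketch (table and column names are illustrative):

```sql
-- A continuous query: the result keeps one row per word, and that row's
-- count is updated every time a matching record arrives on the stream.
SELECT word, COUNT(*) AS cnt
FROM MyTable
GROUP BY word;
```

Submitted via tEnv.sqlQuery(...) in a program, or interactively from the SQL CLI client.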

2、How to use the SQL CLI client

3、Running window aggregates and non-window aggregates, and understanding the difference between them
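The difference can be seen side by side in SQL; this sketch assumes an event-time attribute named rowtime on the (illustrative) table:

```sql
-- Window aggregate: each one-hour tumbling window emits one final row when
-- the window closes, so the result is append-only.
SELECT TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start,
       word,
       COUNT(*) AS cnt
FROM MyTable
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), word;

-- Non-window aggregate: one row per word over the whole stream, continuously
-- updated on every new record, so the result is a retract/update stream.
SELECT word, COUNT(*) AS cnt
FROM MyTable
GROUP BY word;
```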

4、How to consume Kafka data with SQL

5、How to write results to Kafka and ElasticSearch with SQL

And more... additional features will be supported in Flink 1.9, stay tuned.

 

 

 

