flink1.11.1,全新的table api


最新在自學flink,直接上的最新版,學到了table api,發現flink1.11/.1版本和flink1.10.1版本有很大差別。因為是新版本目前網上資料也不多,我通過查閱官網和自己編碼運行,簡單寫了個demo分享和講解一下。

 

  • 新api提供的TableEnvironment接口,直接提供了接受原始數據的方法

flink1.10.1的TableEnvironment沒有僅有fromTableSource,和from兩個方法返回Table

fromTableSource(TableSource<?> source)
Creates a table from a table source.

from(String path)
Reads a registered table and returns the resulting Table.

針對流數據或者批數據必須使用TableEnvironment接口下的BatchTableEnvironment接口和StreamTableEnvironment接口提供的方法,針對原始數據源調用不同方法接受數據,但是流數據似乎還沒有提供接口接收數據,而且接受的方法應該還只是測試實驗性質。

flink1.11.1中TableEnvironment提供了fromValues方法以及其重載方法,用於接受原生數據,並且fromTableSource已經是過時方法。

fromValues(AbstractDataType<?> rowType, Expression... values)
Creates a Table from given collection of objects with a given row type.

 

  • 新表達式

fromValues很多重載方法必須接受DataTypes.ROW抽象數據模型,對此flink1.11.1提供了新的表達式api方便開發人員進行編寫代碼

    /**
     * Creates a row of expressions.
     */
    public static ApiExpression row(Object head, Object... tail) {
        return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail);
    }

以下是一個簡單demo

 1         String words = "hello flink hello blink hello muscle hello power";
 2         List<ApiExpression> wordList = Arrays.stream(words.split("\\W+"))
 3                 .map(word -> row(word, 1))
 4                 .collect(Collectors.toList());
 5         //注冊成表,指定字段
 6         Table table = tblEnv.fromValues(
 7                 DataTypes.ROW(
 8                         DataTypes.FIELD("word", DataTypes.STRING().notNull()),
 9                         DataTypes.FIELD("frequency", DataTypes.INT().notNull())
10                 ), wordList);
11         table.printSchema();
12         tblEnv.createTemporaryView("word_count", table);
13 
14         //執行查詢
15         Table table2 = tblEnv.sqlQuery("select word, sum(frequency) from word_count group by word");
16         table2.execute().print();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM