最新在自學flink,直接上的最新版,學到了table api,發現flink1.11/.1版本和flink1.10.1版本有很大差別。因為是新版本目前網上資料也不多,我通過查閱官網和自己編碼運行,簡單寫了個demo分享和講解一下。
- 新api提供的TableEnvironment接口,直接提供了接受原始數據的方法
flink1.10.1的TableEnvironment沒有僅有fromTableSource,和from兩個方法返回Table
fromTableSource(TableSource<?> source)
Creates a table from a table source.
from(String path)
Reads a registered table and returns the resulting Table.
針對流數據或者批數據必須使用TableEnvironment接口下的BatchTableEnvironment接口和StreamTableEnvironment接口提供的方法,針對原始數據源調用不同方法接受數據,但是流數據似乎還沒有提供接口接收數據,而且接受的方法應該還只是測試實驗性質。
flink1.11.1中TableEnvironment提供了fromValues方法以及其重載方法,用於接受原生數據,並且fromTableSource已經是過時方法。
fromValues(AbstractDataType<?> rowType, Expression... values)
Creates a Table from given collection of objects with a given row type.
- 新表達式
fromValues很多重載方法必須接受DataTypes.ROW抽象數據模型,對此flink1.11.1提供了新的表達式api方便開發人員進行編寫代碼
/** * Creates a row of expressions. */ public static ApiExpression row(Object head, Object... tail) { return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail); }
以下是一個簡單demo
1 String words = "hello flink hello blink hello muscle hello power"; 2 List<ApiExpression> wordList = Arrays.stream(words.split("\\W+")) 3 .map(word -> row(word, 1)) 4 .collect(Collectors.toList()); 5 //注冊成表,指定字段 6 Table table = tblEnv.fromValues( 7 DataTypes.ROW( 8 DataTypes.FIELD("word", DataTypes.STRING().notNull()), 9 DataTypes.FIELD("frequency", DataTypes.INT().notNull()) 10 ), wordList); 11 table.printSchema(); 12 tblEnv.createTemporaryView("word_count", table); 13 14 //執行查詢 15 Table table2 = tblEnv.sqlQuery("select word, sum(frequency) from word_count group by word"); 16 table2.execute().print();