來源:https://www.yuque.com/pinshu/alink_guide/czg4cx
1 Alink Schema String簡介【Alink使用技巧】
Alink在進行表數據讀取和轉換時,有時需要顯示聲明數據表的列名和列類型信息,即Schema信息。Schema String就是將此信息使用字符串的方式描述,這樣便於作為Java函數或者Python函數的參數輸入。
Schema String的定義格式與SQL Create Table語句所輸入的格式相同,列名與對應列類型間使用空格分隔,各列定義間使用逗號分隔。具體格式如下:
colname coltype[, colname2 coltype2[, ...]]
例如,"f0 string, f1 bigint, f2 double",即表明數據表共有3列,名稱分別為f0, f1和f2;對應類型分別為字符串類型,長整型和雙精度浮點型。
關於各種列類型的寫法,可以參照下面Flink Type與Type String的對應表。注意:為了適應不同用戶的習慣,同一個Flink Type可能對應着多種Type String的寫法。另外,Type String對於大小寫不敏感,可以寫STRING,也可以寫String, string等形式。
2 DataFrame和Alink批式數據的互相轉化【Alink使用技巧】
Alink提供了collectToDataframe()和fromDataframe()方法,實現了DataFrame和Alink批式數據的互相轉化。
Alink批式數據 -> DataFrame
Alink的批式數據源或者計算結果,如果能轉成Python的DataFrame形式,則可以利用Python豐富的函數庫及可視化功能,進行后續的分析和顯示。
Alink中每個批式數據源或批式算子都支持collectToDataframe()方法,不需要輸入參數,返回的結果就是DataFrame。注意,該方法中帶有collect字樣,表明其執行過程中會使用Flink的collect方法,觸發Flink任務執行。
示例如下,我們使用CsvSourceBatchOp讀取UCI網站的iris.data數據
source = CsvSourceBatchOp()\ .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\ .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
然后調用變量source的collectToDataframe()方法,得到相應的DataFrame,付給變量df_iris
df_iris = source.collectToDataframe()
到這里,我們就已經實現了Alink數據到Python DataFrame的轉化,下面我們就可以使用Python的函數,進一步處理df_iris,譬如:使用head()方法,顯示前5條數據。
df_iris.head()
運行結果為:
DataFrame -> Alink批式數據
對於DataFrame形式的數據,Alink提供了fromDataframe()方法,將數據轉換為Alink批式數據。具體使用示例如下:
iris = BatchOperator.fromDataframe(df_iris, "sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
使用Alink批式算子BatchOperator的靜態方法fromDataframe(),第一個參數為DataFrame數據,第二個參數為數據Schema的描述,由於DataFrame的數據類型與Alink有點差異,通過設置schema參數,可以嚴格保證轉化后的數據與我們期望的一致。關於Schema String更多的介紹可以參見關於 Alink Schema String的簡介。
最后,我們看一下轉換后的Alink批式數據iris,取前5條數據進行打印輸出,代碼如下:
iris.firstN(5).print()
3 Python數組如何轉化為批式數據源?【Alink使用技巧】
Python數組如何轉化為批式數據源,是將DataFrame作為橋梁,分兩步實現的:
- 首先,Python Pandas提供了幾種途徑,可以在代碼中直接輸入數據,並構建DataFrame
- PyAlink提供了DataFrame到SourceBatchOp的轉換
示例如下,首先import pandas,然后定義一個包含String及整數類型的二維數組,並將其轉化為DataFrame
import pandas as pd arr_2D =[ ['Alice',1], ['Bob',2], ['Cindy',3] ] df = pd.DataFrame(arr_2D)
然后使用BatchOperator的fromDataFrame方法,將前面定義好的DataFrame類型變量df作為第一個參數,后面的參數用來定義數據的列名與類型,使用SchemaStr格式,即列名與其類型間用空格分隔,各列定義之間使用逗號進行分隔。對應腳本如下:
BatchOperator.fromDataframe(df, 'name string, value int').print()
在上面腳本的最后部分,是對轉換得到的Alink BatchOperator數據源執行print方法,顯示數據的內容如下: