ALINK(六):PYALINK與PYLINK一同使用


https://gitee.com/517424787/Alink/blob/master/docs/pyalink/pyalink-pyflink.md

與 PyFlink 一同使用

在最新的發布中,PyAlink 與 PyFlink 進行了一定的整合。 用戶在新版本的 PyAlink 中能夠使用 PyFlink 的部分功能,同時 PyAlink 腳本也支持像 PyFlink 腳本一樣使用 flink run 來提交作業了。

需要注意的是:這個版本只有 Flink-1.10 對應的 Python 包 pyalink 才具有,pyalink-flink-1.9 沒有以下功能。

一個簡單的例子

我們首先來看一個 PyAlink 與 PyFlink 結合的簡單例子:

from pyalink.alink import *
env, btenv, senv, stenv = getMLEnv() t = stenv.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b']) source = TableSourceStreamOp(t) source.print() StreamOperator.execute()

這段代碼中, getMLEnv 設定 PyAlink 的執行環境與 PyFlink 一致,同時返回的執行環境; 接着使用 stenv.from_elements 來創建一個簡單的 Table; 然后使用 PyAlink 的 TableSourceStreamOp 將 Table 轉換為 Alink 所接受的 Operator, 進行輸出打印。

這段代碼示例既可以直接在 Notebook 中運行,也可以直接保存成.py 的腳本文件,使用 PyFlink 腳本的運行方式來執行:

  1. python ***.py: 直接使用本地運行環境;
  2. flink run -py ***.py:將腳本提交給遠程集群來運行,參考 Job Submission Examples

與 PyFlink 共用執行環境

在新版本中,PyAlink 新增了 getMLEnv 的接口,直接獲取 PyFlink 的執行環境,見上文的代碼示例。 這個接口返回四元組(benv, btenv, senv, stenv),分別對應 PyFlink 中的四種執行環境: ExecutionEnvironment、BatchTableEnvironment、StreamExecutionEnvironment 和 StreamTableEnvironment。 基於這四個變量,用戶可以調用 PyFlink 的接口。

此外,在之前的版本中,PyAlink 提供了方便使用 Flink 不同執行環境的函數:useLocalEnv 和 useRemoteEnv。 這兩個接口在新版本中將同樣返回四元組 (benv, btenv, senv, stenv)。 用戶可以通過返回的執行環境來調用 PyFlink 的接口。

useLocalEnv/useRemoteEnv 與 getMLEnv 的區別在於:

  • useLocalEnv/useRemoteEnv 顯示指定了執行環境是本地還是遠程集群,可以根據需要調用、切換,
  • getMLEnv則默認情況為本地執行,同時可以根據腳本的運行方式采用對應的執行環境。

這里要注意的是,為了便於調試,PyAlink 允許在調用useLocalEnv/useRemoteEnv后, 再調用getMLEnv,但此時的腳本是不支持flink run來提交作業的。

與 Table 互相轉換

在文首的例子中,我們看到 PyFlink 的 Table 可以轉換為 PyAlink 的 Operator, 然后使用 Alink 的算法組件進行后續操作。

具體來說,PyAlink 提供了 TableSourceBatchOp 和 TableSourceStreamOp 將 PyFlink 中的 Table 分別轉換為 Alink 中的 BatchOperator 和 StreamOperator

同時,對於 PyAlink 中的 Operator,提供了 getOutputTable 來獲取算法組件對應的 Table 。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM