flink系列-11、PyFlink 核心功能介紹(整理自 Flink 中文社區)


視頻地址

PyFlink 核心功能介紹

  • 文章概述:PyFlink 的核心功能原理介紹及相關 demo 演示。
  • 作者:程鶴群(軍長)(Apache Flink Committer,阿里巴巴技術專家),是 Flink 社區的一名 PMC ,現在在阿里巴巴的實時計算團隊。2015年加入阿里巴巴搜索事業部,從事主搜離線相關開發。2017年開始參與 Flink SQL 相關的開發,2019年開始深入參與 PyFlink 相關的開發。
  • 整理:謝縣東
  • 校對:***

課程概要

今天的分享主要包含以下幾個部分:

  1. PyFlink 的發展史。
  2. 介紹 PyFlink 的核心功能以及其背后的一些原理。
  3. PyFlink 的 demo 演示。
  4. PyFlink 社區扶持計划。

1.PyFlink 的發展史

PyFlink 發展歷程

1.1、v1.8.x

  1. Flink 在1.8版本的時候就已經提供 Python API,只在 Datase/Stream 上提供支持。
  2. 存在一些問題,比如:
    2.1 Table API 不支持 Python。
    2.2 兩套各自獨立實現的一個 Python API。
    2.3 底層實現是 JPython,JPython 無法支持 Python3.x。

1.2、v1.9.x

  1. 2019年8月發布。
  2. 支持 Python Table API。

1.3、v1.10.x

  1. 2020年2月發布。
  2. 提供了 Python UDF 的支持。
  3. 提供 UDF 的依賴管理。

1.4、未來發展

  1. 提供 Pandas UDF 的支持。
  2. 提供用戶自定義的一些 UDF Metrics。
  3. ML API。
  4. 在應用性方面,提供 SQL DDL 支持 Python UDF。
  5. 在后面的一些版本中,我們也希望越來越多的人能夠參與到 PyFlink 的貢獻跟開發中去。

2.PyFlink 核心功能及原理介紹

這里的核心功能主要是從每個版本的划分來跟大家進行介紹,第1個 PyFlink 1.9 版本里面提供 Python Table API 的支持,然后是 PyFlink 1.10 里面提供了 Python UDF 還有相關依賴管理,最后 1.11 版本里面提供了 Pandas UDF 和用戶自定義的 Metrics。

1.Python Table API

什么是 Python Table API 呢?我們可以從編程的角度來介紹一下。Python Table API 大概提供了一些 Python 的 API ,比如這里主要可以看一下 Table 的接口, Table 接口上有很多 Table 相關的算子,這些算子可以分為兩類:

  • 1.跟 sql 相關的算子。比如 select、filter、join、window 等;
  • 2.在 sql 的基礎上擴展的一些算子。比如 drop_columns(..),可以用來提升 sql 的便利性,比如當有一個很大的表並且這時候想去刪除某一列的時候,可以用 drop_columns 來刪除某一列。

Python Table API

對於我們來說,我們可以隨意的組合 Table 上的這些方法,然后去編寫不同的業務邏輯。我們接下來看一下,如何用 Table API 來寫一個 WordCount 的例子,可以讓大家有一個比較完整的認識。

2.WordCount

如下圖所示,是一個完整的 Python Table API 的 WordCount 的例子。主要可以包含4個部分。

  • 首先,我們需要去初始化環境,比如第6行,我們先拿到了一個 ExecutionEnvironment,然后第7行,去創建一個 TableEnvironment。
  • 創建 TableEnvironment 之后,我們需要去定義 source 跟 sink ,這里 source 跟 sink 都是指定了輸入和輸出的文件路徑,還去指定了文件里面 Table 對應的一些字段,以及字段對應的數據類型。而且可以定義輸出的分隔符。
  • 那么我們定義好 source 跟 sink 之后,我們再來看一下,怎么來編寫計算邏輯?我們可以用 from_path 算子來去讀取 source 表,讀取完之后,我們就可以進行 group by 的一些聚合,做 group by 跟 wordcount。
  • 做完之后,我們就可以把結果表用 insert_into 進行輸出。最后我們可以調用 Environment 的 execute 來提交作業。

wordcount

經過上面4步,我們就完整的寫出了一個 Python Table API 的 WordCount。那么對於 WordCount 例子,它的底層實現邏輯是怎么樣的呢?我們接下來看一下,Python Table API 的一個架構。

3.Table API 架構

Table API 架構

  • 通過這個架構圖,可以看到,Python Table API 它是建立在 Java Table API 的基礎上的,我們並沒有單獨的從上到下實現一套 Python Table API。
  • 他是很特別的,而是在 Java Table API 的基礎上加了一層薄薄的 API,這兩層 API 是可以相互調用的。
  • 在 client 端的時候,會起一個 Python VM 然后也會起一個 Java VM ,兩個 VM 進行通信。通信的細節可以看下面這張圖。

VM 通信

  • 我們可以看到 Python 跟 Java VM 里面都會用 Py4J 各自起一個 Gateway。然后 Gateway 會維護一些對象。
  • 比如我們在 Python 這邊創建一個 table 對象的時候,它也會在相應的 Java 這邊創建一個相同 table 對象。如果創建一個 TableEnvironment 對象的時候,在 Java 這邊也會創建一個 TableEnvironment 對象。
  • 如果你調用 table 對象上的方法,那么也會映射到 Java 這邊,所以是一個一一映射的關系。
  • 基於這一套架構,我們可以得出一個結論:如果你用 Python Table API 寫出了一個作業,這個作業沒有 Python UDF 的時候,那么這個作業的性能跟你用 Java 寫出來的作業性能是一樣的。因為它底層的架構都是同一套 Java 的架構。

剛剛我們介紹了 PyFlink 1.9 版本里面的 Python Table API ,我們剛剛提到了 table 的接口上面提供了很多不同的算子,我們可以用這些算子去組合,實現不同的業務邏輯。但是對於這些算子來說,它的功能還是不能夠滿足一些特定的情況,比如某些業務需要去編寫一些自定義的邏輯,那么這個時候就需要去強依賴我們的 Python UDF,所以在 PyFlink 1.10 版本里面,我們提供了 Python UDF 並且提供了相應的依賴管理。

1.Python UDF 架構

  • 如果你的作業是包含一個 Python UDF 的作業,那么從提交的時候,就是左邊的架構圖,然后 deploy 到 Remote 端的時候,我們可以看到 Remote 端的架構圖可以分為兩個部分。左邊這個是 Java 的 Operator,右邊這個是 Python 的 Operator。
  • 大體的流程我們可以大概看一下:
    1. 在 open 方法里進行 Java Operator 和 Python Operator 環境的初始化。
    2. 環境初始化好之后,會進行數據處理。當 Java Operator 收到數據之后,先把數據放到一個input buffer 緩沖區中,達到一定的閾值后,才會 flash 到 Python 這邊。Python 這邊在處理完之后,也會先將數據放到一個結果的緩沖區中,當達到一定閾值,比如達到一定的記錄的行數,或者是達到一定的時間的位置,才會把結果 flash 到這邊。
    3. state 訪問的鏈路。
    4. logging 訪問的鏈路。
    5. metrics 匯報的鏈路。

Python UDF 架構

2.Python UDF 的使用

1.Pyflink-1.9 版本中,Python API 中支持注冊使用 Java UDF,使用方法如下:
你可以調 TableEnvironment 上的 register_java_function 這個方法,有兩個參數,一個參數是你給 UDF 起的名字,第2個是 Java 類的路徑。

table_env.register_java_function("func1", "java.user.defined.function.class.name")

下面是一個例子:
java UDF

2.Python UDF 的使用:
你可以調 TableEnvironment 上的 register_function 這個方法,有兩個參數,一個參數是你給 UDF 起的名字,第2個是 python_udf 的一個對象。

table_env.register_function("func1", python_udf)

下面是一個例子:
python UDF

3.Python UDF 的定義方式

PyFlink 里面也支持一些其他的方式去定義 UDF,我們可以看一下,總共有4種方式:

  1. 可以繼承 ScalaFunction 基類,然后去重寫 eval 方法。
  2. 可以直接去定義一個 Named Function,然后再用 UDF 的簽名去聲明 UDF 的輸入類型和輸出類型。
  3. 也可以用剛剛例子里面的 Lambda Function 的這種方式,來定義你的 Python UDF。
  4. 最后一種是 Callable Function的方式。也是聲明它的輸入和輸出的類型。

Python UDF 定義方式

4.依賴管理

我們寫完 UDF 的時候,經常遇到一個問題,UDF 里面可能會有一些依賴,那么我們怎么去解決這些依賴問題呢?PyFlink 提供了4種依賴的 API,我們可以一起看一下。

  1. 依賴文件
    如果你的 UDF 里面依賴一個文件的話,那么你可以用 add_python_file 加載依賴的文件的路徑,指定完之后,作業提交的時候,就會把這個文件分發到集群,那么在遠程執行的時候,你的 UDF 就可以去訪問這個文件。
table_env.add_python_file(file_path)
  1. 依賴存檔(打包)文件
    可能會去依賴一個存檔的文件,這個時候你可以用 add_python_archive 方法,傳入兩個參數。第2個參數是一個可選的參數。第1個參數表示對你存檔文件的重命名。如果你調用了 API,那么在 UDF 里面就可以去訪問存檔文件里面的所有文件。
table_env.add_python_archive("py_env.zip", "myenv") 
# the files contained in the archive file can be accessed in UDF 
def my_udf(): 
    with open("myenv/py_env/data/data.txt") as f: 
  1. 依賴第三方項目
    這個時候我們可以用 set_python_requirements 方法去指定你的第三方依賴。他也是有兩個參數,第1個參數是傳一個文件,文件里面寫了所依賴的第三方項目,以及它對應的版本。第2個參數是一個可選的參數,如果你的集群是一個有網絡的環境,那么第2個參數可以不填,當第2個參數不填的時候,作業提交開始初始化的時候, Python 就會去根據你的 requirements 文件里面配置的依賴,自動的去網絡下載你的依賴,然后安裝。如果你的集群是沒有網絡的,你可以預先把這些依賴下載好,下載到 cached 的目錄里面去。然后你把目錄也一起提交到機群,集群拿到這個目錄會去安裝你的這些依賴。
# commands executed in shell 
echo numpy==1.16.5 > requirements.txt 
pip download -d cached_dir -r requirements.txt --no-binary :all: 
# python code 
table_env.set_python_requirements("requirements.txt", "cached_dir")
  1. 指定Python Interpreter路徑
    假設你的 Python UDF 運行的時候,會依賴某一個版本的 Python 解釋器。那么這個時候可以去指定你所希望 Python UDF 運行的一個解釋器的路徑。
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

我們在 Pyflink 1.11 的版本里面提供了 Pandas UDF,還有用戶自定義的 Metrics。當然 Pyflink 1.11 版本里面,不光是這兩個功能,我這里主要是介紹一下這兩個功能。Pyflink 1.11版本也會即將在2020年的5月份進行發布。
接下來會從功能跟性能的兩個角度來介紹一下 Pandas UDF。

1.Pandas UDF – 功能

我們先來看一下功能方面,如果你要編寫一個 Pandas UDF,那么跟剛才定義普通 UDF 的形式基本上是一致的。你這里只需要去聲明一個 udf_type,指定他是 Pandas 就行了。當你指定之后, UDF 運行起來的時候系統傳給他的 i 跟 j 就變成一個 pandas.Series 的數據結構。這個時候你就可以直接用 series 來進行操作。與此同時這個會有一個好處,就是我們拿到的是一個 pandas 的數據結構,我們就可以去調用 pandas 相關的一些庫函數,並且還可以去調用一些數字計算相關的庫函數,這樣可以極大的擴展我們的功能。不需要去自己去實現一套邏輯。

Pandas UDF 功能

2.Pandas UDF - 性能

那么性能上 Pandas UDF 的好處,主要有兩點。

  1. 減少了調用的開銷,因為剛剛說到了 Pandas UDF 傳給你的 UDF 是一個 pandas.series,它相當於是將多行的數據統一一次性的傳給了你的 UDF。相比較普通的 UDF 比如多行,他每一行都會去調用你 UDF。Pandas UDF 就是多行只需要調用一次,所以這可以減少 Pandas UDF 調用的開銷。
  2. 可以減少 UDF 的序列化跟反序列化開銷。這里具體介紹一下,為什么減少了序列化跟反序列化?
    我們可以看一下右邊這個圖,左邊是 Java Operator,右邊是 Python Operator。假設我們 Operator 收到了一個 X 然后 X 在這里會進行一個序列化,變成 arrow 的內存數據格式,這個時候用 X’ 來表示。那么這個時候 Java 這邊會把 X’ 傳給 Python 這邊,Python 這邊就可以直接來訪問 arrow 數據結構,因為 pandas 底層的數據結構就是用 arrow 來表示的,所以這個時候不需要再 Python 這邊進行反序列化,可以直接來操作 X’。然后我們在 X’ 加一之后,得到 Y’, Y’ 也是直接生成的 arrow 內存數據格式,這里也不需要反序列化。那么我們把 Y’ 傳到 Java 這邊的時候,這個時候需要進行一個反序列化。
    我們可以發現這個時候,只需要在 Java 這邊進行一個序列化和反序列化。Python 這邊可以省去了序列化和反序列化開銷。
    而且這里需要提出的一點是,如果你的輸入 X 也是一個 arrow 的內存數據格式,那么 Java 這邊的序列化跟反序列化也是可以避免的。比如你的 source 是一個 pocket,那么他底層是一個 arrow 數據格式,這個時候就可以避免掉 Java 這邊的序列化和反序列化。這個就是 Pandas UDF 的一個性能提升。

Pandas UDF 性能

3.User-defined Metrics

我們再來看一下用戶自定義 Metrics,

  1. Metric 注冊
    先來看一下 Metric 的注冊,Metric 注冊可以是在 metric_group 上調用對應的 Metric 方法來注冊。
    Metric 注冊

  2. Metric Scope
    metric_group 還可以調用他的 add_group 方法去定義你的 Metric 的一個域,你可以對你的 Metric 進行分類。
    Metric Scope

  3. Metric 類型
    目前 PyFlink 里面提供的 Metric 類型有以下這么4種:

    • Counter
      類似一個累加器。一開始需要在 open 方法里面進行 Counter 的注冊,然后調用 match_group 上 counter 方法,這里我們給了一個 Metric 的名字叫 my_counter。定義完之后,就可以在 eval 方法里面進行使用。然后 counter 可以提供 inc 方法,你可以調用 inc 進行相應的增加。
      Counter

    • Gauge
      它是用來反映一個瞬時值。我們可以看一下,假設我們需要在 Metric 上顯示 length 值的變化情況。那么我們需要用 gauge 方法來注冊,名字是 my_gauge。第2個參數這里需要注意它是一個 UDF ,我們需要返回要監控數值的值是什么,返回這個值。然后在 eval 方法里或者其他 UDF 的調用里可以改變這個值。框架底層就會不斷去匯報這個值當前值是多少。
      Gauge

    • Meter
      Meter 這種 Metric 是表示當前這一秒往前一個時間區間內所有數值相加的一個均值。我們看可以調用 meter 方法來注冊。第2個參數是一個默認的參數,默認是60秒,表示60秒內所有值的一個均值。這里需要注意的是,Meter 每一秒都會去匯報當前這一秒往前60秒時間區間內,所有值的均值。可以用 Meter 的 mark_event 方法來匯報
      Meter

    • Distribution (sum/count/min/max/mean)
      最后一種是 Distribution 的一個 Metric 類型,它對你的值能提供一些 sum/count/min/max/mean 等統計的信息。你可以調用 metric_group 上的distribution 這個方法。值得更新,可以調用 distribution.update 方法。
      Distribution

3. PyFlink 的 demo 演示

接下來對這些核心功能做一些 demo 的演示跟講解。這里我們提供了一個 playgrounds 的 git。 git 的目的是讓我們的用戶能夠盡快的去熟悉 PyFlink 所有的功能。具體參考:https://github.com/pyflink/playgrounds

4.PyFlink 社區扶持計划

  • 為什么要發起 PyFlink 社區扶持計划?
    用戶逐漸變多、有經驗用戶少
  • 社區目標:並肩作戰,營造雙贏
  • 如何參與 PyFlink 計划?
    https://survey.aliyun.com/apps/zhiliao/B5JOoruzY
    初步審核符合條件后我們會在收到問卷的 10 個工作日內與您聯系。
  • 扶持目標
    面向所有 PyFlink 社區公司客戶
  • PyFlink 問題支持&共享
    如果你有一些相關的問題或者是其他的一些意見,可以發到社區的郵件列表里面去。
    問題支持


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM