來源:https://developer.aliyun.com/article/769981
http://bubuko.com/infodetail-3554826.html
1 開發環境依賴
PyFlink作業的開發和運行需要依賴Python 3.5/3.6/3.7 版本和Java 8或者Java 11,本游樂場所使用的環境是Java 1.8.0_211, Python 3.7.7 還有一些其他基礎軟件如下;
- Java 1.8.0_211
- Python 3.7.7
- PIP 20.0.2
- PyCharm Runtime version: 11.0.7
- MocOS 10.14.6
2 PyCharm 配置 Python interpreter
應用PyCharm進行開發首先要配置一下項目所使用的Python環境,配置路徑PyCharm -> Preferences -> Project Interpreter
如下:
點擊 Add
配置新的環境,如下:
一路”OK“,完成配置。
安裝PyFlink
我們先利用PyCharm創建一些項目,名為PyFlinkPlayground
, 並為項目選擇我們剛才創建的Virtualenv環境,如下:
創建之后,我們會看到External Libraries
里面使用了PlaygroundEnv
, 但是初始化並沒有PyFlink,所以我們需要進行顯示的安裝,如下:
我們可以手工安裝PyFlink,直接在PyCharm的Terminal
下進行安裝,這時候我們自動就是啟動的PlaygroundEnv
環境,在安裝的過程中你也可以看到site-packages
內容會不斷增加,
(PlaygroundEnv) jincheng:~ jincheng.sunjc$ python --version Python 3.7.7 (PlaygroundEnv) jincheng:~ jincheng.sunjc$ python -m pip install apache-flink==1.11.1 Collecting apache-flink==1.11.1 Using cached apache_flink-1.11.1-cp37-cp37m-macosx_10_9_x86_64.whl (206.7 MB) ... ... Successfully installed apache-beam-2.19.0 apache-flink-1.11.1 avro-python3-1.9.1 certifi-2020.6.20 chardet-3.0.4 cloudpickle-1.2.2 crcmod-1.7 dill-0.3.1.1 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.30.0 hdfs-2.5.8 httplib2-0.12.0 idna-2.10 jsonpickle-1.2 mock-2.0.0 numpy-1.19.1 oauth2client-3.0.0 pandas-0.25.3 pbr-5.4.5 protobuf-3.12.4 py4j-0.10.8.1 pyarrow-0.15.1 pyasn1-0.4.8 pyasn1-modules-0.2.8 pydot-1.4.1 pymongo-3.11.0 pyparsing-2.4.7 python-dateutil-2.8.0 pytz-2020.1 requests-2.24.0 rsa-4.6 six-1.15.0 typing-3.7.4.3 typing-extensions-3.7.4.2 urllib3-1.25.10 (PlaygroundEnv) jincheng:~ jincheng.sunjc$
最終完成之后你可以在 site-packages
下面找的 pyflink
目錄,如下:
有了這些信息我們就可以進行PyFlink的作業開發了。
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, StreamTableEnvironment def hello_world(): """ 從隨機Source讀取數據,然后直接利用PrintSink輸出。 """ settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings) source_ddl = """ CREATE TABLE random_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ) """ sink_ddl = """ CREATE TABLE print_sink ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'print' ) """ # 注冊source和sink t_env.execute_sql(source_ddl); t_env.execute_sql(sink_ddl); # 數據提取 tab = t_env.from_path("random_source"); # 這里我們暫時先使用 標注了 deprecated 的API, 因為新的異步提交測試有待改進... tab.insert_into("print_sink"); # 執行作業 t_env.execute("Flink Hello World"); if __name__ == '__main__': hello_world()
上面代碼在PyCharm里面右鍵運行就應該打印如下結果了:
開發日志
正常來講我們可能開發一些UDF,可能打印一些日志或者特殊情況還可能進行Python代碼的調試,怎么解?
- 首先,我們定義一個UDF,在UDF里面添加調試日志,如下:
# 定義UDF @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def pass_by(str): logging.error("Some debugging infomation...") return str
- 然后在SQL里面使用這個UDF,如下:
# 注冊 UDF t_env.register_function('pass_by', pass_by) # 使用UDF tab.select("f_sequence, f_random, pass_by(f_random_str) ")
- 完整的代碼
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes from pyflink.table.udf import udf import logging # 定義UDF @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def pass_by(str): logging.error("Some debugging infomation...") return "pass_by_" + str def hello_world(): """ 從隨機Source讀取數據,然后直接利用PrintSink輸出。 """ settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings) t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True) source_ddl = """ CREATE TABLE random_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ) """ sink_ddl = """ CREATE TABLE print_sink ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'print' ) """ # 注冊source和sink t_env.execute_sql(source_ddl); t_env.execute_sql(sink_ddl); # 注冊 UDF t_env.create_temporary_system_function('pass_by', pass_by) # 數據提取 tab = t_env.from_path("random_source"); # 這里我們暫時先使用 標注了 deprecated 的API, 因為新的異步提交測試有待改進... tab.select("f_sequence, f_random, pass_by(f_random_str) ").execute_insert("print_sink")
if __name__ == '__main__': hello_world()
那么運行之后,日志在哪里呢?就是在項目的 PlaygroundEnv -> site-packages -> pyflink -> log
目錄 ,如下:
到這里,簡單的 開發環境就OK了,大家可以改改代碼,直觀體驗一下。。。