pyflink從入門到入土


一 安裝環境與安裝

您需要一台具有以下功能的計算機:

  • Java 8 or 11
  • Python 3.6, 3.7 or 3.8

使用Python Table API需要安裝PyFlink,它已經被發布到 PyPi,您可以通過如下方式安裝PyFlink:

$ python -m pip install apache-flink

安裝PyFlink后,您便可以編寫Python Table API作業了。

二 編寫一個Flink Python Table API程序 

編寫Flink Python Table API程序的第一步是創建TableEnvironment。這是Python Table API作業的入口類。

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

接下來,我們將介紹如何創建源表和結果表。

復制代碼
 
         
t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv')) \
.with_format(OldCsv()
.field('word', DataTypes.STRING())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())) \
.create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\ouput.csv')) \
.with_format(OldCsv()
.field_delimiter('\t')
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('word', DataTypes.STRING())
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
 
復制代碼

 

上面的程序展示了如何創建及在ExecutionEnvironment中注冊表名分別為mySourcemySink的表。 其中,源表mySource有一列: word,該表代表了從輸入文件input.csv中讀取的單詞; 結果表mySink有兩列: word和count,該表會將計算結果輸出到文件output.csv中,字段之間使用\t作為分隔符。

接下來,我們介紹如何創建一個作業:該作業讀取表mySource中的數據,進行一些變換,然后將結果寫入表mySink

最后,需要做的就是啟動Flink Python Table API作業。上面所有的操作,比如創建源表 進行變換以及寫入結果表的操作都只是構建作業邏輯圖,只有當execute_insert(sink_name)被調用的時候, 作業才會被真正提交到集群或者本地進行執行。

from pyflink.table.expressions import lit
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
.select(tab.word, lit(1).count) \
.execute_insert('mySink').wait()

該教程的完整代碼如下:

復制代碼
from pyflink.dataset import ExecutionEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment
)


exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:\\Users\\DELL\\Desktop\\PYFLINK\\ouput.csv')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
復制代碼
復制代碼
from pyflink.table import EnvironmentSettings, TableEnvironment,BatchTableEnvironment

environment_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
t_env = BatchTableEnvironment.create(environment_settings=environment_settings)
t_env.get_config().get_configuration().set_string('parallelism.default', '1')

t_env.execute_sql("""
         CREATE TABLE mySource (
           word STRING
         ) WITH (
           'connector' = 'filesystem',
           'format' = 'csv',
           'path' = 'C:\\Users\\DELL\\Desktop\\PYFLINK\\input.csv'
         )
     """

)



t_env.execute_sql("""
         CREATE TABLE mySink (
           word STRING,
           `count` BIGINT
         ) WITH (
           'connector' = 'filesystem',
           'format' = 'csv',
           'path' = 'C:\\Users\\DELL\\Desktop\\PYFLINK\\word_count_output.csv'
         )
     """)
#
#t_env.from_path('mySource') \
#    .group_by('word') \
#    .select('word, count(1)') \
#    .execute_insert('mySink').wait()
#

from pyflink.table.expressions import lit
tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
復制代碼

 

三 執行一個Flink Python Table API程序

首先,你需要在文件 “input.csv” 中准備好輸入數據。你可以選擇通過如下命令准備輸入數據:

input.csv

flink
pyflink
flink

接下來,可以在命令行中運行作業(假設作業名為WordCount.py)(注意:如果輸出結果文件“output.csv”已經存在,你需要先刪除文件,否則程序將無法正確運行起來):

$ python WordCount.py

上述命令會構建Python Table API程序,並在本地mini cluster中運行。如果想將作業提交到遠端集群執行, 可以參考作業提交示例

最后,你可以通過如下命令查看你的運行結果:

ouput.csv

flink    2
pyflink    1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM