一 前言
目前不支持pyflink-shell.sh的任何模式。
只支持jupyter notebook以及python shell以及jar包提交的方式.
下面是來自官方釘釘群的回復:
二 jupyter notebook 下
1 本地模式
使用方法創建本地運行環境:useLocalEnv(parallism, flinkHome=None, config=None)。
其中,
參數 parallism 表示執行所使用的並行度;
flinkHome 為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink路徑;
config為Flink所接受的配置參數。運行后出現如下所示的輸出,表示初始化運行環境成功:
JVM listening on ***Python listening on ***
2 遠程集群模式
①啟動hadoop集群
②啟動flink集群
$FLINK_HOME/bin/start-cluster.sh
注意flink集群的端口號,默認是master:8081,這個要寫入后面的代碼中去的。
由於不支持pyflink-shell.sh,所以只能打開jupyter notebook來做實驗了
過方法可以連接一個已經啟動的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)
。
其中,參數
host
和port
表示集群的地址;parallelism
表示執行作業的並行度;flinkHome
為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink-1.9.0 路徑;localIp
指定實現Flink DataStream
的打印預覽功能時所需的本機IP地址,需要 Flink 集群能訪問。默認為localhost
。shipAlinkAlgoJar
是否將 PyAlink 提供的 Alink 算法包傳輸給遠程集群,如果遠程集群已經放置了 Alink 算法包,那么這里可以設為 False,減少數據傳輸。
Flink-1.10 及以上版本對應的 pyalink 包,還支持類似 pyflink 腳本的遠程集群運行方式。
完整測試代碼如下(下面的Desktop和8082要改成自己的):
from pyalink.alink import * useRemoteEnv("Desktop", 8082, 2, flinkHome=None, localIp="localhost", config=None) URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/review_rating_train.csv" SCHEMA_STR = "review_id bigint, rating5 bigint, rating3 bigint, review_context string" LABEL_COL = "rating5" TEXT_COL = "review_context" VECTOR_COL = "vec" PRED_COL = "pred" PRED_DETAIL_COL = "predDetail" source = CsvSourceBatchOp() \ .setFilePath(URL)\ .setSchemaStr(SCHEMA_STR)\ .setFieldDelimiter("_alink_")\ .setQuoteChar(None) ## Split data for train and test trainData = SplitBatchOp().setFraction(0.9).linkFrom(source) testData = trainData.getSideOutput(0) pipeline = ( Pipeline() .add( Segment() .setSelectedCol(TEXT_COL) ) .add( StopWordsRemover() .setSelectedCol(TEXT_COL) ).add( DocHashCountVectorizer() .setFeatureType("WORD_COUNT") .setSelectedCol(TEXT_COL) .setOutputCol(VECTOR_COL) ) ) naiveBayes = ( NaiveBayesTextClassifier() .setVectorCol(VECTOR_COL) .setLabelCol(LABEL_COL) .setPredictionCol(PRED_COL) .setPredictionDetailCol(PRED_DETAIL_COL) ) model = pipeline.add(naiveBayes).fit(trainData) predict = model.transform(testData) metrics = ( EvalMultiClassBatchOp() .setLabelCol(LABEL_COL) .setPredictionDetailCol(PRED_DETAIL_COL) .linkFrom(predict) .collectMetrics() ) print("ConfusionMatrix:", metrics.getConfusionMatrix()) print("LabelArray:", metrics.getLabelArray()) print("LogLoss:", metrics.getLogLoss()) print("Accuracy:", metrics.getAccuracy()) print("Kappa:", metrics.getKappa()) print("MacroF1:", metrics.getMacroF1()) print("Label 1 Accuracy:", metrics.getAccuracy("1")) print("Label 1 Kappa:", metrics.getKappa("1")) print("Label 1 Precision:", metrics.getPrecision("1"))
實驗結果如下:
ConfusionMatrix: [[4944, 374, 190, 181, 223], [29, 1207, 128, 137, 82], [1, 2, 317, 22, 10], [0, 0, 0, 62, 0], [1, 0, 1, 1, 187]]
LabelArray: ['5', '4', '3', '2', '1']
LogLoss: 1.3876163466154336
Accuracy: 0.8293616495863687
Kappa: 0.6641967288935378
MacroF1: 0.6239089988842421
Label 1 Accuracy: 0.960735893320163
Label 1 Kappa: 0.5242700620715995
Label 1 Precision: 0.9842105263157894
下面的這個來自web ui
三 在集群上運行Alink算法
- 准備Flink集群
wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0 ./bin/start-cluster.sh
- 准備Alink算法包
git clone https://github.com/alibaba/Alink.git
cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
- 運行Java示例
./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar # ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar # ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples