ALINK(三):PYALINK 以及ALINK 任務運行(本地模式與集群模式)


一 前言

目前不支持pyflink-shell.sh的任何模式。

只支持jupyter notebook以及python shell以及jar包提交的方式.

下面是來自官方釘釘群的回復:

 

 

 二 jupyter notebook 下

1 本地模式

  使用方法創建本地運行環境:useLocalEnv(parallism, flinkHome=None, config=None)

  其中,

  參數 parallism 表示執行所使用的並行度;

  flinkHome 為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink路徑;

       config為Flink所接受的配置參數。運行后出現如下所示的輸出,表示初始化運行環境成功:

 

       JVM listening on ***Python listening on ***

2 遠程集群模式

①啟動hadoop集群

②啟動flink集群

$FLINK_HOME/bin/start-cluster.sh

注意flink集群的端口號,默認是master:8081,這個要寫入后面的代碼中去的。

由於不支持pyflink-shell.sh,所以只能打開jupyter notebook來做實驗了

過方法可以連接一個已經啟動的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)

其中,參數

  • host 和 port 表示集群的地址;
  • parallelism 表示執行作業的並行度;
  • flinkHome 為 flink 的完整路徑,默認使用 PyAlink 自帶的 flink-1.9.0 路徑;
  • localIp 指定實現 Flink DataStream 的打印預覽功能時所需的本機IP地址,需要 Flink 集群能訪問。默認為localhost
  • shipAlinkAlgoJar 是否將 PyAlink 提供的 Alink 算法包傳輸給遠程集群,如果遠程集群已經放置了 Alink 算法包,那么這里可以設為 False,減少數據傳輸。

Flink-1.10 及以上版本對應的 pyalink 包,還支持類似 pyflink 腳本的遠程集群運行方式。

 

完整測試代碼如下(下面的Desktop和8082要改成自己的):

from pyalink.alink import *
useRemoteEnv("Desktop", 8082, 2, flinkHome=None, localIp="localhost", config=None)
 
URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/review_rating_train.csv"
SCHEMA_STR = "review_id bigint, rating5 bigint, rating3 bigint, review_context string"
LABEL_COL = "rating5"
TEXT_COL = "review_context"
VECTOR_COL = "vec"
PRED_COL = "pred"
PRED_DETAIL_COL = "predDetail"
source = CsvSourceBatchOp() \
    .setFilePath(URL)\
    .setSchemaStr(SCHEMA_STR)\
    .setFieldDelimiter("_alink_")\
    .setQuoteChar(None)
 
## Split data for train and test
trainData = SplitBatchOp().setFraction(0.9).linkFrom(source)
testData = trainData.getSideOutput(0)
 
pipeline = (
    Pipeline()
    .add(
        Segment()
        .setSelectedCol(TEXT_COL)
    )
    .add(
        StopWordsRemover()
        .setSelectedCol(TEXT_COL)
    ).add(
        DocHashCountVectorizer()
        .setFeatureType("WORD_COUNT")
        .setSelectedCol(TEXT_COL)
        .setOutputCol(VECTOR_COL)
    )
)
 
 
 
naiveBayes = (
    NaiveBayesTextClassifier()
    .setVectorCol(VECTOR_COL)
    .setLabelCol(LABEL_COL)
    .setPredictionCol(PRED_COL)
    .setPredictionDetailCol(PRED_DETAIL_COL)
)
model = pipeline.add(naiveBayes).fit(trainData)
 
 
predict = model.transform(testData)
metrics = (
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL)
    .setPredictionDetailCol(PRED_DETAIL_COL)
    .linkFrom(predict)
    .collectMetrics()
)
 
print("ConfusionMatrix:", metrics.getConfusionMatrix())
print("LabelArray:", metrics.getLabelArray())
print("LogLoss:", metrics.getLogLoss())
print("Accuracy:", metrics.getAccuracy())
print("Kappa:", metrics.getKappa())
print("MacroF1:", metrics.getMacroF1())
print("Label 1 Accuracy:", metrics.getAccuracy("1"))
print("Label 1 Kappa:", metrics.getKappa("1"))
print("Label 1 Precision:", metrics.getPrecision("1"))

實驗結果如下:

ConfusionMatrix: [[4944, 374, 190, 181, 223], [29, 1207, 128, 137, 82], [1, 2, 317, 22, 10], [0, 0, 0, 62, 0], [1, 0, 1, 1, 187]]
LabelArray: ['5', '4', '3', '2', '1']
LogLoss: 1.3876163466154336
Accuracy: 0.8293616495863687
Kappa: 0.6641967288935378
MacroF1: 0.6239089988842421
Label 1 Accuracy: 0.960735893320163
Label 1 Kappa: 0.5242700620715995
Label 1 Precision: 0.9842105263157894
下面的這個來自web ui

 

 

 

 三 在集群上運行Alink算法

  1. 准備Flink集群
  wget https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
 tar -xf flink-1.10.0-bin-scala_2.11.tgz && cd flink-1.10.0  ./bin/start-cluster.sh

 

  1. 准備Alink算法包
  git clone https://github.com/alibaba/Alink.git
 cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
  1. 運行Java示例
  ./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar  # ./bin/flink run -p 2 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.1-SNAPSHOT.jar  # ./bin/flink run -p 2 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM