一、Alink結構簡析
-
Pipeline結構
- 算法、預處理、特征工程等組件可加載進pipeline進行訓練預測,組件也可單獨使用
- pipeline構成如下:
-
數據源
- Alink對各種數據源的操作均為包裝成Operator,批與流采用不同Operator。同時,Pipeline也支持Table數據源的輸入,但其后續處理也是包裝成TableOp,使用外部源Table時要注意設置Environment和Pipeline相同
- Alink可以對以上數據源直接獲取,也可對Flink的DataSet/DataStream包裝為Operator
-
批式/流式算法通用的串聯方式
- Alink的fit和transform過程是同時支持BatchOperator和StreamOperator的,大部分數據處理等組件均支持,但根據實際使用的算法,fit過程對pi與流的支持是不同的。
- 訓練后或保存的model即可預測批數據也可預測流數據
- 邏輯回歸訓練/預測過程示例
linkFrom內部完成各業務處理邏輯,同時該部分可繼承EstimatorBase或TransformerBase形成PipelineStage
二、Alink使用介紹
-
使用概覽
Pipeline pipeline = new Pipeline( new Imputer() .setSelectedCols("review") .setOutputCols("featureText") .setStrategy("value") .setFillValue("null"), new Segment() .setSelectedCol("featureText"), new StopWordsRemover() .setSelectedCol("featureText"), new DocCountVectorizer() .setFeatureType("TF") .setSelectedCol("featureText") .setOutputCol("featureVector"), new LogisticRegression() .setVectorCol("featureVector") .setLabelCol("label") .setPredictionCol("pred") ); //pipeline.add(PipelineStage組件,index) PipelineModel model = pipeline.fit(source); model.save(filepath); PipelineModel model =PipelineModel.load(modelPath); model.transform(dataOperator); //可以model.getLocalPredictor("review string").map(row)形式進行本地預測 Operator.execute();
- 數據獲取/保存
1)hive示例
data = HiveSourceBatchOp() .setInputTableName("tbl") .setPartitions("ds=2022/dt=01,ds=2022/dt=02").setHiveVersion("2.0.1") .setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf") .setDbName("mydb") sink = HiveSinkBatchOp() .setHiveVersion("2.0.1") .setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf").setDbName("mydb") .setOutputTableName("tbl_sink") .setOverwriteSink(True)
2)Kafka
Kafka011SinkStreamOp sink = new Kafka011SinkStreamOp() .setBootstrapServers("localhost:9092") .setDataFormat("json") .setTopic("iris");
3)DataSet
DataSetWrapperBatchOp op = new DataSetWrapperBatchOp(dataSet,filedNames,fieldTypes);
- Alink算法與組件