Alink工作原理


一、Alink結構簡析

  1. Pipeline結構

    • 算法、預處理、特征工程等組件可加載進pipeline進行訓練預測,組件也可單獨使用
    • pipeline構成如下:
  2. 數據源

     

     

     

    • Alink對各種數據源的操作均為包裝成Operator,批與流采用不同Operator。同時,Pipeline也支持Table數據源的輸入,但其后續處理也是包裝成TableOp,使用外部源Table時要注意設置Environment和Pipeline相同
    • Alink可以對以上數據源直接獲取,也可對Flink的DataSet/DataStream包裝為Operator
  3. 批式/流式算法通用的串聯方式

    • Alink的fit和transform過程是同時支持BatchOperator和StreamOperator的,大部分數據處理等組件均支持,但根據實際使用的算法,fit過程對pi與流的支持是不同的。
    • 訓練后或保存的model即可預測批數據也可預測流數據
  4. 邏輯回歸訓練/預測過程示例

      linkFrom內部完成各業務處理邏輯,同時該部分可繼承EstimatorBase或TransformerBase形成PipelineStage

二、Alink使用介紹

  1.  使用概覽

    Pipeline pipeline = new Pipeline(
    		new Imputer()
    			.setSelectedCols("review")
    			.setOutputCols("featureText")
    			.setStrategy("value")
    			.setFillValue("null"),
    		new Segment()
    			.setSelectedCol("featureText"),
    		new StopWordsRemover()
    			.setSelectedCol("featureText"),
    		new DocCountVectorizer()
    			.setFeatureType("TF")
    			.setSelectedCol("featureText")
    			.setOutputCol("featureVector"),
    		new LogisticRegression()
    			.setVectorCol("featureVector")
    			.setLabelCol("label")
    			.setPredictionCol("pred")
    	);
    
    //pipeline.add(PipelineStage組件,index)
    
    PipelineModel model = pipeline.fit(source);
    model.save(filepath);
    
    PipelineModel model =PipelineModel.load(modelPath);
    model.transform(dataOperator);
    //可以model.getLocalPredictor("review string").map(row)形式進行本地預測
    
    Operator.execute();
    
  2. 數據獲取/保存
      1)hive示例
    data = HiveSourceBatchOp()
        .setInputTableName("tbl")
        .setPartitions("ds=2022/dt=01,ds=2022/dt=02").setHiveVersion("2.0.1") 
        .setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf")
        .setDbName("mydb")	
    	
    sink = HiveSinkBatchOp()
        .setHiveVersion("2.0.1")
        .setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf").setDbName("mydb")
        .setOutputTableName("tbl_sink")
        .setOverwriteSink(True)            
    

      2)Kafka

    Kafka011SinkStreamOp sink = new Kafka011SinkStreamOp()
    			.setBootstrapServers("localhost:9092")
    			.setDataFormat("json")
    			.setTopic("iris");
    

      3)DataSet

    DataSetWrapperBatchOp op = new DataSetWrapperBatchOp(dataSet,filedNames,fieldTypes);
    
  3. Alink算法與組件

    

 

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM