[源碼分析] 從FlatMap用法到Flink的內部實現


[源碼分析] 從FlatMap用法到Flink的內部實現

0x00 摘要

本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。

0x01 Map vs FlatMap

首先我們先從概念入手。

自從響應式編程慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,后台也能見到;安卓里面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。

map

它把數組流中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的數組流。然后返回這個新數據流。

flatMap

flat是扁平的意思。所以這個操作是:先映射(map),再拍扁(join)。

flatMap輸入可能是多個子數組流。所以flatMap先針對 每個子數組流的每個元素進行映射操作。然后進行扁平化處理,最后匯集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。

flatMap與map另外一個不一樣的地方就是傳入的函數在處理完后返回值必須是List。

實例

比如拿到一個文本文件之后,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那么應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。

如果還不清楚,可以看看下面這個例子:

梁山新進一批好馬,准備給每個馬軍頭領配置一批。於是得到函數以及頭領名單如下:

函數 = ( 頭領 => 頭領 + 好馬 )
五虎將 = List(關勝、林沖、秦明、呼延灼、董平 )
八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 )

// Map函數的例子
利用map函數,我們可以得到 五虎將馬軍

五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 )
結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 )

// flatMap函數的例子
但是為了得到統一的馬軍,則可以用flatMap:

馬軍頭領 = List(五虎將,八驃騎)
馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 

結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )

現在大家應該清楚了吧。接下來看看幾個FlatMap的實例。

Scala語言的實現

Scala本身對於List類型就有map和flatMap操作。舉例如下:

val names = List("Alice","James","Apple")
val strings = names.map(x => x.toUpperCase)
println(strings)
// 輸出 List(ALICE, JAMES, APPLE)

val chars = names.flatMap(x=> x.toUpperCase())
println(chars)
// 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)

Flink的例子

以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。

網上常見的一個Flink應用的例子:

//加載數據源
val source = env.fromElements("china is the best country","beijing is the capital of china")

//轉化處理數據
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)

Flink源碼中的例子

case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

windowCounts.print()

上面提到的都是簡單的使用,如果有復雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義算子。

函數類FlatMapFunction

對於不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下:

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
	void flatMap(T value, Collector<O> out) throws Exception;
}

如何自定義算子呢,這個可以直接看看Flink中的官方例子

// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
  @Override
  public void flatMap(String value, Collector<String> out) {
    for (String token : value.split("\\W")) {
      out.collect(token);
    }
  }
}

// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());

Rich函數類RichFlatMapFunction

對於涉及到狀態的情況,用戶可以使用繼承 RichFlatMapFunction 類的方式來實現UDF。

RichFlatMapFunction屬於Flink的Rich函數類。從名稱上來看,這種函數類在普通的函數類上增加了Rich前綴,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函數類,Rich函數類增加了:

  • open()方法:Flink在算子調用前會執行這個方法,可以用來進行一些初始化工作。
  • close()方法:Flink在算子最后一次調用結束后執行這個方法,可以用來釋放一些資源。
  • getRuntimeContext方法:獲取運行時上下文。每個並行的算子子任務都有一個運行時上下文,上下文記錄了這個算子運行過程中的一些信息,包括算子當前的並行度、算子子任務序號、廣播數據、累加器、監控數據。最重要的是,我們可以從上下文里獲取狀態數據

FlatMap對應的RichFlatMapFunction如下:

@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
	@Override
	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
}

其基類 AbstractRichFunction 如下,可以看到主要是和運行時上下文建立了聯系,並且有初始化和退出操作

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
  
	private transient RuntimeContext runtimeContext;

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		this.runtimeContext = t;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
			return this.runtimeContext;
	}

	@Override
	public IterationRuntimeContext getIterationRuntimeContext() {
    if (this.runtimeContext instanceof IterationRuntimeContext) {
			return (IterationRuntimeContext) this.runtimeContext;
		} 
	}

	@Override
	public void open(Configuration parameters) throws Exception {}

	@Override
	public void close() throws Exception {}
}

如何最好的使用? 當然還是官方文檔和例子最靠譜。

因為涉及到狀態,所以如果使用,你必須創建一個 StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,並且它們必須具有唯一的名稱以便可以引用它們),狀態所持有值的類型,並且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptorListStateDescriptorReducingStateDescriptorFoldingStateDescriptorMapStateDescriptor

狀態通過 RuntimeContext 進行訪問,因此只能在 rich functions 中使用。 但是我們也會看到一個例子。RichFunctionRuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

下面是一個 FlatMapFunction 的例子,展示了如何將這些部分組合起來:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

這個例子實現了一個簡單的計數窗口。 我們把元組的第一個元素當作 key(在示例中都 key 都是 “1”)。 該函數將出現的次數以及總和存儲在 “ValueState” 中。 一旦出現次數達到 2,則將平均值發送到下游,並清除狀態重新開始。 請注意,我們會為每個不同的 key(元組中第一個元素)保存一個單獨的值。

0x03 從Flink源碼入手看FlatMap實現

FlatMap從Flink編程模型角度講屬於一個算子,用來對數據流或者數據集進行轉換。從框架角度說,FlatMap是怎么實現的呢? 或者說FlatMap是怎么從用戶代碼轉換到Flink運行時呢 ?

1. DataSet

首先說說 DataSet相關這套系統中FlatMap的實現。

請注意,DataSteam對應的那套系統中,operator名字都是帶着Stream的,比如StreamOperator。

DataSet

val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) 這段代碼調用的就是DataSet中的API。具體如下:

public abstract class DataSet<T> {
  
	public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
    
		String callLocation = Utils.getCallLocationName();
    
		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
		return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
	}
}

FlatMapOperator

可以看出,flatMap @ DataSet 主要就是生成了一個FlatMapOperator,這個可以理解為是邏輯算子。其定義如下:

public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, FlatMapOperator<IN, OUT>> {

	protected final FlatMapFunction<IN, OUT> function;
	protected final String defaultName;

	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function, String defaultName) {
		super(input, resultType);
		this.function = function;
		this.defaultName = defaultName;
	}

	@Override
	protected FlatMapFunction<IN, OUT> getFunction() {
		return function;
	}

  // 這個translateToDataFlow就是生成計划(Plan)的關鍵代碼
	@Override
	protected FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
		String name = getName() != null ? getName() : "FlatMap at " + defaultName;
		// create operator
		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function,
			new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
		// set input
		po.setInput(input);
		// set parallelism
		if (this.getParallelism() > 0) {
			// use specified parallelism
			po.setParallelism(this.getParallelism());
		} else {
			// if no parallelism has been specified, use parallelism of input operator to enable chaining
			po.setParallelism(input.getParallelism());
		}
		return po;
	}
}

FlatMapOperator的基類如下:

public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>> extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {

}

// Base class for operations that operates on a single input data set.
public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {
  	private final DataSet<IN> input;
}

生成計划

DataSet API所編寫的批處理程序跟DataStream API所編寫的流處理程序在生成作業圖(JobGraph)之前的實現差別很大。流處理程序是生成流圖(StreamGraph),而批處理程序是生成計划(Plan)並由優化器對其進行優化並生成優化后的計划(OptimizedPlan)。

計划(Plan)以數據流(dataflow)的形式來表示批處理程序,但它只是批處理程序最初的表示,在一個批處理程序生成作業圖之前,計划還會被進行優化以產生更高效的方案。Plan不同於流圖(StreamGraph),它以sink為入口,因為一個批處理程序可能存在若干個sink,所以Plan采用集合來存儲它。另外Plan還封裝了批處理作業的一些基本屬性:jobId、jobName以及defaultParallelism等。

生成Plan的核心部件是算子翻譯器(OperatorTranslation),createProgramPlan方法通過它來”翻譯“出計划,核心代碼如下

public class OperatorTranslation {
  
   // 接收每個需遍歷的DataSink對象,然后將其轉換成GenericDataSinkBase對象
   public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
       List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
       //遍歷sinks集合
       for (DataSink<?> sink : sinks) {
            //將翻譯生成的GenericDataSinkBase加入planSinks集合*,對每個sink進行”翻譯“
            planSinks.add(translate(sink));
        }
       //以planSins集合構建Plan對象
       Plan p = new Plan(planSinks);
       p.setJobName(jobName);
       return p;
    }

	private <I, O> org.apache.flink.api.common.operators.Operator<O>    translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
    //會調用到 FlatMapOperator 的 translateToDataFlow
	org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);    
  }
  
}

FlatMapOperatorBase就是生成的plan中的一員。

public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
	@Override
	protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
		FlatMapFunction<IN, OUT> function = userFunction.getUserCodeObject();
		
		FunctionUtils.setFunctionRuntimeContext(function, ctx);
		FunctionUtils.openFunction(function, parameters);

		ArrayList<OUT> result = new ArrayList<OUT>(input.size());

		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);

		for (IN element : input) {
			IN inCopy = inSerializer.copy(element);
			function.flatMap(inCopy, resultCollector);
		}

		FunctionUtils.closeFunction(function);

		return result;
	}
}

而最后優化時候,則FlatMapOperatorBase會被優化成FlatMapNode。

public class GraphCreatingVisitor implements Visitor<Operator<?>> {
	public boolean preVisit(Operator<?> c) {
    else if (c instanceof FlatMapOperatorBase) {
			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
		}
  }
}

自此,FlatMap就被組合到 DataSet的 OptimizedPlan 中。下一步Flink會依據OptimizedPlan來生成 JobGraph。

作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

在運行狀態下,如果上游有數據流入,則FlatMap這個算子就會發揮作用。

2. DataStream

對於DataStream,則是另外一套體系結構。首先我們找一個使用DataStream的例子看看。

//獲取數據: 從socket中獲取
val textDataStream = env.socketTextStream("127.0.0.1", 8888, '\n')
val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))

//groupby: 按照指定的字段聚合
val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5),Time.seconds(1))
windowDstram.sum("count").print()

上面例子中,flatMap 調用的是DataStream中的API,具體如下:

public class DataStream<T> {
  
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    //clean函數用來移除FlatMapFunction類對象的外部類部分,這樣就可以進行序列化
    //getType用來獲取類對象的輸出類型
		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
				getType(), Utils.getCallLocationName(), true);
		return flatMap(flatMapper, outType);
	}
  
  // 構建了一個StreamFlatMap的Operator
	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
	}  
  
  // 依次調用下去
	@PublicEvolving
	public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			OneInputStreamOperator<T, R> operator) {
		return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
	}
  
	protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			StreamOperatorFactory<R> operatorFactory) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
    // 構建Transform對象,Transform對象中包含其上游Transform對象,這樣上游下游就串成了一個Transform鏈。
		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
				this.transformation,
				operatorName,
				operatorFactory,
				outTypeInfo,
				environment.getParallelism());
		@SuppressWarnings({"unchecked", "rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // 將這Transform對象放入env的transform對象列表。
		getExecutionEnvironment().addOperator(resultTransform);
    // 返回流
		return returnStream;
	}  
}

上面源碼中的幾個概念需要澄清。

Transformation:首先,FlatMap在FLink編程模型中是算子API,在DataStream中會生成一個Transformation,即邏輯算子。

邏輯算子Transformation最后會對應到物理算子Operator,這個概念對應的就是StreamOperator

StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。

processElement()方法也是UDF的邏輯被調用的地方,例如FlatMapFunction里的flatMap()方法。

public class StreamFlatMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
		super(flatMapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
    // 調用用戶定義的FlatMap
		userFunction.flatMap(element.getValue(), collector);
	}
}

我們可以看到,StreamFlatMap繼承了AbstractUdfStreamOperator,從而間接繼承了StreamOperator。

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
}

StreamOperator是根接口。對於 Streaming 來說所有的算子都繼承自 StreamOperator。繼承了StreamOperator的擴展接口則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractUdfStreamOperator。

從 API 到 邏輯算子 Transformation,再到 物理算子Operator,就生成了 StreamGraph。下一步Flink會依據StreamOperator來生成 JobGraph。

作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。至此就完成了從用戶業務代碼到Flink運行系統的轉化。

0x04 參考

Flink中richfunction的一點小作用

【淺顯易懂】scala中map與flatMap的區別

Working with State

flink簡單應用: scala編寫wordcount

【Flink】Flink基礎之實現WordCount程序(Java與Scala版本)

Flink進階教程:以flatMap為例,如何進行算子自定義

Flink運行時之批處理程序生成計划


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM