

Flink ships with 8 partitioning strategies (partitioners), listed below. This article walks through each partitioner's implementation from a source-code perspective.
GlobalPartitioner
ShufflePartitioner
RebalancePartitioner
RescalePartitioner
BroadcastPartitioner
ForwardPartitioner
KeyGroupStreamPartitioner
CustomPartitionerWrapper
Inheritance hierarchy
Interface
Name: ChannelSelector
Implementation:
public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the number of channels. A channel can be understood as an
     * instance of the downstream operator (a subtask of the parallel operator).
     */
    void setup(int numberOfChannels);

    /**
     * Given the current record and the total number of channels, decides
     * which downstream channel the record should be sent to.
     * Each partitioning strategy implements this method differently.
     */
    int selectChannel(T record);

    /**
     * Whether records are broadcast to all downstream operator instances.
     */
    boolean isBroadcast();
}
Abstract class
Name: StreamPartitioner
Implementation:
public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }

    public abstract StreamPartitioner<T> copy();
}
Inheritance hierarchy
GlobalPartitioner
Overview
This partitioner sends every record to a single downstream operator instance, the subtask with id = 0.
Source code
/**
 * Sends every record to the first task (ID = 0) of the downstream operator.
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // always returns 0, i.e. every record goes to the first downstream task
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}
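Which partitioner is used can be controlled from the DataStream API. A minimal, runnable sketch that installs the GlobalPartitioner via global() (the input elements and job name are made up for illustration):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalPartitionerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // made-up input, just to have something to route
        DataStream<String> source = env.fromElements("a", "bb", "ccc");
        // global() installs GlobalPartitioner: every record lands on subtask 0 of the printer,
        // even though the sink is allowed to run with 4 subtasks
        source.global().print().setParallelism(4);
        env.execute("global-partitioner-demo");
    }
}

The later sketches in this article reuse the same env and source, swapping only the partitioning call.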
Diagram
ShufflePartitioner
Overview
Randomly picks one downstream operator instance for each record.
Source code
/**
 * Randomly selects a channel to send to.
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // draws a pseudo-random number in [0, numberOfChannels),
        // sending the record to a random downstream task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}
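On the API side, shuffle() installs this partitioner; the demo from the GlobalPartitioner section works by swapping the partitioning call:

// each record lands on a uniformly random printer subtask
source.shuffle().print().setParallelism(4);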
Diagram
BroadcastPartitioner
Overview
Sends every record to all downstream operator instances.
Source code
/**
 * Sends every record to all channels.
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    /**
     * In broadcast mode records are sent directly to all downstream tasks,
     * so there is no need to select a channel with this method.
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "BROADCAST";
    }
}
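broadcast() installs this partitioner (again reusing the demo's source):

// every record is emitted once per downstream subtask,
// so with a sink parallelism of 4 each record prints 4 times
source.broadcast().print().setParallelism(4);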
Diagram
RebalancePartitioner
Overview
Sends records to the downstream tasks one after another in round-robin order.
Source code
/**
 * Sends records to the downstream tasks in round-robin order.
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // initializes the starting channel id with a pseudo-random number in [0, numberOfChannels)
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // round-robins over the downstream tasks. For example, if nextChannelToSendTo starts
        // at 0 and numberOfChannels (the downstream operator's parallelism) is 2, the first
        // record goes to the task with ID = 1, the second to ID = 0, the third to ID = 1
        // again, and so on.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}
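rebalance() installs this partitioner explicitly; as the pro tip in the ForwardPartitioner section shows, it is also the default whenever no partitioner is set and the parallelism changes between operators:

// records are dealt out round-robin across the 4 printer subtasks
source.rebalance().print().setParallelism(4);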
Diagram
RescalePartitioner
Overview
Distributes records round-robin to the downstream operator instances, but only within a group determined by the upstream and downstream parallelism.
For example, if the upstream parallelism is 2 and the downstream parallelism is 4, one upstream instance round-robins its records over two of the downstream instances, while the other upstream instance round-robins over the other two.
Conversely, if the upstream parallelism is 4 and the downstream parallelism is 2, two upstream instances send their records to one downstream instance, and the other two upstream instances send to the other.
Source code
@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // plain round-robin starting at channel 0; the grouping of upstream to downstream
        // instances comes from the POINTWISE distribution pattern described in the pro tip below
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}
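rescale() installs this partitioner (reusing the demo's source; the parallelism change is what makes the POINTWISE grouping visible):

// e.g. with an upstream parallelism of 2 and a sink parallelism of 4, each upstream subtask
// round-robins over its own group of 2 printer subtasks
source.rescale().print().setParallelism(4);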
Diagram
Pro tip
Flink's execution graph has four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
StreamGraph: the initial graph generated from the code the user writes with the Stream API; it represents the topology of the program.
JobGraph: the StreamGraph after optimization; this is the data structure submitted to the JobManager. The main optimization is chaining eligible nodes together into a single node, which cuts the serialization/deserialization/transfer cost of moving data between nodes.
ExecutionGraph: generated by the JobManager from the JobGraph. It is the parallelized version of the JobGraph and the core data structure of the scheduling layer.
Physical execution graph: the "graph" that results after the JobManager schedules the job based on the ExecutionGraph and deploys the tasks on the TaskManagers; it is not a concrete data structure.
StreamingJobGraphGenerator is the class that turns a StreamGraph into a JobGraph. In it, ForwardPartitioner and RescalePartitioner are mapped to the POINTWISE distribution pattern, and all other partitioners to ALL_TO_ALL. The code is as follows:
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // each upstream (producer) subtask connects to one or a few downstream (consumer) subtasks
        DistributionPattern.POINTWISE,
        resultPartitionType);
} else {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // each upstream (producer) subtask connects to every downstream (consumer) subtask
        DistributionPattern.ALL_TO_ALL,
        resultPartitionType);
}
ForwardPartitioner
Overview
Sends each record to the matching first downstream task. This requires the upstream and downstream operators to have the same parallelism, i.e. upstream and downstream subtasks are paired 1:1.
Source code
/**
 * Sends each record to the matching first downstream task.
 * @param <T>
 */
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // channel 0 here is relative to the single paired downstream subtask,
        // since FORWARD edges are POINTWISE with a 1:1 pairing
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}
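forward() installs this partitioner explicitly; as the pro tip below shows, it is also what Flink picks when no partitioner is set and the parallelisms match:

// fromElements sources are non-parallel, so the sink must also run with parallelism 1 for FORWARD
source.forward().print().setParallelism(1);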
Diagram
Pro tip
When no partitioner is specified between two operators, ForwardPartitioner is used if their parallelisms match and RebalancePartitioner otherwise. ForwardPartitioner requires the upstream and downstream parallelism to be identical; if they differ, an exception is thrown:
// if no partitioner was specified, use ForwardPartitioner when the upstream and
// downstream parallelisms match, and RebalancePartitioner otherwise
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

if (partitioner instanceof ForwardPartitioner) {
    // if the upstream and downstream parallelisms differ, throw an exception
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}
KeyGroupStreamPartitioner
Overview
Selects the downstream subtask to send each record to based on the key's key-group index. This is the partitioner that keyBy() installs.
Source code
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/**
 * Selects the downstream subtask to send to based on the key's key-group index.
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
    ...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        // delegates to KeyGroupRangeAssignment.assignKeyToParallelOperator, shown below
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
    ...
}
org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
    ...

    /**
     * Assigns a key the index of the parallel operator instance it is routed to,
     * i.e. which downstream task the key is sent to.
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * Assigns a key to a key group (keyGroupId).
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // uses the key's hashCode
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * Assigns a key hash to a key group (keyGroupId).
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        // murmur-hashes the key hash and takes it modulo maxParallelism to get the keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    // computes the partition index, i.e. which downstream operator instance the key group is sent to
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
    ...
}
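Since KeyGroupRangeAssignment is a public utility, the two-step routing (key -> key group -> operator index) can be checked directly. A small sketch with made-up values ("user-42" is an arbitrary key; 128 is a common default maximum parallelism):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupRoutingDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // the job's maximum parallelism
        int parallelism = 4;      // the operator's actual parallelism
        // keyGroupId = murmurHash("user-42".hashCode()) % 128, channel = keyGroupId * 4 / 128
        int channel = KeyGroupRangeAssignment.assignKeyToParallelOperator("user-42", maxParallelism, parallelism);
        System.out.println(channel); // a fixed value in [0, 4): the same key always routes to the same subtask
    }
}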
Diagram
CustomPartitionerWrapper
Overview
Routes records downstream using the partition method of a user-defined Partitioner instance.
Source code
public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        // the user implements the Partitioner interface and overrides its partition method
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "CUSTOM";
    }
}
For example:
public class CustomPartitioner implements Partitioner<String> {
    // key: the record's key, used to pick the partition
    // numPartitions: the parallelism of the downstream operator
    @Override
    public int partition(String key, int numPartitions) {
        return key.length() % numPartitions; // the partitioning strategy is defined here
    }
}
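Wiring the custom partitioner into a job goes through partitionCustom. A sketch reusing the demo's source stream (the identity key selector is just for illustration; any KeySelector works):

// route each record by the length of the string itself
source.partitionCustom(new CustomPartitioner(), value -> value).print();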
Summary
This article analyzed each of Flink's 8 partitioning strategies at the source-code level and paired every strategy with a diagram to make the source easier to digest. If you found it useful, follow the account for more content.


This article was originally shared on the WeChat public account 大數據技術與數倉 (gh_95306769522d).