Flink流處理之迭代案例


當前Flink將迭代的重心集中在批處理上,之前我們談及了批量迭代和增量迭代主要是針對批處理(DataSet)API而言的,而且Flink為批處理中的迭代提供了針對性的優化。

可是對於流處理(DataStream),Flink相同提供了對迭代的支持。這一節我們主要來分析流處理中的迭代,我們將會看到流處理中的迭代相較於批處理有類似之處。但差異也是十分之明顯。

可迭代的流處理程序同意定義“步函數”(step function)並將其內嵌到一個可迭代的流(IterativeStream)中。由於一個流處理程序可能永不終止,因此不同於批處理中的迭代機制,流處理中無法設置迭代的最大次數。取而代之的是,你能夠指定等待反饋輸入的最大時間間隔(假設超過該時間間隔沒有反饋元素到來。那么該迭代將會終止)。通過應用split或filter轉換,你能夠指定流的哪一部分用於反饋給迭代頭,哪一部分分發給下游。這里我們以filter作為演示樣例來展示可迭代的流處理程序的API使用模式。

首先。基於輸入流構建IterativeStream。這是一個迭代的起始。通常稱之為迭代頭:

IterativeStream<Integer> iteration = inputStream.iterate();

接着。我們指定一系列的轉換操作用於表述在迭代過程中運行的邏輯(這里簡單以map轉換作為演示樣例)。map API所接受的UDF就是我們上文所說的步函數:

DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);

然后。作為迭代我們肯定須要有數據反饋給迭代頭進行反復計算,所以我們從迭代過的流中過濾出符合條件的元素組成的部分流,我們稱之為反饋流:

DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);

將反饋流反饋給迭代頭就意味着一個迭代的完整邏輯的完畢,那么它就能夠“關閉”這個閉合的“環”了。通過調用IterativeStream的closeWith這一實例方法能夠關閉一個迭代(也可表述為定義了迭代尾)。傳遞給closeWith的數據流將會反饋給迭代頭:

iteration.closeWith(feedbackStream);

另外,一個慣用的模式是過濾出須要繼續向前分發的部分流,這個過濾轉換事實上定義的是“終止迭代”的邏輯條件,符合條件的元素將被分發給下游而不用於進行下一次迭代:

DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);

跟分析批處理中的迭代一樣,我們仍然以解決實際問題的案例作為切入點來看看流處理中的迭代跟批處理中的迭代有何不同。

首先描寫敘述一下須要解決的問題:產生一個由一系列二元組(兩個字段都是在一個區間內產生的正整數來作為斐波那契數列的兩個初始值)構成的數據流。然后對該數據流中的二元組不斷地迭代使其產生斐波那契數列,直到某次產生的值大於給定的閾值,則停止迭代並輸出迭代次數。

該案例參考自Flink隨源代碼公布的迭代演示樣例,此案例問題規模較小而且能夠說明問題。

但它演示樣例代碼中的一系列變量稍顯混亂,為了增強程序的表述性,筆者會對其稍作調整。

這個案例假設拆分到對單個元素(二元組)的角度來看。其運行步驟例如以下圖所看到的:

Streaming-iteration-demo

n表示迭代次數。在最初的map轉換中初始化為0。m是判定迭代停止的閾值。

另外,T后面跟的是字段索引。比方T2表示取元組中位置為3的字段。

且注意隨着迭代T在不斷變化。

上面我們已經對問題的核心過程進行了分析。接下來我們會分步解決問題的構建迭代的流處理程序。

首先,我們先通過source函數創建初始的流對象inputStream:

DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());

該source函數會生成二元組序列,二元組的兩個字段值是隨機生成的作為斐波那契數列的初始值:

private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {    

    private Random random = new Random();    
    private volatile boolean isRunning = true;    
    private int counter = 0;   

    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {        
        while (isRunning && counter < MAX_RANDOM_VALUE) {            
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;            
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 -1) + 1;  

            if (first > second) continue;            

            ctx.collect(new Tuple2<Integer, Integer>(first, second));            
            counter++;            
            Thread.sleep(50);        
        }    
    }    

    public void cancel() {        
        isRunning = false;    
    }
}

為了對新計算的斐波那契數列中的值以及累加的迭代次數進行存儲,我們須要將二元組數據流轉換為五元組數據流,並據此創建迭代對象:

IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =        
    inputStream.map(new TupleTransformMapFunction()).iterate(5000);

注意上面代碼段中iterate API的參數5000,不是指迭代5000次,而是等待反饋輸入的最大時間間隔為5秒。

流被覺得是無界的。所以無法像批處理迭代那樣指定最大迭代次數。但它同意指定一個最大等待間隔,假設在給定的時間間隔里沒有元素到來。那么將會終止迭代。

元組轉換的map函數實現:

private static class TupleTransformMapFunction extends RichMapFunction<Tuple2<Integer,        
    Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(            
        Tuple2<Integer, Integer> inputTuples) throws Exception {        
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(                
            inputTuples.f0,                
            inputTuples.f1,                
            inputTuples.f0,                
            inputTuples.f1,                
            0);    
    }
}

上面五元組中,當中索引為0。1這兩個位置的元素,始終都是最初生成的兩個元素不會變化,而后三個字段都會隨着迭代而變化。

在迭代流iterativeStream創建完畢之后,我們將基於它運行斐波那契數列的步函數並產生斐波那契數列流fibonacciStream:

DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =        
    iterativeStream.map(new FibonacciCalcStepFunction());

這里的fibonacciStream僅僅是一個代稱,當中的數據並非真正的斐波那契數列,事實上就是上面那個五元組。

當中用於計算斐波那契數列的步函數實現例如以下:

private static class FibonacciCalcStepFunction extends        
    RichMapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,        
    Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(            
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {        
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(                
            inputTuple.f0,                
            inputTuple.f1,                
            inputTuple.f3,                
            inputTuple.f2 + inputTuple.f3,                
            ++inputTuple.f4);    
    }
}

正如上文所述。后三個字段會產生變化。在計算之前,數列最后一個元素會被保留。也就是f3相應的元素,然后通過f2元素加上f3元素會產生最新值並更新f3元素。而f4則會累加。

隨着迭代次數添加,不是整個數列都會被保留。僅僅有最初的兩個元素和最新的兩個元素會被保留,這里也不是必需保留整個數列,由於我們不須要完整的數列。我們僅僅須要對最新的兩個元素進行推斷就可以。

上文我們對每一個元素計算斐波那契數列的新值並產生了fibonacciStream,可是我們須要對最新的兩個值進行推斷。看它們是否超過了指定的閾值。超過了閾值的元組將會被輸出,而沒有超過的則會再次參與迭代。因此這將產生兩個不同的分支。我們也為此構建了分支流:

SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =        
    fibonacciStream.split(new FibonacciOverflowSelector());

而對是否超過閾值的元組進行推斷並分離的實現例如以下:

private static class FibonacciOverflowSelector implements OutputSelector< Tuple5<Integer, Integer, Integer, Integer, Integer>> {    
    public Iterable<String> select(            
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {        
        if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {            
            return Collections.singleton(ITERATE_FLAG);        
        }        

        return Collections.singleton(OUTPUT_FLAG);    
    }
}

在篩選方法select中,我們對不同的分支以不同的常量標識符進行標識:ITERATE_FLAG(還要繼續迭代)和OUTPUT_FLAG(直接輸出)。

產生了分支流之后。我們就能夠從中檢出不同的流分支做迭代或者輸出處理。

對須要再次迭代的,就通過迭代流的closeWith方法反饋給迭代頭:

iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));

而對於不須要的迭代就直接讓其流向下游處理,這里我們僅僅是簡單得將流“重構”了一下然后直接輸出:

DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream        
    .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();

所謂的重構就是將之前的五元組又一次縮減為三元組,實現例如以下:

private static class BuildOutputTupleMapFunction extends RichMapFunction<        
    Tuple5<Integer, Integer, Integer, Integer, Integer>,        
    Tuple3<Integer, Integer, Integer>> {    
    public Tuple3<Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,            
        Integer> inputTuple) throws Exception {        
        return new Tuple3<Integer, Integer, Integer>(                
            inputTuple.f0,                
            inputTuple.f1,                
            inputTuple.f4);    
    }
}

終於我們將會得到類似例如以下的輸出:

(7,14,5)

(18,37,3)

(3,46,3)

(23,32,3)

(31,43,2)

(13,45,2)

(37,42,2)

……

前兩個整數是斐波那契數列的兩個初始值。第三個整數表示其須要經歷多少次迭代其斐波那契數列最新的兩個值才會超過閾值。

終於完整的主干程序代碼例如以下:

public static void main(String[] args) throws Exception {    
    StreamExecutionEnvironment env = StreamExecutionEnvironment            
        .getExecutionEnvironment().setBufferTimeout(1);

    DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource()); 

    IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =            
        inputStream.map(new TupleTransformMapFunction()).iterate(5000); 

    DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =            
        iterativeStream.map(new FibonacciCalcStepFunction()); 

    SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =            
        fibonacciStream.split(new FibonacciOverflowSelector()); 

    iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG)); 

    DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream            
        .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction()); 

    outputStream.print(); 
    env.execute("Streaming Iteration Example");
}

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat


QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM