Flink入門訓練--以New York City Taxi為例


最近在學Flink,准備用Flink搭建一個實時的推薦系統。找到一個好的網站(也算作是flink創始者的官方網站),上面有關於Flink的上手教程,用來練練手,熟悉熟悉,下文僅僅是我的筆記。

1. 數據集

網站New York City Taxi & Limousine Commission提供了關於紐約市從2009-1015年關於出租車駕駛的公共數據集。

具體數據下載方法,可見# Taxi Data Streams,下載完數據后,不要解壓縮。

我們的第一個數據集包含紐約市的出租車出行的信息,每一次出行包含兩個事件:START和END,可以分別理解為開始和結束該行程。每一個事件又包括11個屬性,詳細介紹如下:

taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

另一個數據集包含出租車的費用信息,與每一次行程對應:

taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
startTime      : DateTime  // the start time of a ride
paymentType    : String    // CSH or CRD
tip            : Float     // tip(小費) for this ride
tolls          : Float     // tolls for this ride
totalFare      : Float     // total fare collected

2. 生成數據流

首先定義TaxiRide事件,即數據集中的每一個record。

我們使用Flink的source函數(TaxiRideSource)讀取TaxiRide流,這個source是基於事件時間進行的。同樣的,費用事件TaxiFare的流通過函數TaxiFareSource進行傳送。為了讓生成的流更加真實,事件傳送的時間是與timestamp成比例的。兩個真實相隔十分鍾發生的事件在流中也相差十分鍾。此外,我們可以定義一個變量speed-up factor為60,該變量為加速因子,那么真實事件中的一分鍾在流中只有1秒鍾,縮短60倍嘛。不僅如此,我們還可以定義最大服務延時,這個延時使得每個事件在最大服務延時之內隨機出現,這么做的目的是讓這個流的事件產生與在real-world發生的不確定性更接近。

對於這個應用,我們設置speed-up factor為600(即10分鍾相當於1秒),以及最大延時時間為60。

所有的行動都應使用事件時間(event time)(相對於處理時間(processing time))來實現。

Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.

事件時間(event time)將程序語義與服務速度分離開,即使在歷史數據或無序傳送的數據的情況下也能保證一致的結果。簡單來說就是,在數據處理的過程中,依賴的時間跟在流中出現的時間無關,只跟該事件發生的時間有關。

private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {  
  
  // 設置服務開始時間servingStartTime  
  long servingStartTime = Calendar.getInstance().getTimeInMillis();  
  
  // 數據開始時間dataStartTime,即第一個ride的timestamp  
  long dataStartTime;  
  
  Random rand = new Random(7452);  
  
  // 使用優先隊列進行emit,其比較方式為他們的等待時間  
  PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(  
         32,  
			 new Comparator<Tuple2<Long, Object>>() {  
              @Override  
			  public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {  
		               return o1.f0.compareTo(o2.f0); }  
	          });  
  
  // 讀取第一個ride,並將第一個ride插入到schedule里  
  String line;  
  TaxiRide ride;  
  if (reader.ready() && (line = reader.readLine()) != null) {  
  // read first ride  
  ride = TaxiRide.fromString(line);  
  // extract starting timestamp  
  dataStartTime = getEventTime(ride);  
  // get delayed time,這個delayedtime是dataStartTime加一個隨機數,隨機數有最大范圍,用來模擬真實世界情況  
  long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);  
  
  // 將ride插入到schedule里  
  emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));  
  // 設置水印時間  
  long watermarkTime = dataStartTime + watermarkDelayMSecs;  
  // 下一個水印時間是時間戳是 watermarkTime - maxDelayMsecs - 1  
  // 只能證明,這個時間一定是小於dataStartTime的  Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);  
  // 將該水印放入Schedule,且這個水印被優先隊列移到了ride之前  
  emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));  
  
  } else {  
      return;  
  }  
  
  // 從文件里讀取下一個ride(peek)  
  if (reader.ready() && (line = reader.readLine()) != null) {  
      ride = TaxiRide.fromString(line);  
  }  
  
  // read rides one-by-one and emit a random ride from the buffer each time  
  while (emitSchedule.size() > 0 || reader.ready()) {  
  
	  // insert all events into schedule that might be emitted next  
	  // 在Schedule里的下一個事件的延時后時間  long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;  
	  // 當前從文件讀取的ride的事件時間  
	  long rideEventTime = ride != null ? getEventTime(ride) : -1;  
	  // 這個while循環用來進行當前Schedule為空的情況  
	  while(  
            ride != null && ( // while there is a ride AND  
			emitSchedule.isEmpty() || // and no ride in schedule OR  
			rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule  
			)  
      {  
          // insert event into emit schedule  
		  long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);  
		  emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));  
  
		  // read next ride  
		  if (reader.ready() && (line = reader.readLine()) != null) {  
	            ride = TaxiRide.fromString(line);  
				rideEventTime = getEventTime(ride);  
		  }  
		  else {  
	            ride = null;  
			    rideEventTime = -1;  
		  }  
      }  
  
      // 提取Schedule里的第一個ride,叫做head  
	  Tuple2<Long, Object> head = emitSchedule.poll();  
	  // head應該要到達的時間  
	  long delayedEventTime = head.f0;  
	  long now = Calendar.getInstance().getTimeInMillis();  
  
	  // servingTime = servingStartTime + (delayedEventTime - dataStartTime)/ this.servingSpeed  
	  long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);  
	  // 應該再等多久,才讓這個ride發生呢?(哈哈,我好喜歡這個描述)  
	  long waitTime = servingTime - now;  
	  // 既然要等,那就睡着等吧  
	  Thread.sleep( (waitTime > 0) ? waitTime : 0);  
	  // 如果這個head是一個TaxiRide  
	  if(head.f1 instanceof TaxiRide) {  
	         TaxiRide emitRide = (TaxiRide)head.f1;  
			  // emit ride  
			 sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));  
	   }  
      // 如果這個head是一個水印標志  
	  else if(head.f1 instanceof Watermark) {  
	         Watermark emitWatermark = (Watermark)head.f1;  
			 // emit watermark  
			 sourceContext.emitWatermark(emitWatermark);  
			 // 並設置下一個水印標志到Schedule中  
			 long watermarkTime = delayedEventTime + watermarkDelayMSecs;  
			 // 同樣,保證這個水印的時間戳在下一個ride的timestamp之前  
			 Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);  
			 emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));  
	  }  
   }  
}

那么,如何在java中運行這些sources,下面是一個示例:

// get an ExecutionEnvironment
StreamExecutionEnvironment env = StreamExcutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
	new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));

另外,有一些應用需要我們使用加入檢查點的機制。檢查點(checkpoint)是從failure中恢復的一種機制。他也需要建立CheckpointedTaxiRideSource來在流中運行。

3. 數據清洗🛀

3.1 數據連接🔗

由於我們的應用要研究的是在紐約市內的出租車情況,所以我們要排除掉紐約市外的地點。通過這個過濾器:

private static class NYCFilter implements FilterFunction<TaxiRide> {  
  @Override  
  public boolean filter(TaxiRide taxiRide) throws Exception {  
      return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&  
             GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);  
  }  
}

執行過濾器:

// start the data generator  
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));  
  
DataStream<TaxiRide> filteredRides = rides  
  // filter out rides that do not start or stop in NYC  
  .filter(new NYCFilter());

現在我們需要把TaxiRide和TaxiFare兩者的數據記錄結合。在這個過程中,我們要同時處理兩個source的流數據。這里介紹幾個用到的Transformation functions:

  • FlatMap: 輸入1個record,輸出為0或1或更多個records的映射
  • Filter:進行評估,如果結果為Ture,則傳輸record
  • KeyBy:用來將記錄按照第一個元素(一個字符串)進行分組,根據該key將數據進行重新分區,然后將記錄再發送給下一個算子

由於我們沒辦法控制ride和fare到達的先后,所以我們儲存先到的信息直到和他匹配的信息到來。這就需要用到有狀態的計算

public class RidesAndFaresExercise extends ExerciseBase {  
   public static void main(String[] args) throws Exception {  
  
  ParameterTool params = ParameterTool.fromArgs(args);  
  final String ridesFile = params.get("rides", pathToRideData);  
  final String faresFile = params.get("fares", pathToFareData);  
 
  final int delay = 60; // at most 60 seconds of delay  
  final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second  
 
  // set up streaming execution environment  
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
  env.setParallelism(ExerciseBase.parallelism);  
  
  DataStream<TaxiRide> rides = env  
            .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))  
            .filter((TaxiRide ride) -> ride.isStart)  
            .keyBy("rideId");  
  
  DataStream<TaxiFare> fares = env  
            .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))  
            .keyBy("rideId");  
  
  DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides  
            .connect(fares)  
            .flatMap(new EnrichmentFunction());  
  
  printOrTest(enrichedRides);  
  
  env.execute("Join Rides with Fares (java RichCoFlatMap)");  
  }  
  
   public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {  
  
   // keyed, managed state  
   private ValueState<TaxiRide> rideState;  
   private ValueState<TaxiFare> fareState;  
  
   @Override  
   public void open(Configuration config) throws Exception {  
         rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));  
		 fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));  
  }  
  
   @Override  
   public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiFare fare = fareState.value();  
		 if (fare != null) {  
            fareState.clear();  
			out.collect(new Tuple2(ride, fare));  
		  } else {  
            rideState.update(ride);  
		  }  
      }  
  
    @Override  
    public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiRide ride = rideState.value();  
		 if (ride != null){  
            rideState.clear();  
			out.collect(new Tuple2(ride, fare));  
		  } else {  
            fareState.update(fare);  
		  }  
      }  
   }  
}

運行,可以看到,生成的數據是這樣的,ride和fare結合到了一起:

3> (196965,START,2013-01-01 11:54:08,1970-01-01 00:00:00,-73.99048,40.75611,-73.98388,40.767143,2,2013007021,2013014447,196965,2013007021,2013014447,2013-01-01 11:54:08,CSH,0.0,0.0,6.5)
1> (197311,START,2013-01-01 11:55:44,1970-01-01 00:00:00,-73.98894,40.72127,-73.95267,40.771126,1,2013008802,2013012009,197311,2013008802,2013012009,2013-01-01 11:55:44,CRD,2.7,0.0,16.2)
2> (196608,START,2013-01-01 11:53:00,1970-01-01 00:00:00,-73.97817,40.761055,-73.98574,40.75613,2,2013004060,2013014162,196608,2013004060,2013014162,2013-01-01 11:53:00,CSH,0.0,0.0,5.5)

3.2 狀態緩存清理

那么現在,我們想要上面的兩者結合操作更加的Robust。對於現實中的數據,有時某些record會丟失,這意味着我們可能只收到TaxiRide and TaxiFare中的一個,另一個永遠不會到。所以先到的那個record會一直占用着內存。為了解決這個問題,我們嘗試在CoProcessFunction中清理掉沒有被匹配的狀態。

這個功能定義在類 ExpiringStateExercise中:

首先給出missing data的輸入,這里我們丟掉所有ride的END事件,START事件每隔1000個丟失一個。😯

DataStream<TaxiRide> rides = env  
      .addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)))  
      .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0)))  
      .keyBy(ride -> ride.rideId);

SingleOutputStreamOperator processed = rides  
      .connect(fares)  
      // Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.  
      // The function will be called for every element in the input streams and can produce zero or more output elements.  
	  .process(new EnrichmentFunction());  

我們使用CoprocessingFunction來進行上面描述的操作。對於有兩個inputs的流來說,下面的描述生動形象的介紹了我們需要override的3個方法:

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

processElement1(...) & processElement2(...) 用於兩個數據流的call。onTimer()用於設定拋棄掉沒有尋到匹配的record的動作。

@Override  
// Called when a timer set using TimerService fires.  
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
   if (fareState.value() != null) {  
      ctx.output(unmatchedFares, fareState.value());  
	  fareState.clear();  
  }  
   if (rideState.value() != null) {  
      ctx.output(unmatchedRides, rideState.value());  
	  rideState.clear();  
  }  
}  
  
@Override  
// A Context that allows querying the timestamp of the element,  
// querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time.  
// The context is only valid during the invocation of this method, do not store it.  
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
   // 當前處理事件是ride,且當前狀態中fare為非空, 則輸出。  
   // (由於ride在之前已經被keyby()過,這里只會傳送跟fare相同rideId的ride)  TaxiFare fare = fareState.value();  
   if (fare != null) {  
	   fareState.clear();  
	   out.collect(new Tuple2(ride, fare));  
  } else { // 否則,更新rideState  
	   rideState.update(ride);  
       // 只要水印到達,我們就停止等待相應的fare  
       // Registers a timer to be fired when the event time watermark passes the given time.  
	   context.timerService().registerEventTimeTimer(ride.getEventTime());  
  }  
}

輸出結果如下,可以看到輸出的內容的時間戳都相差1000,跟之前定義的一致。

1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38

3.3 窗口

現在,我們想確定每小時獲得最多小費(tip)的駕駛員(每一條fare的record里有小費這一欄)。 最簡單的方法是分兩步:首先使用一小時長的時間窗口(time window)來計算每小時內每個駕駛員的總提示,然后從該窗口流的結果中找到每小時獲得最多小費的駕駛員。

我們在下列code中會遇到以下幾個問題:

AggregareFunction: 這個函數有一個將輸入元素加到accumulator的方法。首先,這個函數接口有一個初始化accumulator的方法,並且可以將兩個accumulators融合成一個,不僅如此還可以從accumulator中提取出output。

ProcessWindowFunction:這個函數輸入一個包含窗口的所有元素的可迭代的集合以及一個包含time和state的Context object,這些輸入能夠使他提供更加豐富的功能。

public class HourlyTipsExercise extends ExerciseBase {  
  
   public static void main(String[] args) throws Exception {  
  
   // read parameters  
   ParameterTool params = ParameterTool.fromArgs(args);  
   final String input = params.get("input", ExerciseBase.pathToFareData);  
  
   final int maxEventDelay = 60; // events are out of order by max 60 seconds  
   final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second  
  
   // set up streaming execution environment  
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
   env.setParallelism(ExerciseBase.parallelism);  
  
   // start the data generator  
   DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));  
  
   // compute tips per hour for each driver  
   DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares  
            // 根據driveId 進行分組  
			.keyBy((TaxiFare fare) -> fare.driverId)  
            // 設置窗口時間為1小時  
		    .timeWindow(Time.hours(1))  
            // AddTips()為aggFunction, WrapWithWindowInfo()為windowFunction  
			.aggregate(new AddTips(), new WrapWithWindowInfo());  
  
   // find the highest total tips in each hour  
   DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips  
            .timeWindowAll(Time.hours(1))  
            .maxBy(2);  
  
   printOrTest(hourlyMax);  
  
   // execute the transformation pipeline  
   env.execute("Hourly Tips (java)");  
  }  
    
 /* Adds up the tips. */  
 public static class AddTips implements AggregateFunction<  
            TaxiFare, // input type  
			Float, // accumulator type  
			Float     // output type  
			>		  
   {  
      @Override  
	  public Float createAccumulator() {  
	         return 0F;  
	  }  
  
      @Override  
	  public Float add(TaxiFare fare, Float aFloat) {  
	         return fare.tip + aFloat;  
	  }  
  
      @Override  
	  public Float getResult(Float aFloat) {  
	         return aFloat;  
	  }  
  
      @Override  
	  public Float merge(Float aFloat, Float accumulator) {  
	         return aFloat + accumulator;  
	  }  
   }  
  
   /*  
 * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. */  
 public static class WrapWithWindowInfo extends ProcessWindowFunction<  
            Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {  
      @Override  
	  public void process(Long key, Context context, Iterable<Float> elements, 	Collector<Tuple3<Long, Long, Float>> out) throws Exception {  
      Float sumOfTips = elements.iterator().next();  
	  out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));  
	  }  
   }  
}

以下是輸出結果:

1> (1357002000000,2013000493,54.45)
2> (1357005600000,2013010467,64.53)
3> (1357009200000,2013010589,104.75)

4. Broadcast State

廣播變量(Broadcast State):這種機制用來支持數據從需要上游任務廣播傳送到下游任務的事件。

這篇文章對廣播變量講的很詳細:# A Practical Guide to Broadcast State in Apache Flink

在這個機制中,我們將系統分為actions stream和pattern stream。actions stream即為正常的數據流,也就是例子中 rides。pattern為我們廣播的數據流,這里可以理解為我們的監聽室需要對rides進行監聽,即我們傳輸一個pattern到broadcast state中,然后operator打印出action stream中符合這個pattern的數據。

在這里,我們的pattern是一個interger n,代表分鍾數。我們想要打印的是在我們傳送這個pattern的時刻,所有已經開始了n分鍾且還沒有結束的rides。

接下來是他的應用代碼:

首先,在這個簡單的例子中,我們需要一個廣播變量描述符,但是並不用他儲存東西。

final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(  
      "dummy",  
  BasicTypeInfo.LONG_TYPE_INFO,  
  BasicTypeInfo.LONG_TYPE_INFO  
);

然后,設置一個socket接口,用來接收pattern:

BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)  
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())  
      .broadcast(dummyBroadcastState);

當我們得到按照rideId分組后的rides stream以及從socket返回的分鍾n的broadcast stream后,我們連接這兩個streams。然后將它傳送到QueryFunction()處理。QueryFunction將pattern(也就是socket返回的分鍾數n)與ride進行匹配,最后返回被匹配的rides。

DataStream<TaxiRide> reports = rides  
      .keyBy((TaxiRide ride) -> ride.taxiId)  
      .connect(queryStream)  
      .process(new QueryFunction());

public static class QueryFunction extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {  
	private ValueStateDescriptor<TaxiRide> taxiDescriptor =  new ValueStateDescriptor<>("saved ride", TaxiRide.class);  
	private ValueState<TaxiRide> taxiState;  
  
  @Override  
  public void open(Configuration config) {  
      // 得到每一個taxi的上一個事件的狀態 
	  taxiState = getRuntimeContext().getState(taxiDescriptor);  
  }  
  
  @Override  
  public void processElement(TaxiRide ride, ReadOnlyContext ctx, Collector< TaxiRide> out) throws Exception {  
     // For every taxi, let's store the most up-to-date information.  
	 // TaxiRide implements Comparable to make this easy.  TaxiRide savedRide = taxiState.value();  
	 if (ride.compareTo(savedRide) > 0) {  
         taxiState.update(ride);  
	  }  
   }  
  
  @Override  
  public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) throws Exception {  
      DateTimeFormatter timeFormatter =  
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();  
  
	  Long thresholdInMinutes = Long.valueOf(msg);  
	  Long wm = ctx.currentWatermark();  
	  System.out.println("QUERY: " + thresholdInMinutes + " minutes at " + timeFormatter.print(wm));  
  
	  // Collect to the output all ongoing rides that started at least thresholdInMinutes ago.  
	  ctx.applyToKeyedState(taxiDescriptor, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {  
         @Override  
		 public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {  
	         TaxiRide ride = taxiState.value();  
			 if (ride.isStart) {  
	             long minutes = (wm - ride.getEventTime()) / 60000;  
				 if (ride.isStart && (minutes >= thresholdInMinutes)) {  
	                 out.collect(ride);  
				  }  
	         }  
         }  
      });  
   }  
}

Reference:

  1. data Artisans
  2. 《Flink基礎教程》
  3. 《Learning Apache Flink》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM