Sorting by event time in Flink


1. Event-time extractor

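// Uses each event's own eventTime field as its Flink timestamp.
// AscendingTimestampExtractor assumes timestamps never decrease within a source partition.
class CustomerStatusChangedWatermark extends AscendingTimestampExtractor<CustomerStatusChangedEvent> {
    @Override
    public long extractAscendingTimestamp(CustomerStatusChangedEvent customerStatusChangedEvent) {
        return customerStatusChangedEvent.getEventTime();
    }
}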
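To make the extractor's behaviour easier to reason about, here is a minimal plain-Java model of an ascending-timestamp watermark generator. It is not Flink code: the class name `AscendingWatermarkSketch` and its methods are made up for illustration. It assumes the watermark trails the highest timestamp seen so far by one millisecond and that a decreasing timestamp is only recorded as a violation, which is how `AscendingTimestampExtractor` is understood to behave with its default logging handler.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of an ascending-timestamp watermark generator (hypothetical, not Flink API).
class AscendingWatermarkSketch {
    // Highest timestamp seen so far; Long.MIN_VALUE means "no element yet".
    private long currentTimestamp = Long.MIN_VALUE;
    // Timestamps that broke the "never decreasing" assumption.
    private final List<Long> violations = new ArrayList<>();

    // Returns the element's timestamp and tracks the running maximum.
    long extractTimestamp(long eventTime) {
        if (eventTime >= currentTimestamp) {
            currentTimestamp = eventTime;
        } else {
            // Assumption: a violation is only recorded, the element is still passed on.
            violations.add(eventTime);
        }
        return eventTime;
    }

    // Assumption: the watermark is the highest timestamp seen minus one millisecond.
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    List<Long> violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch w = new AscendingWatermarkSketch();
        long base = 1_000_000L; // stand-in for "now" in epoch milliseconds
        // Same offsets as the test data in section 2: now, -20 s, -30 s, +20 s.
        long[] eventTimes = {base, base - 20_000, base - 30_000, base + 20_000};
        for (long t : eventTimes) {
            w.extractTimestamp(t);
            System.out.println("event=" + t + " watermark=" + w.currentWatermark());
        }
        System.out.println("violations=" + w.violations());
    }
}
```

With the offsets used in section 2, the second and third events arrive with timestamps lower than the first, so under these assumptions they count as monotony violations rather than advancing the watermark.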

 

2. Test

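import java.time.LocalDateTime;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// CustomerStatusChangedEvent and DateUtils are project classes that are not shown in this post.
public class WatermarkTest {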

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<CustomerStatusChangedEvent> stream = env.fromElements(
                new CustomerStatusChangedEvent(1001L, 1, 2,
                        DateUtils.toTimeStamp(LocalDateTime.now())),
                new CustomerStatusChangedEvent(1011L, 1, 2,
                        DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(20))),
                new CustomerStatusChangedEvent(1021L, 1, 2,
                        DateUtils.toTimeStamp(LocalDateTime.now().minusSeconds(30))),
                new CustomerStatusChangedEvent(1031L, 1, 2,
                        DateUtils.toTimeStamp(LocalDateTime.now().plusSeconds(20)))
        );

        DataStream<Long> watermarkStream = stream
                .assignTimestampsAndWatermarks(new CustomerStatusChangedWatermark())
                .map(new MapFunction<CustomerStatusChangedEvent, Long>() {
                    @Override
                    public Long map(CustomerStatusChangedEvent p) throws Exception {
                        return p.getCustomerId();
                    }
                });


        watermarkStream.print();
        try {
            env.execute("Sort by event time");
        } catch (Exception ex) {
            // Surface job failures instead of silently swallowing them.
            ex.printStackTrace();
        }
    }
}
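The test relies on `DateUtils.toTimeStamp`, which is not shown in the post. A plausible minimal version, assuming it converts a `LocalDateTime` to epoch milliseconds in the system default time zone, might look like the sketch below. The class name `DateUtilsSketch` and the overload that takes a `ZoneId` are illustrative additions, not the author's code.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical stand-in for the DateUtils helper used in the test.
class DateUtilsSketch {

    // Converts a local date-time to epoch milliseconds in the given zone.
    static long toTimeStamp(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Assumption: the original helper uses the JVM's default time zone.
    static long toTimeStamp(LocalDateTime dateTime) {
        return toTimeStamp(dateTime, ZoneId.systemDefault());
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println("now      = " + toTimeStamp(now));
        System.out.println("now-20 s = " + toTimeStamp(now.minusSeconds(20)));
    }
}
```

Flink event timestamps are expressed in milliseconds, so a helper of this kind would need to return milliseconds rather than seconds for the offsets in the test to mean 20 and 30 seconds.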

 

3. Output

4> 1001
3> 1031
1> 1011
2> 1021
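The `N>` prefix on each line identifies the parallel print subtask that wrote it, and the lines above are not in event-time order: assigning timestamps and watermarks does not by itself reorder records. One common way to get event-time order is to buffer records and release them once the watermark has passed their timestamp. The sketch below models that idea in plain Java. It is not Flink code, and `EventTimeSortSketch`, `Event`, `onEvent` and `onWatermark` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of watermark-driven event-time sorting (hypothetical, not Flink API).
class EventTimeSortSketch {

    // Minimal stand-in for CustomerStatusChangedEvent: only the fields the sort needs.
    static final class Event {
        final long customerId;
        final long eventTime;

        Event(long customerId, long eventTime) {
            this.customerId = customerId;
            this.eventTime = eventTime;
        }
    }

    // Buffered events, ordered by ascending event time.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));
    private final List<Long> emitted = new ArrayList<>();

    // Buffer every incoming event instead of emitting it right away.
    void onEvent(Event e) {
        buffer.add(e);
    }

    // Emit, in timestamp order, every buffered event the watermark has passed.
    void onWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek().eventTime <= watermark) {
            emitted.add(buffer.poll().customerId);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    // Sorts a bounded input: a final watermark of Long.MAX_VALUE flushes the buffer.
    static List<Long> sortByEventTime(List<Event> events) {
        EventTimeSortSketch sorter = new EventTimeSortSketch();
        for (Event e : events) {
            sorter.onEvent(e);
        }
        sorter.onWatermark(Long.MAX_VALUE);
        return sorter.emitted();
    }

    public static void main(String[] args) {
        long base = 1_000_000L; // stand-in for "now" in epoch milliseconds
        List<Event> events = Arrays.asList(
                new Event(1001L, base),
                new Event(1011L, base - 20_000),
                new Event(1021L, base - 30_000),
                new Event(1031L, base + 20_000));
        System.out.println(sortByEventTime(events)); // prints [1021, 1011, 1001, 1031]
    }
}
```

In a real job the same pattern would typically live in a keyed, stateful operator that registers event-time timers, with a watermark strategy that tolerates the expected amount of out-of-orderness; the sketch only shows the buffering and release logic.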

