100-flink 報錯排查小本子


1、關鍵字:InvalidTypesException,'Collector' are missing , hints by using the returns(...) method

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information

 報這個錯是因為泛型的類型擦除,根據你寫的lambda表達式提供的類型信息不足以讓java自動去返回你要的類型,可以用return()函數來解決

報錯的代碼如下:

 DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
                    for (String word : value.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                });

修改:使用returns 函數(不好用),或者不要使用lambda表達式,匿名內部類也很香,讀起來不費勁

SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)

修改后:

DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String in, Collector out) throws Exception {
                for (String word : in.split("\\s+")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

 

2、關鍵字 no timestamp marker,ProcessingTime,DataStream.assignTimestampsAndWatermarks(...)

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

這個是說提供的source數據里沒有timestamp字段,但是卻用的eventTime作為時間處理,需要改為processingTime或指定timestampAndWatermarks

報錯代碼:

DataStream text = env.socketTextStream("10.192.78.17", 9000, "\n");
        //首先將字符串數據解析成單詞和次數(使用元組類型Tuple2<String, Integer>表示),第一個字段是單詞,第二個字段是次數,次數初始值都設置成1
        DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String in, Collector out) throws Exception {
                for (String word : in.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          .sum(1);

解決:將TumblingEventTimeWindows改為TumblingProcessingTimeWindows(具體還是看業務)

3、關鍵字 Cannot resolve method 'aggregate(CountAgg, WindowResultFunction)'

報錯代碼:.aggregate函數位置

DataStream<ItemViewCount> windowedData = pvData
                .keyBy(new KeySelector<UserBehavior, Long>() {
                    @Override
                    public Long getKey(UserBehavior userBehavior) throws Exception {
                        return userBehavior.getItemId();
                    }
                })
                .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(5)))
                .aggregate(new CountAgg(), new WindowResultFunction());
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}

@Override
public Long getResult(Long acc) {
return acc;
}

@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}

public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple1<Long>, TimeWindow> {

/**
* @param key 窗口的主鍵,即 itemId
* @param window 窗口
* @param aggregateResult 聚合函數的結果,即 count 值
* @param collector 輸出類型為 ItemViewCount
* @throws Exception
*/
@Override
public void apply(Tuple1<Long> key, TimeWindow window, Iterable<Long> aggregateResult, Collector<ItemViewCount> collector) throws Exception {
Long itemId = key.f0;
Long count = aggregateResult.iterator().next();
collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
}

}

 這個報錯提示很無語,lambda表達式有時候看不出是啥問題,最后查到問題是類型指定不對,WindowResultFunction的第三個類型Tuple1<Long>改為Long即可

public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Long, TimeWindow> {

    /**
     * @param key             窗口的主鍵,即 itemId
     * @param window          窗口
     * @param aggregateResult 聚合函數的結果,即 count 值
     * @param collector       輸出類型為 ItemViewCount
     * @throws Exception
     */
    @Override
    public void apply(Long key, TimeWindow window, Iterable<Long> aggregateResult, Collector<ItemViewCount> collector) throws Exception {
        Long itemId = key;
        Long count = aggregateResult.iterator().next();
        collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
    }

}

 4、關鍵字 TypeExtractor TimestampedFileInputSplit  POJO fields

20:48:25,426 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
20:48:25,431 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

這個不是error,參考stackoverflow的一個回復:

The logs that you share do not show an error. The logs are on INFO level and no exception is thrown (at least not in the provided logs).

The log entry just says that the class TimestampedFileInputSplit cannot be treated as a POJO. In general this message indicates that the performance is not optimal but in this particular case it is not a problem.

 5、關鍵字:和第三個差不多,泛型問題引起

process(new TopNHotItems(3));  // 求點擊量前3名的商品

紅色波浪線提示can't resolve method process(xxx),TopNHotItems類源碼如下:

public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

    private final int topSize;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    /**
     * 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據后,再觸發 TopN 計算
     */
    private ListState<ItemViewCount> itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 狀態的注冊
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(
            ItemViewCount input,
            Context context,
            Collector<String> collector) throws Exception {

        // 每條數據都保存到狀態中
        itemState.add(input);
        // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 獲取收到的所有商品點擊量
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // 提前清除狀態中的數據,釋放空間
        itemState.clear();
        // 按照點擊量從大到小排序
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 將排名信息格式化成 String, 便於打印
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < topSize; i++) {
            ItemViewCount currentItem = allItems.get(i);
            // No1:  商品ID=12224  瀏覽量=2413
            result.append("No").append(i).append(":")
                    .append("  商品ID=").append(currentItem.itemId)
                    .append("  瀏覽量=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");

        out.collect(result.toString());
    }
}

解決方法:KeyedProcessFunction<Tuple, ItemViewCount, String>第一個泛型需要改為Long類型


6、關鍵字  kafka offset checkpoint

16:49:45,148 WARN  org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - 
Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available.
This does not compromise Flink's checkpoint integrity.

//todo  待解決,應該是kafka參數配置問題

7、關鍵字:json classPath

Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath
原因:缺少flink配套的json包,加上這個jar
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

 8、關鍵字:InvalidFieldReferenceException   field expression  GenericType

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by field expression on GenericType<com.yb.api.beans.User>Field expressions are only supported on
POJO types,tuples, and case classes

代碼片段:

//用KeySelector分組
KeyedStream<User, String> keyedStream = dataStream.keyBy(new KeySelector<User, String>() {
       @Override
       public String getKey(User user) throws Exception {
            return user.getId();
       }
    });
//滾動聚合函數max,這個函數輸出時,max的字段會跟着變,其他的字段值永遠是第一條記錄的值,不會隨着該字段變化
//可以使用maxBy,這樣整條記錄的輸出是maxBy字段所在的那條記錄
SingleOutputStreamOperator<User> salary=
keyedStream.max("salary");
salary.print();
env.execute("maxSalary");

@Data
@AllArgsConstructor
public class User{

private String id;
private String company;
private Double salary;

}

原因:我這個類屬於上面提示的POJO類,POJO類需要無參構造方法,我用的lombok里的@AllargsConstructor,這是有參構造器,默認不會再自動

加上無參構造器

標准的POJO類的要求:

1. 所有成員變量都是私有的,用private修飾

2. 每個成員變量都有對應的getter和setter

3. 有一個無參的構造方法

解決:加上@NoArgsConstructor

9、關鍵字:savepoint

"org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided

這是我在使用rest API調用stop job接口時提示的報錯信息,意思是沒有配置state.savepoint.dir的值

因為我是手動觸發job停止,手動停止則會觸發savepoint生成,來保存停止那一刻的快照信息,方便后續重啟時使用

解決:

修改flink-conf.yaml,配置state.savepoints.dir: file:///opt/flink-1.14.0/flink-savepoints  或(hdfs://nodeHost:port/flink-savepoints)

主意一定要file://   或 hdfs://  開頭

10、Flink per-job模式,在checkpoint 到rocksdb 時候 報錯 Too many open files   

分析:打開的文件數過多,linux限制最大是65536。RocksDB 沒有設置,默認允許打開5000,RocksDB打開的目錄超過5000,所以報上面的錯誤

在flink-conf.yam加配置state.backend.rocksdb.files.open: 10000


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM