When reposting, please credit the original source: http://www.cnblogs.com/dongxiao-yang/p/7652337.html
1 WindowFunction type mismatch, code does not compile
Flink version: 1.3.0
Following https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation, I wrote a demo and found that once the WindowFunction was added to reduce, it no longer compiled, reporting a parameter type mismatch.
The code is as follows:

MyReduceFunction a = new MyReduceFunction();

DataStream<Tuple3<String, String, Integer>> counts4 = source
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .reduce(a, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple3<String, String, Integer>> out) throws Exception {
                for (Tuple2<String, Integer> in : values) {
                    out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
                }
            }
        });

public static class MyReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
            Tuple2<String, Integer> value2) throws Exception {
        // Sum the counts of two records that share the same key
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
}
The code above, once the WindowFunction was added to reduce, kept failing to compile with an error that the reduce arguments had mismatched types. The real culprit is the keyBy(0) call: when a DataStream is keyed through public KeyedStream<T, Tuple> keyBy(int... fields) or public KeyedStream<T, Tuple> keyBy(String... fields), both overloads delegate to
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
            getType(), getExecutionConfig())));
}
Here, KeySelectorUtil.getSelectorForKeys returns a KeySelector of the concrete type ComparableKeySelector, which is declared as
public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple>
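For reference, a minimal paraphrase of the KeySelector interface itself (annotations omitted):

public interface KeySelector<IN, KEY> extends Function, Serializable {
    // Extracts the partitioning key from one input record;
    // the KEY type parameter fixes the type of every key produced.
    KEY getKey(IN value) throws Exception;
}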
Since ComparableKeySelector binds KEY to Tuple, every key it emits has type Tuple, so declaring String as the third generic parameter (the key type) of the WindowFunction above is wrong.
Solutions
1 Define a custom KeySelector

private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
    @Override
    public String getKey(Tuple2<String, Integer> value) throws Exception {
        return value.f0;
    }
}

DataStream<Tuple3<String, String, Integer>> counts4 = source
        .keyBy(new TupleKeySelector())
The code above first defines a TupleKeySelector whose key type is String, then passes new TupleKeySelector() to keyBy, so the resulting KeyedStream is partitioned on a String key.
2 Change the WindowFunction's third (key) generic parameter from String to Tuple, as sketched below.
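A minimal sketch of option 2, reusing source, a, and the imports from the demo above; the original String key is recovered from the Tuple wrapper via Tuple#getField:

DataStream<Tuple3<String, String, Integer>> counts4 = source
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .reduce(a, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                    Iterable<Tuple2<String, Integer>> values,
                    Collector<Tuple3<String, String, Integer>> out) throws Exception {
                // keyBy(0) wraps the String key in a Tuple; unwrap it here
                String k = key.getField(0);
                for (Tuple2<String, Integer> in : values) {
                    out.collect(new Tuple3<>(k, in.f0, in.f1));
                }
            }
        });

(org.apache.flink.api.java.tuple.Tuple is the one extra import this variant needs.)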
2 flink-connector-elasticsearch5 integration issues
Flink version: 1.3.2
Problem 1:
java.lang.UnsupportedClassVersionError: org/elasticsearch/action/bulk/BulkProcessor$Listener : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
Solution: Elasticsearch 5.x and later require at least JDK 1.8 (class file major version 52 corresponds to Java 8). If the job's default runtime JDK is older than 1.8, the Java version has to be specified manually.
Problem 1.1:
After setting env.java.home: /opt/jdk1.8.0_121, the error above still occurred.
As far as I can tell, in yarn-cluster mode the env.java.home setting has no effect on the containers' JAVA_HOME.
Solution: add the -yD yarn.taskmanager.env.JAVA_HOME=/opt/jdk1.8.0_121 parameter to the job submission script, e.g. as shown below.
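A hedged example of such a submission command in yarn-cluster mode (the container count and job jar here are placeholders):

./bin/flink run -m yarn-cluster -yn 2 -yD yarn.taskmanager.env.JAVA_HOME=/opt/jdk1.8.0_121 ./your-job.jar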
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#full-reference — oddly, this parameter appears in the 1.1 documentation but is gone by 1.3.
Problem 2: the web UI returns a 500 error, and the YARN log shows
ERROR org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler - Caught exception
java.lang.AbstractMethodError
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
The job itself runs normally, but clicking ApplicationMaster to open the web UI fails with a 500 error; the YARN error log is shown above.
This happens because flink-connector-elasticsearch5 depends on a netty-common jar whose io.netty.util.ReferenceCountUtil class clashes with the class of the same name in Flink's bundled netty-all.
Solution: netty-common cannot simply be excluded, or Elasticsearch would be missing other classes at runtime. The temporary workaround is to comment out <scope>provided</scope> on flink-streaming-java_2.10, so that Flink's own ReferenceCountUtil is packed into the fat jar and overrides the conflicting class.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
3 Flink 1.4.1: supporting the YARN node-label feature
1 Manually merge the PR https://github.com/apache/flink/pull/5593
2 Upgrade hadoop.version in the pom to a YARN version that supports node labels: <hadoop.version>2.8.2</hadoop.version>
4 Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
Flink version: 1.5.0
java.lang.NoClassDefFoundError: javax/ws/rs/ext/MessageBodyReader
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.hadoop.yarn.util.timeline.TimelineUtils.<clinit>(TimelineUtils.java:50)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:179)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:221)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at org.apache.flink.client.cli.CliFrontend.lambda$main$14(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 52 more
The error above appeared after packaging and launching. The fix is to remove the exclusion of jersey-core under the hadoop-yarn-common dependency in the flink-shaded-hadoop2 pom (as the stack trace shows, YARN's TimelineUtils needs the JAX-RS classes that jersey-core provides):
<!--<exclusion>-->
<!--<groupId>com.sun.jersey</groupId>-->
<!--<artifactId>jersey-core</artifactId>-->
<!--</exclusion>-->
5 could not find implicit value for evidence parameter
Reference explanation: https://www.iteblog.com/archives/2047.html
Solution: import org.apache.flink.streaming.api.scala._ — the wildcard import brings the implicit TypeInformation evidence that the Scala DataStream API requires into scope; see the sketch below.
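A minimal Scala sketch, assuming a local socket source, that compiles only with the wildcard import in place:

import org.apache.flink.streaming.api.scala._ // provides implicit TypeInformation evidence

object EvidenceDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Without the import above, map/keyBy fail to compile with
    // "could not find implicit value for evidence parameter of type TypeInformation[...]"
    env.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    env.execute("EvidenceDemo")
  }
}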