Error:
Caused by: org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1117)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1112)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:546)
    at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
    at net.qutoutiao.Bootstrap.main(Bootstrap.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 13 invokeInteractiveModeForExecution
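I was converting a DataSource to a Table and using a Tuple as the element type. It turned out that
the tuple has to be the one from the Java package (in Scala: import org.apache.flink.api.java.tuple._), not Scala's built-in tuple. Otherwise it is treated as a generic type, which causes the error above. The stack trace shows the Java StreamTableEnvironment being called; a generic type counts as atomic there, so only a single field name can be mapped onto it.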
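As a rough illustration of the fix, here is a minimal sketch of registering a stream of Flink Java tuples. It assumes Flink 1.7/1.8 (the versions that still have TableEnvironment.getTableEnvironment) with flink-streaming-java and flink-table on the classpath; the class name TupleToTable, the table name "clicks", and the field names "name, cnt" are made up for the example.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Hypothetical example class; assumes the Flink 1.7/1.8 Table API.
public class TupleToTable {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // org.apache.flink.api.java.tuple.Tuple2 is a composite type,
        // so each of its positions can be given a field name.
        DataStream<Tuple2<String, Integer>> clicks =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        // Two field names for a two-field Java tuple. With a scala.Tuple2 here,
        // the type would be generic (atomic) and this call would throw
        // "Only the first field can reference an atomic type."
        tableEnv.registerDataStream("clicks", clicks, "name, cnt");
    }
}
```

Newer Flink releases create the table environment differently, so treat the getTableEnvironment call as specific to the version in the stack trace.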
Also, a POJO class apparently has to declare a public no-argument constructor, public pojo(){ }, otherwise it fails as well.
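To make the constructor requirement concrete, the sketch below uses plain Java reflection to test whether a class has a public no-argument constructor. This is only an illustration of the rule, not Flink's own type-extraction code; PojoCheck, GoodEvent, BadEvent, and hasPublicNoArgConstructor are hypothetical names.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical helper; it mirrors the rule, not Flink's internal check.
public class PojoCheck {

    // Has a public no-arg constructor alongside the convenience one.
    public static class GoodEvent {
        public String name;
        public int count;

        public GoodEvent() { }

        public GoodEvent(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // Declares only a constructor with arguments, so no default one exists.
    public static class BadEvent {
        public String name;

        public BadEvent(String name) {
            this.name = name;
        }
    }

    // Returns true if the class declares a public constructor with no parameters.
    public static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodEvent: " + hasPublicNoArgConstructor(GoodEvent.class));
        System.out.println("BadEvent: " + hasPublicNoArgConstructor(BadEvent.class));
    }
}
```

Running a check like this on an event class before registering it is a quick way to rule out the missing-constructor case.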
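For the details, look at the source of the method env.fromCollection().
It keeps a TypeInformation<OUT> for the element type, which is used later when the class is resolved.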
Link to a Flink expert's posts: http://ju.outofmemory.cn/feed/1264/?page=1
Alibaba link: http://jm.taobao.org/