flink讀寫mysql總是出問題,記錄一下
先是pom文件:
1 <properties>
2 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3 <maven.compiler.source>1.7</maven.compiler.source>
4 <maven.compiler.target>1.7</maven.compiler.target>
5 <java.version>1.8</java.version>
6 <flink.version>1.7.2</flink.version>
7 <scala.version>2.11.12</scala.version>
8 <scala.binary.version>2.11</scala.binary.version>
9
10 </properties>
11
12 <dependencies>
13 <dependency>
14 <groupId>org.apache.flink</groupId>
15 <artifactId>flink-core</artifactId>
16 <version>1.7.2</version>
17 </dependency>
18 <dependency>
19 <groupId>org.apache.flink</groupId>
20 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
21 <version>${flink.version}</version>
22 </dependency>
23
24 <dependency>
25 <groupId>org.apache.flink</groupId>
26 <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
27 <version>${flink.version}</version>
28 </dependency>
29 <dependency>
30 <groupId>org.apache.flink</groupId>
31 <artifactId>flink-table_2.11</artifactId>
32 <version>${flink.version}</version>
33 </dependency>
34 <dependency>
35 <groupId>mysql</groupId>
36 <artifactId>mysql-connector-java</artifactId>
37 <version>5.1.45</version>
38 </dependency>
39
40
41 <dependency>
42 <groupId>junit</groupId>
43 <artifactId>junit</artifactId>
44 <version>4.11</version>
45 <scope>test</scope>
46 </dependency>
47 </dependencies>
1 import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 2 import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink; 3 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; 4 import org.apache.flink.api.java.typeutils.RowTypeInfo; 5 import org.apache.flink.streaming.api.datastream.DataStreamSource; 6 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 7 import org.apache.flink.types.Row; 8
9 public class App { 10 public static void main( String[] args ) throws Exception { 11 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 12
13 DataStreamSource<Row> rowDataStreamSource = streamExecutionEnvironment.createInput(JDBCInputFormat.buildJDBCInputFormat() 14 .setDBUrl("jdbc:mysql://10.33.79.158:3306/promot") 15 .setDrivername("com.mysql.jdbc.Driver") 16 .setUsername("agap") 17 .setPassword("appgallery") 18 .setQuery("select pt_d,a from zz_part") 19 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 20 .finish()); 21
22 JDBCAppendTableSink jdbcSink = JDBCAppendTableSink.builder() 23 .setDBUrl("jdbc:mysql://10.33.79.158:3306/promot") 24 .setDrivername("com.mysql.jdbc.Driver") 25 .setUsername("agap") 26 .setPassword("appgallery") 27 .setParameterTypes(BasicTypeInfo.STRING_TYPE_INFO, 28 BasicTypeInfo.INT_TYPE_INFO) 29 .setQuery("insert into zz_part(pt_d,a) values(?,?)") 30 .build(); 31
32 jdbcSink.emitDataStream(rowDataStreamSource); 33
34 streamExecutionEnvironment.execute(); 35 } 36 }