Flink 讀寫 MySQL


 用 Flink 讀寫 MySQL 時總是出問題,在此記錄一下。

 先是 pom.xml 文件:

 <properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <!-- Compiler level kept in step with java.version below; the original
        listing mixed 1.7 (compiler) with 1.8 (java.version), while the
        Flink release selected here targets Java 8. -->
   <maven.compiler.source>1.8</maven.compiler.source>
   <maven.compiler.target>1.8</maven.compiler.target>
   <java.version>1.8</java.version>
   <!-- Single place to bump the Flink release used by every Flink artifact. -->
   <flink.version>1.7.2</flink.version>
   <scala.version>2.11.12</scala.version>
   <!-- Suffix appended to Scala-dependent Flink artifact ids (e.g. flink-jdbc_2.11). -->
   <scala.binary.version>2.11</scala.binary.version>
 </properties>
11 
12   <dependencies>
13     <dependency>
14       <groupId>org.apache.flink</groupId>
15       <artifactId>flink-core</artifactId>
16       <version>1.7.2</version>
17     </dependency>
18     <dependency>
19       <groupId>org.apache.flink</groupId>
20       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
21       <version>${flink.version}</version>
22     </dependency>
23 
24       <dependency>
25           <groupId>org.apache.flink</groupId>
26           <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
27           <version>${flink.version}</version>
28       </dependency>
29       <dependency>
30           <groupId>org.apache.flink</groupId>
31           <artifactId>flink-table_2.11</artifactId>
32           <version>${flink.version}</version>
33       </dependency>
34       <dependency>
35           <groupId>mysql</groupId>
36           <artifactId>mysql-connector-java</artifactId>
37           <version>5.1.45</version>
38       </dependency>
39 
40 
41     <dependency>
42       <groupId>junit</groupId>
43       <artifactId>junit</artifactId>
44       <version>4.11</version>
45       <scope>test</scope>
46     </dependency>
47   </dependencies>

 


 1 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;  2 import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;  3 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;  4 import org.apache.flink.api.java.typeutils.RowTypeInfo;  5 import org.apache.flink.streaming.api.datastream.DataStreamSource;  6 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  7 import org.apache.flink.types.Row;  8 
 9 public class App { 10     public static void main( String[] args ) throws Exception { 11         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 12 
13         DataStreamSource<Row> rowDataStreamSource = streamExecutionEnvironment.createInput(JDBCInputFormat.buildJDBCInputFormat() 14                 .setDBUrl("jdbc:mysql://10.33.79.158:3306/promot") 15                 .setDrivername("com.mysql.jdbc.Driver") 16                 .setUsername("agap") 17                 .setPassword("appgallery") 18                 .setQuery("select pt_d,a from zz_part") 19                 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 20  .finish()); 21 
22         JDBCAppendTableSink jdbcSink = JDBCAppendTableSink.builder() 23                 .setDBUrl("jdbc:mysql://10.33.79.158:3306/promot") 24                 .setDrivername("com.mysql.jdbc.Driver") 25                 .setUsername("agap") 26                 .setPassword("appgallery") 27  .setParameterTypes(BasicTypeInfo.STRING_TYPE_INFO, 28  BasicTypeInfo.INT_TYPE_INFO) 29                 .setQuery("insert into zz_part(pt_d,a) values(?,?)") 30  .build(); 31 
32  jdbcSink.emitDataStream(rowDataStreamSource); 33 
34  streamExecutionEnvironment.execute(); 35  } 36 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱 yoyou2525@163.com 刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM