17-Flink消費Kafka寫入Mysql


戳更多文章:

1-Flink入門

2-本地環境搭建&構建第一個Flink應用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式緩存

7-重啟策略

8-Flink中的窗口

9-Flink中的Time

Flink時間戳和水印

Broadcast廣播變量

FlinkTable&SQL

Flink實戰項目實時熱銷排行

Flink寫入RedisSink

17-Flink消費Kafka寫入Mysql

本文介紹消費Kafka的消息實時寫入Mysql。

  1. maven新增依賴:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.39</version> </dependency> 

2.重寫RichSinkFunction,實現一個Mysql Sink

public class MysqlSink extends RichSinkFunction<Tuple3<Integer, String, Integer>> { private Connection connection; private PreparedStatement preparedStatement; String username = ""; String password = ""; String drivername = ""; //配置改成自己的配置 String dburl = ""; @Override public void invoke(Tuple3<Integer, String, Integer> value) throws Exception { Class.forName(drivername); connection = DriverManager.getConnection(dburl, username, password); String sql = "replace into table(id,num,price) values(?,?,?)"; //假設mysql 有3列 id,num,price preparedStatement = connection.prepareStatement(sql); preparedStatement.setInt(1, value.f0); preparedStatement.setString(2, value.f1); preparedStatement.setInt(3, value.f2); preparedStatement.executeUpdate(); if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } } 
  1. Flink主類
public class MysqlSinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // 1,abc,100 類似這樣的數據,當然也可以是很復雜的json數據,去做解析 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); env.getConfig().disableSysoutLogging(); //設置此可以屏蔽掉日記打印情況 env.getConfig().setRestartStrategy( RestartStrategies.fixedDelayRestart(5, 5000)); env.enableCheckpointing(2000); DataStream<String> stream = env .addSource(consumer); DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value)) .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> { String[] args1 = value.split(","); return new Tuple3<Integer, String, Integer>(Integer .valueOf(args1[0]), args1[1],Integer .valueOf(args1[2])); }); sourceStream.addSink(new MysqlSink()); env.execute("data to mysql start"); } } 

所有代碼,我放在了我的公眾號,回復Flink可以下載

  • 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
  • 更多大數據技術歡迎和作者一起探討~
 
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM