flink寫入mysql的兩種方式


方式一 通過JDBCOutputFormat

在flink中沒有現成的用來寫入MySQL的sink,但是flink提供了一個類,JDBCOutputFormat,通過這個類,如果你提供了jdbc的driver,則可以當做sink使用。

JDBCOutputFormat其實是flink的batch api,但也可以用來作為stream的api使用,社區也推薦通過這種方式來進行。

JDBCOutputFormat用起來很簡單,只需要一個prepared statement,driver和database connection,就可以開始使用了。

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .finish();

如下的sql語句可以作為prepared statement:

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

對應的表的結構:

1 CREATE TABLE cases
2 (
3  caseid VARCHAR(255),
4  tracehash VARCHAR(255)
5 );

但有一點要明確,JDBCOutputFormat只能處理Row,而Row是對prepared statement的參數的一個包裝類。這意味着我們需要將流中的case轉換為row,通過map就能做的。

1 DataStream<Case> cases = ...
2 
3   DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
4    Row row = new Row(2); // our prepared statement has 2 parameters
5    row.setField(0, aCase.getId()); //first parameter is case ID
6    row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
7    return row;
8   });

這樣,我們就能添加sink了:

1 rows.writeUsingOutputFormat(jdbcOutput);

這樣,你就可以將數據寫入mysql了。

但是在你在流上附加了窗口之后,可能會得到下面的報錯:

1 "Unknown column type for column %s. Best effort approach to set its value: %s."

因為窗口處理的類型,沒有明確的類型定義,如下修改之前的定義,顯式的指定類型:

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
6  .finish();

JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

JDBCOutputFormat 還有一個很有用的參數,batchInterval,見名知意,就是多少數據提交一次,盡量高效率的向數據庫提交數據。當然還有比如timeout等其他參數,可以探索。

方式二 通過自定義sink提交

我們通過繼承RichSinkFunction<IN>來實現自定義sink:

 1 public class RichCaseSink extends RichSinkFunction<Case> {
 2 
 3   private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
 4       + "VALUES (?, ?) "
 5       + "ON CONFLICT (caseid) DO UPDATE SET "
 6       + "  tracehash=?";
 7 
 8   private PreparedStatement statement;
 9 
10 
11   @Override
12   public void invoke(Case aCase) throws Exception {
13 
14     statement.setString(1, aCase.getId());
15     statement.setString(2, aCase.getTraceHash());
16     statement.setString(3, aCase.getTraceHash());
17     statement.addBatch();
18     statement.executeBatch();
19   }
20 
21   @Override
22   public void open(Configuration parameters) throws Exception {
23     Class.forName("com.mysql.jdbc.Driver");
24     Connection connection =
25         DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
26 
27     statement = connection.prepareStatement(UPSERT_CASE);
28   }
29 
30 }

這樣,就可以在流上添加sink 了:

1 DataStream<Case> cases = ...
2 cases.addSink(new RichCaseSink());

當然,上面的實現很簡略,沒有給出批量提交或者超時提交,這個都可以很容易的添加,比如close()中關閉連接。

但是上面的實現中,最大的問題還是沒有跟flink的狀態管理相結合,這個才是重頭戲。

方式二 加強版的自定義sink

在checkpoint的時候保存數據,繼承接口CheckpointedFunction :

 1 @Override
 2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
 3   long checkpointId = context.getCheckpointId();
 4   List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
 5   if(cases == null){
 6     cases = new ArrayList<>();
 7     pendingCasesPerCheckpoint.put(checkpointId, cases);
 8   }
 9   cases.addAll(pendingCases);
10   pendingCases.clear();
11 }

在消息到達的時候不插入數據,只是留存數據:

1 @Override
2 public void invoke(Case aCase) throws Exception {
3   pendingCases.add(aCase);
4 }

這樣,通過繼承CheckpointListener,我們就能在某個checkpoint完成的時候插入數據:

 1 @Override
 2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
 3 
 4  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
 5    pendingCasesPerCheckpoint.entrySet().iterator();
 6 
 7  while (pendingCheckpointsIt.hasNext()) {
 8 
 9   Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
10   Long pastCheckpointId = entry.getKey();
11   List<Case> pendingCases = entry.getValue();
12 
13   if (pastCheckpointId <= checkpointId) {
14 
15    for (Case pendingCase : pendingCases) {
16     statement.setString(1, pendingCase.getId());
17     statement.setString(2, pendingCase.getTraceHash());
18     statement.setString(3, pendingCase.getTraceHash());
19     statement.addBatch();
20    }
21    pendingCheckpointsIt.remove();
22   }
23  }
24  statement.executeBatch();
25 
26 }

前提,是需要設置checkpoint,比如:

ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);

這樣,每隔10s,當一個checkpoint做成功,就會插入一次數據。

當然,上面的代碼驗證可用,但不建議在生產環境使用,生產環境需要考慮更多的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM