開源實現:https://github.com/keedio/flume-ng-sql-source
這里記錄的是自己手動實現。
測試中要讀取的表
CREATE TABLE `student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

記錄表(必須),告訴 Flume 每次從哪開始讀取
CREATE TABLE `flume_meta` ( `source_tab` varchar(255) COLLATE utf8_bin NOT NULL, `current_index` bigint(255) DEFAULT NULL, PRIMARY KEY (`source_tab`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

一、編寫自定義 Source
1.添加 pom 依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com</groupId> <artifactId>flume</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
2.編寫類
MySQLSourceHelper,JDBC 工具類,主要是讀取數據表和更新讀取記錄
package source;

import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * JDBC helper for {@code MySQLSource}.
 *
 * <p>Reads new rows from the configured source table (rows whose {@code id} is
 * greater than the last persisted offset) and records the read position in the
 * {@code flume_meta} table, keyed by source table name, so ingestion can resume
 * after a restart.
 */
public class MySQLSourceHelper {

    private static final Logger LOG = LoggerFactory.getLogger(MySQLSourceHelper.class);

    // Default offset to start from when flume_meta has no record for the table yet.
    private static final String DEFAULT_START_VALUE = "0";
    // Default column list: select every column.
    private static final String DEFAULT_COLUMNS_TO_SELECT = "*";

    // Configured start offset (string form of an integer id).
    private final String startFrom;
    // Source table to poll.
    private final String table;
    // Comma-separated column list to select.
    private final String columnsToSelect;

    private final String dbUrl;
    private final String dbUser;
    private final String dbPassword;
    private final String dbDriver;

    // Shared connection; re-opened by executeQuery() after a failure.
    private Connection conn;

    /**
     * Builds the helper from the Flume agent configuration.
     *
     * @param context Flume source context; required keys: {@code table},
     *                {@code db.url}, {@code db.user}, {@code db.password},
     *                {@code db.driver}; optional: {@code start.from},
     *                {@code columns.to.select}.
     */
    MySQLSourceHelper(Context context) {
        // Parameters with defaults: fall back when absent from the agent config.
        this.startFrom = context.getString("start.from", DEFAULT_START_VALUE);
        this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_TO_SELECT);
        // Required parameters (no defaults).
        this.table = context.getString("table");
        this.dbUrl = context.getString("db.url");
        this.dbUser = context.getString("db.user");
        this.dbPassword = context.getString("db.password");
        this.dbDriver = context.getString("db.driver");
        this.conn = getConnection();
    }

    /** Opens a JDBC connection, or returns {@code null} and logs on failure. */
    private Connection getConnection() {
        try {
            Class.forName(dbDriver);
            return DriverManager.getConnection(dbUrl, dbUser, dbPassword);
        } catch (SQLException | ClassNotFoundException e) {
            // Preserve the cause in the log instead of printStackTrace().
            LOG.error("Cannot open JDBC connection to {}", dbUrl, e);
        }
        return null;
    }

    /**
     * Builds the incremental query, using {@code id} as the offset column.
     * Table/column names cannot be bound as JDBC parameters, so they come from
     * trusted agent configuration only.
     */
    private String buildQuery() {
        return "select " + columnsToSelect + " from " + table
                + " where id > " + getStatusDBIndex(startFrom);
    }

    /**
     * Runs the incremental query and returns all new rows.
     *
     * @return one {@code List<Object>} per row (one element per column); an
     *         empty list when there is nothing new or the query failed.
     */
    List<List<Object>> executeQuery() {
        // Rebuild the SQL every poll: the offset id changes between calls.
        String customQuery = buildQuery();
        List<List<Object>> results = new ArrayList<>();
        // try-with-resources closes statement and result set even on error.
        // BUG FIX: the original called ps.executeQuery(customQuery) — the
        // Statement.executeQuery(String) overload is illegal on a
        // PreparedStatement and throws SQLException.
        try (PreparedStatement ps = conn.prepareStatement(customQuery);
             ResultSet result = ps.executeQuery()) {
            int columnCount = result.getMetaData().getColumnCount();
            while (result.next()) {
                List<Object> row = new ArrayList<>(columnCount);
                for (int i = 1; i <= columnCount; i++) {
                    row.add(result.getObject(i));
                }
                results.add(row);
            }
            LOG.info("execSql:" + customQuery + "\tresultSize:" + results.size());
            return results;
        } catch (SQLException e) {
            LOG.error("Query failed: {}", customQuery, e);
            // Reconnect so the next poll can try again.
            conn = getConnection();
        }
        // BUG FIX: the original returned null here, which made the caller's
        // result.isEmpty() throw NPE. Empty list keeps callers safe.
        return Collections.emptyList();
    }

    /**
     * Flattens each row to a comma-separated string ({@code null} columns
     * become empty fields). Each string keeps the original trailing comma.
     */
    List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>(queryResult.size());
        for (List<Object> rawRow : queryResult) {
            StringBuilder row = new StringBuilder();
            for (Object column : rawRow) {
                if (column != null) {
                    row.append(column.toString());
                }
                row.append(",");
            }
            allRows.add(row.toString());
        }
        return allRows;
    }

    /**
     * Persists the latest offset. Uses the source table name as the key:
     * inserts on first call, updates afterwards — one row per source table.
     * Must be called after every delivered batch so a restart resumes from
     * the right id.
     *
     * @param size the id of the last delivered row
     */
    void updateOffset2DB(BigInteger size) {
        // BUG FIX: parameterized statement instead of string-concatenated SQL.
        String sql = "insert into flume_meta values(?, ?)"
                + " on duplicate key update current_index = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, table);
            ps.setString(2, size.toString());
            ps.setString(3, size.toString());
            LOG.info("updateStatus table={} current_index={}", table, size);
            ps.execute();
        } catch (SQLException e) {
            LOG.error("Failed to persist offset {} for table {}", size, table, e);
        }
    }

    /**
     * Looks up the current offset for this table in {@code flume_meta}.
     * Returns {@code startFrom} when no record exists yet (first run) or on
     * query failure.
     */
    private BigInteger getStatusDBIndex(String startFrom) {
        BigInteger dbIndex = new BigInteger(startFrom);
        String sql = "select current_index from flume_meta where source_tab = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, table);
            try (ResultSet result = ps.executeQuery()) {
                if (result.next()) {
                    String id = result.getString(1);
                    if (id != null) {
                        dbIndex = new BigInteger(id);
                    }
                }
            }
        } catch (SQLException e) {
            LOG.error("Failed to read offset for table {}", table, e);
        }
        return dbIndex;
    }

    /** Closes the shared JDBC connection; statements are closed per call. */
    void close() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.error("Failed to close JDBC connection", e);
            }
        }
    }

    /** @return the configured source table name (used as the event header). */
    public String getTable() {
        return table;
    }
}
MySQLSource,自定義 Source 類
package source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Pollable Flume source that periodically reads new rows from a MySQL table
 * (via {@link MySQLSourceHelper}), emits one event per row, and persists the
 * last read id so ingestion resumes correctly after a restart.
 */
public class MySQLSource extends AbstractSource implements Configurable, PollableSource {

    private static final Logger LOG = LoggerFactory.getLogger(MySQLSource.class);

    // JDBC helper: runs the incremental query and tracks the offset.
    private MySQLSourceHelper sqlSourceHelper;

    // Delay between two polls, in milliseconds.
    private int queryDelay;
    private static final int DEFAULT_QUERY_DELAY = 10000;

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /** Reads {@code query.delay} (ms, default 10000) and initializes the helper. */
    @Override
    public void configure(Context context) {
        sqlSourceHelper = new MySQLSourceHelper(context);
        queryDelay = context.getInteger("query.delay", DEFAULT_QUERY_DELAY);
    }

    /**
     * One poll cycle: query new rows, convert each row to an event (body =
     * comma-joined columns, header {@code table} = source table name), push the
     * batch to the channel, persist the new offset, then sleep.
     *
     * @return {@link Status#READY} after a normal cycle, {@link Status#BACKOFF}
     *         when the sleep is interrupted
     */
    @Override
    public Status process() throws EventDeliveryException {
        try {
            List<Event> events = new ArrayList<>();
            // Shared header identifying the source table.
            HashMap<String, String> header = new HashMap<>();
            header.put("table", sqlSourceHelper.getTable());

            List<List<Object>> result = sqlSourceHelper.executeQuery();
            // BUG FIX: guard against null (query failure) before isEmpty().
            if (result != null && !result.isEmpty()) {
                for (String row : sqlSourceHelper.getAllRows(result)) {
                    Event event = new SimpleEvent();
                    event.setHeaders(header);
                    // BUG FIX: explicit charset — getBytes() used the platform default.
                    event.setBody(row.getBytes(StandardCharsets.UTF_8));
                    events.add(event);
                }
                getChannelProcessor().processEventBatch(events);
                // Persist the offset only after a successful delivery: the id
                // (first column) of the last row in the batch.
                sqlSourceHelper.updateOffset2DB(
                        new BigInteger(result.get(result.size() - 1).get(0).toString()));
            }

            Thread.sleep(queryDelay);
            return Status.READY;
        } catch (InterruptedException e) {
            // BUG FIX: restore the interrupt flag so the framework can stop us.
            Thread.currentThread().interrupt();
            LOG.error("Error processing row", e);
            return Status.BACKOFF;
        }
    }

    /** Closes the JDBC resources before delegating to the framework stop. */
    @Override
    public synchronized void stop() {
        LOG.info("Stopping sql source {} ...", getName());
        try {
            // BUG FIX: helper may be null if configure() was never called.
            if (sqlSourceHelper != null) {
                sqlSourceHelper.close();
            }
        } finally {
            super.stop();
        }
    }
}
二、打包測試
1.打包上傳
記得把 pom 依賴中的 MySQL 的 jar 包也傳上去。
參考:https://www.cnblogs.com/jhxxb/p/11582804.html
2.編寫 flume 配置文件
mysql.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = source.MySQLSource a1.sources.r1.db.driver = com.mysql.jdbc.Driver a1.sources.r1.db.url = jdbc:mysql://192.168.8.136:3306/rbac0 a1.sources.r1.db.user = root a1.sources.r1.db.password = root a1.sources.r1.table = student # a1.sources.r1.columns.to.select = * # a1.sources.r1.start.from = 0 # Describe the sink a1.sinks.k1.type = logger # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啟動
cd /opt/apache-flume-1.9.0-bin bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/mysql.conf -Dflume.root.logger=INFO,console
向監控表插入數據
INSERT student VALUES(NULL,'zhangsan',18);


Flume 的控制台日志

