Source 是負責接收數據到 Flume Agent 的組件。
Source 組件可以處理各種類型、各種格式的日志數據,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
官方提供的 source 類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些 source。
官方也提供了自定義 source 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source
根據官方說明自定義 Source 需要繼承 AbstractSource 類並實現 Configurable 和 PollableSource 接口。
實現相應方法:
getBackOffSleepIncrement(); getMaxBackOffSleepInterval(); // 初始化 context(讀取配置文件內容) configure(Context context); // 獲取數據封裝成 event 並寫入 channel,這個方法將被循環調用 process();
使用場景:讀取 MySQL 數據或者其他文件系統。
這里使用 flume 接收數據,並給每條數據添加前綴,輸出到控制台。前綴可從 flume 配置文件中配置。
一、創建自定義 Source
1.添加 pom 依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com</groupId> <artifactId>flume</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
2.編寫自定義的 Source 類
package source; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定義配置文件將來要讀取的字段 private Long delay; private String field; // 初始化配置信息 @Override public void configure(Context context) { delay = context.getLong("delay"); field = context.getString("field", "Hello!"); } @Override public Status process() throws EventDeliveryException { try { // 創建事件頭信息 HashMap<String, String> hearderMap = new HashMap<>(); // 創建事件 SimpleEvent event = new SimpleEvent(); // 循環封裝事件 for (int i = 0; i < 5; i++) { // 給事件設置頭信息 event.setHeaders(hearderMap); // 給事件設置內容 event.setBody((field + i).getBytes()); // 將事件寫入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
二、打包測試
1.打包上傳
參考:https://www.cnblogs.com/jhxxb/p/11582804.html
2.編寫 flume 配置文件
mysource.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = source.MySource # 代碼中要獲取的配置信息 a1.sources.r1.delay = 1000 # a1.sources.r1.field = jhxxb # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啟動
cd /opt/apache-flume-1.9.0-bin bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/source/mysource.conf -Dflume.root.logger=INFO,console