自定義 Source 類，並將相關工程打包成 jar 後放到 Flume 的 lib 目錄下
public class MySource extends AbstractSource implements Configurable, PollableSource { //全局變量,僅做演示,無實際意義 private String prefix; private String suffix; @Override public void configure(Context context) { prefix = context.getString("prefix"); suffix = context.getString("suffix","atguigu"); } @Override public Status process() throws EventDeliveryException { Status status = null; try { //模擬接收數據 for (int i = 0; i < 5; i++) { SimpleEvent event = new SimpleEvent(); event.setBody((prefix+"--"+i+"--"+suffix).getBytes()); //將數據發送到channel getChannelProcessor().processEvent(event); status = Status.READY; } }catch (Exception e){ status = Status.BACKOFF; } try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return status; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
flume配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.MySource
a1.sources.r1.prefix = feiji
a1.sources.r1.suffix = xiaxian

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
測試略