Flume 自定義Source


自定義source類,並將相關工程打包放在flume的lib目錄下

public class MySource extends AbstractSource implements Configurable, PollableSource {

    //全局變量,僅做演示,無實際意義
    private String prefix;
    private String suffix;

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix","atguigu");
    }

    @Override
    public Status process() throws EventDeliveryException {

        Status status = null;

        try {
            //模擬接收數據
            for (int i = 0; i < 5; i++) {
                SimpleEvent event = new SimpleEvent();

                event.setBody((prefix+"--"+i+"--"+suffix).getBytes());

                //將數據發送到channel
                getChannelProcessor().processEvent(event);

                status = Status.READY;
            }
        }catch (Exception e){
            status = Status.BACKOFF;
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }


}

flume配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.MySource
a1.sources.r1.prefix = feiji
a1.sources.r1.suffix = xiaxian

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

測試略


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM