首先是關於flume的基礎介紹
組件名稱 |
功能介紹 |
Agent代理 |
使用JVM 運行Flume。每台機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。 |
Client客戶端 |
生產數據,運行在一個獨立的線程。 |
Source源 |
從Client收集數據,傳遞給Channel。 |
Sink接收器 |
從Channel收集數據,進行相關操作,運行在一個獨立線程。 |
Channel通道 |
連接 sources 和 sinks ,這個有點像一個隊列。 |
Events事件 |
傳輸的基本數據負載。 |
目前來說flume是支持多種source
其中是支持讀取jms消息隊列消息,網上並沒有找到關於讀取rabbitmq的教程
雖然flume並不支持讀取rabbitMq,所以需要對flume進行二次開發
這里主要就是flume怎么從rabbitMq讀取數據
這里從git上找到了一個關於flume從rabbitMq讀取數據的插件
下載地址是:https://github.com/gmr/rabbitmq-flume-plugin
上面有一些英文的描述,大家可以看下
環境介紹
centOS 7.3 jdk1.8 cdh5.14.0
1.用 mvn 打包該項目,會生成兩個JAR包
2.因為我這邊使用的以cdh方式安裝集成flume的,所以把這兩個jar 扔到 /usr/lib 下面
如果是普通的安裝方式需要把這兩個jar包復制到 flume安裝目錄的lib下面
3.進入cdh管理頁面配置Agent
下面是詳細的配置,我這邊是直接把消息寫入kafka集群里 的
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
tier1.sources.source1.bind = 127.0.0.1
tier1.sources.source1.port = 5672
tier1.sources.source1.virtual-host = /
tier1.sources.source1.username = guest
tier1.sources.source1.password = guest
tier1.sources.source1.queue = test
tier1.sources.source1.prefetchCount = 10
tier1.sources.source1.channels = channel1
tier1.sources.source1.threads = 2
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
tier1.sources.source1.interceptors.i1.preserveExisting = true
tier1.channels.channel1.type = memory
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = flume_out
tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink11.batchSize = 20
配置完成更新配置重新啟動Agent
這個是接收到rabbitMq消息
大功告成,如果配置中有疑問的可以留言,我看到后會回復