Flume 讀取RabbitMq消息隊列消息,並將消息寫入kafka


首先是關於flume的基礎介紹

組件名稱    

功能介紹

Agent代理

使用JVM 運行Flume。每台機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。

Client客戶端

生產數據,運行在一個獨立的線程。

Source源

從Client收集數據,傳遞給Channel。

Sink接收器

從Channel收集數據,進行相關操作,運行在一個獨立線程。

Channel通道

連接 sources 和 sinks ,這個有點像一個隊列。

Events事件

傳輸的基本數據負載。

目前來說flume是支持多種source

其中是支持讀取jms消息隊列消息,網上並沒有找到關於讀取rabbitmq的教程

雖然flume並不支持讀取rabbitMq,所以需要對flume進行二次開發

這里主要就是flume怎么從rabbitMq讀取數據

這里從git上找到了一個關於flume從rabbitMq讀取數據的插件

  下載地址是:https://github.com/gmr/rabbitmq-flume-plugin

上面有一些英文的描述,大家可以看下

環境介紹

centOS 7.3   jdk1.8   cdh5.14.0

1.用 mvn 打包該項目,會生成兩個JAR包

 

 

2.因為我這邊使用的以cdh方式安裝集成flume的,所以把這兩個jar  扔到  /usr/lib   下面

  如果是普通的安裝方式需要把這兩個jar包復制到 flume安裝目錄的lib下面

 

 

3.進入cdh管理頁面配置Agent

 

下面是詳細的配置,我這邊是直接把消息寫入kafka集群里 的

tier1.sources  = source1

tier1.channels = channel1

tier1.sinks    = sink1

tier1.sources.source1.type     = com.aweber.flume.source.rabbitmq.RabbitMQSource

tier1.sources.source1.bind     = 127.0.0.1

tier1.sources.source1.port     = 5672

tier1.sources.source1.virtual-host = /

tier1.sources.source1.username = guest

tier1.sources.source1.password = guest

tier1.sources.source1.queue = test

tier1.sources.source1.prefetchCount = 10

tier1.sources.source1.channels = channel1

tier1.sources.source1.threads = 2

tier1.sources.source1.interceptors = i1

tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

tier1.sources.source1.interceptors.i1.preserveExisting = true

tier1.channels.channel1.type   = memory

tier1.sinks.sink1.channel      = channel1

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink 

tier1.sinks.sink1.topic = flume_out 

tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094

tier1.sinks.sink1.requiredAcks = 1 

tier1.sinks.sink11.batchSize = 20 

配置完成更新配置重新啟動Agent

 

 

 

這個是接收到rabbitMq消息

 

 

大功告成,如果配置中有疑問的可以留言,我看到后會回復


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM