Various combinations of Flume's source, channel, and sink
January 11, 2017

Lego-brick Flume: Flume has three major components: source, channel, and sink. They can be combined with one another in many ways, and the coupling between components is low, which makes Flume flexible and convenient to use.

1. Multiple sinks

Each event in the channel is output only once: if sink1 takes an event, sink2 does not get it, and if sink2 takes it, sink1 does not. In the end, sink1 + sink2 = the data in the channel.

The configuration file is as follows:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
#a1.sinks.k2.sink.rollInterval = 0
a1.sinks.k2.sink.directory = /opt/apps/tmp

2. Multiple channels, multiple sinks, each sink outputs the same content

(The memory channel feeds the Kafka sink for low latency; the file channel feeds the file sink for better data safety.)
(A multi-channel, single-sink setup is not shown here; personally I do not think it is widely useful.)

The configuration file is as follows:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# every channel receives the same data
a1.sources.r1.selector.type = replicating

# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data

# sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval = 0
a1.sinks.k2.sink.directory = /opt/apps/tmp
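To try either of the agents above, the standard flume-ng launcher starts agent a1 with the chosen configuration file. A minimal sketch, assuming Flume is installed under /opt/apps/flume-1.7.0 as in the file channel paths, Kafka is running locally, and the configuration is saved as conf/multi-sink.conf (the configuration file name and the Kafka installation path are illustrative):

cd /opt/apps/flume-1.7.0

# start agent a1 with the configuration above (conf/multi-sink.conf is a hypothetical file name)
bin/flume-ng agent --conf conf --conf-file conf/multi-sink.conf --name a1 -Dflume.root.logger=INFO,console

# in another terminal: read back what the Kafka sink k1 produced
/opt/apps/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning

# and check the files rolled to disk by the file_roll sink k2
ls -l /opt/apps/tmp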
3. Multiple sources, single channel, single sink

Multiple sources can read different kinds of data into one channel, which then delivers everything to the same destination.

The configuration file is as follows:

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log

# channel1 in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
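A quick way to confirm that both sources feed the same sink is to append a line to each tailed log file and watch the lines arrive on the Kafka topic. A minimal sketch, assuming the agent above is already running and both log files exist (only the file names and the topic come from the configuration; the test messages and the Kafka installation path are illustrative):

# append one test line to each file tailed by r1 and r2
echo "hello from tail4" >> /opt/apps/logs/tail4.log
echo "hello from tail2" >> /opt/apps/logs/tail2.log

# both lines should appear on the single topic written by sink k1
/opt/apps/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning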