Introduction
Flume: Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic applications.
Kafka single-node installation and startup, official documentation: https://kafka.apache.org/quickstart
Flink official site: https://flink.apache.org/
Hadoop official documentation: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
Environment
Ubuntu 18.04
apache-flume-1.9.0-bin.tar.gz
kafka_2.11-2.3.1
hadoop-2.7.7
A standalone ZooKeeper is not installed; the ZooKeeper bundled with Kafka is used instead.
Installation and Deployment
Flume Deployment
Flume is deployed together with the server that produces the logs; Kafka, Flink, and Hadoop are deployed together for testing.
Install JDK 8
sudo apt-get update
sudo apt-get install openjdk-8-jdk-headless
Configure the Java environment variables
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
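A minimal sketch of applying and checking these settings, assuming the exports above were appended to ~/.bashrc (any shell profile works):
source ~/.bashrc     # reload the profile so the new variables take effect
java -version        # should report OpenJDK 1.8.x
echo $JAVA_HOME      # should print /usr/lib/jvm/java-1.8.0-openjdk-amd64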
Extract Flume and append the extracted directory to PATH (a sketch follows the extraction command):
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/
apache-flume-1.9.0-bin
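A minimal sketch of putting Flume on the PATH and verifying the installation; FLUME_HOME is just a convenience variable introduced here, and the exports are assumed to go into ~/.bashrc:
export FLUME_HOME=/opt/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
flume-ng version     # should report Flume 1.9.0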
Data flow: nginx log ====> Flume
Log file location: /root/logs/access.log
xxx@xxx:/opt/apache-flume-1.9.0-bin/conf$ cat nginx-logger.conf
# Name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe and configure the source component r1
a1.sources.r1.type = exec
# nginx log file to tail
a1.sources.r1.command = tail -F /home/xxx/logs/access.log
# a1.sources.r1.shell = /bin/sh -c
# Describe and configure the sink component k1
# Use the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList = kafka-broker-IP:9092
# Kafka topic
a1.sinks.k1.topic = test
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.requiredAcks = 1
# Serializer
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Describe and configure the channel component c1; an in-memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Shell script that generates log data automatically; create the script below and run it (a usage sketch follows the script):
#!/bin/bash
i=1
j=0
while (test $i -le 6170967)
do
    j=`expr $i + 9`
    sed -n $i,$j'p' /root/logs/all.log >> /home/xxx/logs/access.log
    i=`expr $i + 10`
    sleep 5
    # echo $i
done
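A minimal usage sketch for running the generator in the background; the file name gen_access_log.sh is hypothetical:
chmod +x gen_access_log.sh
nohup ./gen_access_log.sh > /dev/null 2>&1 &    # keep it running after the terminal closes
tail -f /home/xxx/logs/access.log               # watch the simulated log grow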
Start the Flume agent
./bin/flume-ng agent -n a1 -c conf -f conf/nginx-logger.conf -Dflume.root.logger=INFO,console
-n a1                       specifies the agent name
-c conf                     specifies the configuration directory
-f conf/nginx-logger.conf   specifies the configuration file
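Once Kafka is running (next section), a quick sanity check is to append a test line to the tailed file and watch it arrive on the topic; this is only a sketch and assumes the consumer is run from the Kafka directory:
echo "flume test line $(date)" >> /home/xxx/logs/access.log
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   # the line should appear within a few seconds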
Kafka Deployment
cd /opt/kafka_2.11-2.3.1                                      # enter the Kafka directory
bin/zookeeper-server-start.sh config/zookeeper.properties     # start the ZooKeeper server
bin/kafka-server-start.sh config/server.properties            # start the Kafka server
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test   # create topic "test" with one partition and one replica
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test                          # start a console producer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning    # start a console consumer
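Optionally, the topic can be inspected to confirm the partition and replica counts; this is just a sanity check, not a required step:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test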
Allowing Remote Access to Kafka
In version 0.8 there are two configuration parameters to change:
advertised.host.name=192.168.1.13   # IP that Kafka exposes to the outside (local IP address)
advertised.port=9092                # port that 9092 is mapped to externally
Newer versions (0.10.x and later) deprecate advertised.host.name and advertised.port in the broker configuration; setting advertised.listeners is enough:
advertised.listeners=PLAINTEXT://192.168.1.13:9092   # local IP address:port
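After restarting the broker, remote access can be checked from another machine with the console tools, assuming 192.168.1.13 is reachable and port 9092 is open:
bin/kafka-console-producer.sh --broker-list 192.168.1.13:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic test --from-beginning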
Readers who want to understand these parameters in depth can refer to a detailed Kafka configuration guide.
Other Configuration Items

#vim /opt/kafka2.11/config/server.properties
# Receive buffer size of the socket server
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Directory where Kafka stores its log (data) files
log.dirs=/opt/kafka2.11/logs
# Default number of partitions per topic on this broker
num.partitions=2
# Number of threads used to recover and clean data under the data directories
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; older segments are deleted
log.retention.hours=168
# Maximum time before rolling over to a new segment file
log.roll.hours=168
# Size of each log segment file, 1 GB by default
log.segment.bytes=1073741824
# Interval for periodically checking segment sizes against the retention policy
log.retention.check.interval.ms=300000
# Whether log cleaning is enabled
log.cleaner.enable=true
# ZooKeeper connection the broker uses to store metadata
zookeeper.connect=localhost:2181
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000
# Flush to disk once this many messages have accumulated in a partition buffer
log.flush.interval.messages=10000
# Flush to disk once messages have been buffered for this long
log.flush.interval.ms=3000
# delete.topic.enable=true is required to actually delete a topic; otherwise deletion is only marked
delete.topic.enable=true
# host.name must be the local IP (important); otherwise clients throw "Producer connection to localhost:9092 unsuccessful"
host.name=localhost
advertised.host.name=localhost
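Changes to server.properties only take effect after the broker is restarted; a minimal sketch, run from the Kafka directory:
bin/kafka-server-stop.sh                                     # stop the running broker
bin/kafka-server-start.sh -daemon config/server.properties   # start it again in the background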
Basic Kafka Commands
Start the Kafka server
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create a topic   # make sure it matches the topic configured for the Flume Kafka sink above
kafka-topics.sh --create --zookeeper zookeeper001:2181 --replication-factor 1 --partitions 1 --topic test
List all topics
kafka-topics.sh --list --zookeeper spark001:2181
Extra: use the console as a producer
kafka-console-producer.sh --broker-list spark001:9092 --topic test
Start a consumer (on Kafka 2.x the console consumer connects through the broker; the old --zookeeper option has been removed)
kafka-console-consumer.sh --bootstrap-server spark001:9092 --topic test
Hadoop Deployment
Pseudo-distributed installation
etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.7.7/data</value>
        <!-- defaults to a directory under /tmp, which is cleared on every reboot; the lost data would keep the NameNode from starting -->
    </property>
</configuration>
etc/hadoop/hdfs-site.xml:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
etc/hadoop/hadoop-env.sh: configure JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
Format the filesystem:
$ bin/hdfs namenode -format
Start NameNode daemon and DataNode daemon:
$ sbin/start-dfs.sh
If the startup succeeded, jps should show the following processes:
ztf@xxx:$ jps
19424 Jps
14369 NameNode
14563 DataNode
14797 SecondaryNameNode
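With HDFS up, the output directory used by the Flink sink below can be created and checked in advance; the /flink path simply matches the sink configured in the code:
bin/hdfs dfs -mkdir -p /flink    # directory the Flink job will write into
bin/hdfs dfs -ls /               # confirm HDFS is reachable and the directory exists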
Flink Code Example
pom.xml dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink connecting to Hadoop -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
Full code
import java.time.ZoneId;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class TestDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // Add the Kafka source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.252:9092"); // Kafka broker IP and port
        properties.setProperty("zookeeper.connect", "192.168.1.252:2181"); // ZooKeeper IP and port
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);

        // Add the Hadoop output sink
        stream.print();

        // Option 1: write the data to a directory in Hadoop
        //stream.writeAsText("hdfs://127.0.0.1:9000/flink/test");

        // Option 2: write the data to a directory in Hadoop via a bucketing sink
        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://127.0.0.1:9000/flink");
        // Name the buckets with the pattern "yyyy-MM-dd--HH" in the UTC+8 (Asia/Shanghai) time zone
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // A new part file is created when either of the following conditions is met:
        // Condition 1: the part file reaches 100 MB
        hadoopSink.setBatchSize(1024 * 1024 * 100);
        // Condition 2: a rollover interval of 20 minutes elapses
        hadoopSink.setBatchRolloverInterval(20 * 60 * 1000);
        // Prefix for pending part files
        hadoopSink.setPendingPrefix("");
        // Suffix for pending part files
        hadoopSink.setPendingSuffix("");
        // Prefix for in-progress files
        hadoopSink.setInProgressPrefix(".");
        // Attach the Hadoop sink to the stream
        stream.addSink(hadoopSink);

        env.execute("WordCount from Kafka data");
    }
}
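A rough sketch of building the job, submitting it, and checking the output; the jar path, the presence of a local Flink distribution, and the example bucket directory are assumptions (the job can also simply be run as TestDemo from the IDE):
mvn clean package                                  # build the job jar
./bin/flink run -c TestDemo /path/to/the-job.jar   # submit to a local Flink cluster, if one is installed
hdfs dfs -ls /flink                                # bucket directories named like 2019-11-01--10 appear once data arrives
hdfs dfs -cat /flink/2019-11-01--10/part-0-0       # example path; actual part file names depend on the bucketer settings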