Introduction
Flume: Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple, flexible architecture based on streaming data flows, is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms, and uses a simple extensible data model that allows for online analytic applications.
Kafka standalone installation and startup, official documentation: https://kafka.apache.org/quickstart
Flink official website: https://flink.apache.org/
Hadoop official documentation: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
Environment
Ubuntu 18.04
apache-flume-1.9.0-bin.tar.gz
kafka_2.11-2.3.1
hadoop-2.7.7
ZooKeeper is not installed separately; the ZooKeeper bundled with Kafka is used.
Installation and Deployment
Flume Deployment
Flume is deployed together with the web server; Kafka, Flink, and Hadoop are deployed together on another machine for testing.
Install JDK 8
sudo apt-get update
sudo apt-get install openjdk-8-jdk-headless
Configure the Java environment variables
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
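These exports typically go into ~/.bashrc or /etc/profile. A quick sanity check after reloading the shell, as a minimal sketch:

source ~/.bashrc      # or: source /etc/profile, depending on where the exports were added
java -version         # should report OpenJDK 1.8.x
echo $JAVA_HOME       # should print /usr/lib/jvm/java-1.8.0-openjdk-amd64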
Extract the archive and add the extracted directory to PATH (see the sketch below):
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/
The extracted directory is /opt/apache-flume-1.9.0-bin.
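A minimal sketch of adding the extracted Flume directory to PATH; FLUME_HOME is just a convenience variable introduced here, and the lines would typically go into ~/.bashrc alongside the Java exports above:

export FLUME_HOME=/opt/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin

# Verify the installation:
flume-ng version      # should report Flume 1.9.0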
Nginx logs ====> Flume
Log file location: /root/logs/access.log (this must match the path tailed in a1.sources.r1.command in the Flume configuration below).
xxx@xxx:/opt/apache-flume-1.9.0-bin/conf$ cat nginx-logger.conf
# Name the components of the agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source component: r1
# The command tails the Nginx log file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/xxx/logs/access.log
# a1.sources.r1.shell = /bin/sh -c

# Describe and configure the sink component: k1 (Kafka sink)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList = kafka-broker-IP:9092
# Kafka topic
a1.sinks.k1.topic = test
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.requiredAcks = 1
# Serialization
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# Describe and configure the channel component: c1 (in-memory channel)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Shell script to generate log data automatically
#!/bin/bash
i=1
j=0
while (test $i -le 6170967)
do
    j=`expr $i + 9`
    sed -n $i,$j'p' /root/logs/all.log >> /home/xxx/logs/access.log
    i=`expr $i + 10`
    sleep 5
    # echo $i
done
# Save this as a shell script file and run it.
Start the Flume agent
./bin/flume-ng agent -n a1 -c conf -f conf/nginx-logger.conf -Dflume.root.logger=INFO,console

# -n a1                       specifies the agent name
# -c conf                     specifies the configuration directory
# -f conf/nginx-logger.conf   specifies the configuration file
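With the agent running and the log-generation script appending to access.log, a quick way to confirm that events actually reach Kafka is to attach a console consumer to the test topic. A minimal sketch, run from the Kafka installation directory on the Kafka machine:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# New lines from access.log should start appearing within a few seconds.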
Kafka Deployment
cd /opt/kafka_2.11-2.3.1                                      # Enter the Kafka directory
bin/zookeeper-server-start.sh config/zookeeper.properties     # Start the ZooKeeper server
bin/kafka-server-start.sh config/server.properties            # Start the Kafka server
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test   # Create a topic named "test" with one partition and one replica
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test                        # Start a console producer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  # Start a console consumer
Allow remote access to Kafka
In version 0.8, two configuration parameters need to be modified:
advertised.host.name=192.168.1.13   # <== the IP Kafka exposes externally (the local IP address)
advertised.port=9092                # <== the externally exposed port mapped to 9092
In newer versions (0.10.x and later), the broker configuration deprecates the advertised.host.name and advertised.port options; configuring advertised.listeners alone is sufficient:
advertised.listeners=PLAINTEXT://192.168.1.13:9092   # local IP address:port
For a detailed explanation of these parameters, refer to the Kafka configuration file reference.
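A quick way to check that remote access works after updating advertised.listeners and restarting the broker is to produce and consume a message from a different machine. A minimal sketch, assuming 192.168.1.13 is the broker's address, the test topic already exists, and the Kafka command-line tools are available on the remote machine:

# Send one test message to the broker from a remote machine
echo "hello from remote" | bin/kafka-console-producer.sh --broker-list 192.168.1.13:9092 --topic test
# Read it back
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic test --from-beginning --max-messages 1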
Other configuration options

# vim /opt/kafka2.11/config/server.properties

# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Directory where Kafka's data logs are stored
log.dirs=/opt/kafka2.11/logs
# Default number of partitions per topic on this broker
num.partitions=2
# Number of threads used to recover and clean data under log.dirs
num.recovery.threads.per.data.dir=1
# Maximum time a segment file is retained; older segments are deleted
log.retention.hours=168
# Maximum time before rolling a new segment file
log.roll.hours=168
# Size of each log segment file, 1 GB by default
log.segment.bytes=1073741824
# Interval for periodically checking segment sizes
log.retention.check.interval.ms=300000
# Whether log cleaning is enabled
log.cleaner.enable=true
# ZooKeeper connection used by the broker to store metadata
zookeeper.connect=localhost:2181
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000
# Flush to disk once this many messages accumulate in the partition buffer
log.flush.interval.messages=10000
# Flush to disk once messages have been buffered for this long
log.flush.interval.ms=3000
# Deleting a topic requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion
delete.topic.enable=true
# host.name must be set to the local IP (important); if it is not changed, clients will throw:
# "Producer connection to localhost:9092 unsuccessful"
host.name=localhost
advertised.host.name=localhost
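Changes to server.properties only take effect after the broker is restarted. A minimal sketch, assuming Kafka is installed under /opt/kafka_2.11-2.3.1:

cd /opt/kafka_2.11-2.3.1
bin/kafka-server-stop.sh                                      # stop the running broker, if any
bin/kafka-server-start.sh -daemon config/server.properties    # start it again in the background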
Basic Kafka Commands
Start the Kafka server
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create a topic (note: the topic must match the one configured in the Flume sink above)
kafka-topics.sh --create --zookeeper zookeeper001:2181 --replication-factor 1 --partitions 1 --topic test
List all topics
kafka-topics.sh --list --zookeeper spark001:2181
Extension: use the console as a producer
kafka-console-producer.sh --broker-list spark001:9092 --topic test
Start a consumer
kafka-console-consumer.sh --bootstrap-server spark001:9092 --topic test   # the old --zookeeper option was removed from the console consumer in Kafka 2.x
Hadoop Deployment
Pseudo-distributed installation
etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <!-- Defaults to /tmp, which is wiped on every reboot, so the NameNode would fail to start -->
        <value>/opt/hadoop-2.7.7/data</value>
    </property>
</configuration>
etc/hadoop/hdfs-site.xml:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
etc/hadoop/hadoop-env.sh, configure JAVA_HOME:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
Format the filesystem:
$ bin/hdfs namenode -format
Start NameNode daemon and DataNode daemon:
$ sbin/start-dfs.sh
If startup succeeded:
ztf@xxx:$ jps
19424 Jps
14369 NameNode
14563 DataNode
14797 SecondaryNameNode
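It is also worth confirming that HDFS is writable and pre-creating the output directory that the Flink job below writes to. A minimal sketch (the /flink path matches the sink path used in the sample code; port 50070 is the default NameNode web UI port in Hadoop 2.x):

bin/hdfs dfs -mkdir -p /flink     # output directory used by the Flink job below
bin/hdfs dfs -ls /                # should now list /flink
# The NameNode web UI is available at http://localhost:50070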
Flink Code Example
pom.xml dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink connector for writing to Hadoop/HDFS -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
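To package the job for submission to a Flink cluster (it can also be run directly from the IDE), a minimal sketch assuming a standard Maven project layout; the jar name below is a placeholder for whatever artifactId and version the project uses:

mvn clean package -DskipTests                      # builds target/<artifact>-<version>.jar
# Submit to a running Flink cluster (TestDemo is the main class of the sample code below):
# bin/flink run -c TestDemo target/<artifact>-<version>.jar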
Full code
import java.time.ZoneId;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class TestDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // Add the Kafka data source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.252:9092");  // Kafka server IP and port
        properties.setProperty("zookeeper.connect", "192.168.1.252:2181");  // ZooKeeper server IP and port
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);

        stream.print();

        // Option 1: write the data to a directory in Hadoop
        //stream.writeAsText("hdfs://127.0.0.1:9000/flink/test");

        // Option 2: write the data to a directory in Hadoop via a bucketing sink
        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://127.0.0.1:9000/flink");
        // Name buckets with the UTC+8 time format "yyyy-MM-dd--HH"
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // A new part file is rolled when either of the following conditions is met:
        // 1. the part file reaches 100 MB
        hadoopSink.setBatchSize(1024 * 1024 * 100);
        // 2. the part file has been open for 20 minutes
        hadoopSink.setBatchRolloverInterval(20 * 60 * 1000);
        // Prefix for pending files
        hadoopSink.setPendingPrefix("");
        // Suffix for pending files
        hadoopSink.setPendingSuffix("");
        // Prefix for in-progress files
        hadoopSink.setInProgressPrefix(".");
        // Add the Hadoop sink
        stream.addSink(hadoopSink);

        env.execute("WordCount from Kafka data");
    }
}
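Once the job is running and Flume is shipping log lines into the test topic, the sink output can be inspected in HDFS. A minimal sketch; the bucket directory name below is hypothetical and simply follows the "yyyy-MM-dd--HH" pattern configured above:

bin/hdfs dfs -ls /flink                      # one sub-directory per hourly bucket
bin/hdfs dfs -ls /flink/2019-11-01--10       # hypothetical bucket name
bin/hdfs dfs -cat /flink/2019-11-01--10/*    # view the written log lines
# Files still being written start with "." (the in-progress prefix set above) and
# become normal part files only after a roll or a successful checkpoint.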