Nginx=>Flume=>Kafka=>Flink => Hadoop流程


简介

Flume:Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。 它具有基于流数据流的简单灵活的架构。 它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错能力。 它使用简单的可扩展数据模型,允许在线分析应用程序。
Kafka:是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统,流计算系统的数据源。流数据产生系统作为 Kafka 消息数据的生产者将数据流分发给 Kafka 消息主题,流数据计算系统 (Storm,Spark Streaming 等) 实时消费并计算数据。
Flink:Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算
Hadoop: 分布式文件系统
 
相关文档

Flume安装、收集Nginx日志、输出到Kafka

Kafka单机安装和启动,官方文档:https://kafka.apache.org/quickstart

Fink官网:https://flink.apache.org/

Hadoop 官方文档:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html

环境

Ubuntu18.04

apache-flume-1.9.0-bin.tar.gz

kafka_2.11-2.3.1

hadoop-2.7.7

不安装Zookeeper,使用Kafka自带的Zookeeper

安装部署

Flume部署

Flume和服务器部署在一起,Kafka、Flink和Hadoop部署在一块进行测试

安装jdk8

sudo apt-get update
sudo apt-get install openjdk-8-jdk-headless

配置java环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib

在Path后面加入解压后的路径

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/

apache-flume-1.9.0-bin

nginx日志====>flume

文件位置 /root/logs/access.log

xxx@xxx:/opt/apache-flume-1.9.0-bin/conf$ cat nginx-logger.conf 
# 定义一个名为a1的agent中各组件的名字 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
 
# 描述和配置 source 组件:r1 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/xxx/logs/access.log  # nginx日志文件
# a1.sources.r1.shell = /bin/sh -c
 
# 描述和配置 sink 组件:k1 
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=kafka-broker-IP:9092
#设置Kafka的Topic
a1.sinks.k1.topic=test
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.requiredAcks =1
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder


# 描述和配置 channel 组件:c1,此处使用是内存缓存的方式 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
 
 
# 描述和配置 source、channel、sink 之间的连接关系 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

shell脚本,自动生成日志

#!/bin/bash
   i=1; j=0; while (test $i -le 6170967 ) do j=`expr $i + 9` sed -n $i,$j'p' /root/logs/all.log >> /home/xxx/logs/access.log i=`expr $i + 10` sleep 5 # echo $i done #新建这个shell文件,把这个shell文件执行起来

Flume启动agent

./bin/flume-ng agent -n a1 -c conf -f conf/nginx-logger.conf -Dflume.root.logger=INFO,console

-a a1 指定 agent 的名字
-c conf 指定配置文件目录
-f conf/nginx-logger.conf 指定配置文件

Kafka部署

/opt/kafka_2.11-2.3.1  # 进入Kafka目录
bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Zookeeper服务器
bin/kafka-server-start.sh config/server.properties   # 启动kafka服务器
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test # 创建一个主题test,一个分区,一个副本
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  # 开启一个生产者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  # 启动一个消费者

Kafka允许远程访问

0.8版本有两个配置参数需要修改:

advertised.host.name=192.168.1.13 #<==kafka开放到外网的IP(本地IP地址)
advertised.port=9092 #<==9092映射到外网后的端口

最新版本0.10.x broker配置弃用了advertised.host.name 和 advertised.port 这两个个配置项,就配置advertised.listeners就可以了:

advertised.listeners=PLAINTEST://192.168.1.13:9092  # 本地IP地址:端口

需要知晓这些参数的同学可以参考:kafka配置文件详解

其他配置项

#vim /opt/kafka2.11/config/server.properties

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

#kafka运行日志存放的路径
log.dirs=/opt/kafka2.11/logs

#topic在当前broker上的分片个数
num.partitions=2

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除
log.retention.hours=168

#滚动生成新的segment文件的最大时间
log.roll.hours=168

#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#周期性检查文件大小的时间
log.retention.check.interval.ms=300000

#日志清理是否打开
log.cleaner.enable=true

#broker需要使用zookeeper保存meta数据
zookeeper.connect=localhost:2181

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=localhost

advertised.host.name=localhost    
View Code

 

Kafka基本命令

启动Kafka服务器

kafka-server-start.sh -deamon $KAFKA_HOME/config/server.properties

Create a topic # 注意和之前flume的topic对应起来

kafka-topics.sh -create -zookeeper zookeeper001:2181 -replication-factor 1 -partitions 1 -topic test

查看所有主题

kafka-topics.sh -list -zookeeper spark001:2181

扩展:控制台作为生产者

kafka-console-producer.sh --broker-list spark001:9092 --topic test

:Start a consumer (消费者)

kafka-console-consumer.sh --zookeeper spark001:2181 --topic test

Hadoop部署

伪分布式安装

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.7.7/data</value>  # 默认在/tmp目录下,每次重启数据会消失,导致namenode节点启动不了
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

etc/hadoop/hadoop-env.sh ,配置Java_home

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64

Format the filesystem:

$ bin/hdfs namenode -format

Start NameNode daemon and DataNode daemon:

$ sbin/start-dfs.sh

如果启动成功

ztf@xxx:$ jps
19424 Jps
14369 NameNode
14563 DataNode
14797 SecondaryNameNode

Flink代码示例

pom.xml依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Fink 连接Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
        </dependency>

        <!-- 打印日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

 

全部代码

import java.time.ZoneId;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class TestDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        
        // 添加Kafka数据源
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.252:9092");  // Kafka服务器IP和端
        properties.setProperty("zookeeper.connect", "192.168.1.252:2181");  // Zookeeper服务器IP和端口
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(myConsumer);
        
        // 添加hadoop输出Sink
        stream.print();
        // 方式1:将数据导入Hadoop的文件夹
        //stream.writeAsText("hdfs://127.0.0.1:9000/flink/test");
        // 方式2:将数据导入Hadoop的文件夹
        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://127.0.0.1:9000/flink");
        // 使用东八区时间格式"yyyy-MM-dd--HH"命名存储区
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // 下述两种条件满足其一时,创建新的块文件
        // 条件1.设置块大小为100MB
        hadoopSink.setBatchSize(1024 * 1024 * 100);
        // 条件2.设置时间间隔20min
        hadoopSink.setBatchRolloverInterval(20 * 60 * 1000);
        // 设置块文件前缀
        hadoopSink.setPendingPrefix("");
        // 设置块文件后缀
        hadoopSink.setPendingSuffix("");
        // 设置运行中的文件前缀
        hadoopSink.setInProgressPrefix(".");
        // 添加Hadoop-Sink,处理相应逻辑
        stream.addSink(hadoopSink);
        
        env.execute("WordCount from Kafka data");
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM