Nginx=>Flume=>Kafka=>Flink => Hadoop流程


簡介

Flume:Flume是一種分布式,可靠且可用的服務,用於有效地收集,聚合和移動大量日志數據。 它具有基於流數據流的簡單靈活的架構。 它具有可靠的可靠性機制和許多故障轉移和恢復機制,具有強大的容錯能力。 它使用簡單的可擴展數據模型,允許在線分析應用程序。
Kafka:是一個分布式的,高吞吐量,易於擴展地基於主題發布/訂閱的消息系統,流計算系統的數據源。流數據產生系統作為 Kafka 消息數據的生產者將數據流分發給 Kafka 消息主題,流數據計算系統 (Storm,Spark Streaming 等) 實時消費並計算數據。
Flink:Apache Flink是一個框架和分布式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink設計為在所有常見的集群環境中運行,以內存速度和任何規模執行計算
Hadoop: 分布式文件系統
 
相關文檔

Flume安裝、收集Nginx日志、輸出到Kafka

Kafka單機安裝和啟動,官方文檔:https://kafka.apache.org/quickstart

Fink官網:https://flink.apache.org/

Hadoop 官方文檔:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html

環境

Ubuntu18.04

apache-flume-1.9.0-bin.tar.gz

kafka_2.11-2.3.1

hadoop-2.7.7

不安裝Zookeeper,使用Kafka自帶的Zookeeper

安裝部署

Flume部署

Flume和服務器部署在一起,Kafka、Flink和Hadoop部署在一塊進行測試

安裝jdk8

sudo apt-get update
sudo apt-get install openjdk-8-jdk-headless

配置java環境變量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib

在Path后面加入解壓后的路徑

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/

apache-flume-1.9.0-bin

nginx日志====>flume

文件位置 /root/logs/access.log

xxx@xxx:/opt/apache-flume-1.9.0-bin/conf$ cat nginx-logger.conf 
# 定義一個名為a1的agent中各組件的名字 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
 
# 描述和配置 source 組件:r1 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/xxx/logs/access.log  # nginx日志文件
# a1.sources.r1.shell = /bin/sh -c
 
# 描述和配置 sink 組件:k1 
#設置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#設置Kafka的broker地址和端口號
a1.sinks.k1.brokerList=kafka-broker-IP:9092
#設置Kafka的Topic
a1.sinks.k1.topic=test
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.requiredAcks =1
#設置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder


# 描述和配置 channel 組件:c1,此處使用是內存緩存的方式 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
 
 
# 描述和配置 source、channel、sink 之間的連接關系 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

shell腳本,自動生成日志

#!/bin/bash
   i=1; j=0; while (test $i -le 6170967 ) do j=`expr $i + 9` sed -n $i,$j'p' /root/logs/all.log >> /home/xxx/logs/access.log i=`expr $i + 10` sleep 5 # echo $i done #新建這個shell文件,把這個shell文件執行起來

Flume啟動agent

./bin/flume-ng agent -n a1 -c conf -f conf/nginx-logger.conf -Dflume.root.logger=INFO,console

-a a1 指定 agent 的名字
-c conf 指定配置文件目錄
-f conf/nginx-logger.conf 指定配置文件

Kafka部署

/opt/kafka_2.11-2.3.1  # 進入Kafka目錄
bin/zookeeper-server-start.sh config/zookeeper.properties # 啟動Zookeeper服務器
bin/kafka-server-start.sh config/server.properties   # 啟動kafka服務器
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test # 創建一個主題test,一個分區,一個副本
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  # 開啟一個生產者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  # 啟動一個消費者

Kafka允許遠程訪問

0.8版本有兩個配置參數需要修改:

advertised.host.name=192.168.1.13 #<==kafka開放到外網的IP(本地IP地址)
advertised.port=9092 #<==9092映射到外網后的端口

最新版本0.10.x broker配置棄用了advertised.host.name 和 advertised.port 這兩個個配置項,就配置advertised.listeners就可以了:

advertised.listeners=PLAINTEST://192.168.1.13:9092  # 本地IP地址:端口

需要知曉這些參數的同學可以參考:kafka配置文件詳解

其他配置項

#vim /opt/kafka2.11/config/server.properties

#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩沖區大小
socket.request.max.bytes=104857600

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

#kafka運行日志存放的路徑
log.dirs=/opt/kafka2.11/logs

#topic在當前broker上的分片個數
num.partitions=2

#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1

#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168

#滾動生成新的segment文件的最大時間
log.roll.hours=168

#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824

#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000

#日志清理是否打開
log.cleaner.enable=true

#broker需要使用zookeeper保存meta數據
zookeeper.connect=localhost:2181

#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤
log.flush.interval.messages=10000

#消息buffer的時間,達到閾值,將觸發flush到磁盤
log.flush.interval.ms=3000

#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true

#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=localhost

advertised.host.name=localhost    
View Code

 

Kafka基本命令

啟動Kafka服務器

kafka-server-start.sh -deamon $KAFKA_HOME/config/server.properties

Create a topic # 注意和之前flume的topic對應起來

kafka-topics.sh -create -zookeeper zookeeper001:2181 -replication-factor 1 -partitions 1 -topic test

查看所有主題

kafka-topics.sh -list -zookeeper spark001:2181

擴展:控制台作為生產者

kafka-console-producer.sh --broker-list spark001:9092 --topic test

:Start a consumer (消費者)

kafka-console-consumer.sh --zookeeper spark001:2181 --topic test

Hadoop部署

偽分布式安裝

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.7.7/data</value>  # 默認在/tmp目錄下,每次重啟數據會消失,導致namenode節點啟動不了
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

etc/hadoop/hadoop-env.sh ,配置Java_home

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64

Format the filesystem:

$ bin/hdfs namenode -format

Start NameNode daemon and DataNode daemon:

$ sbin/start-dfs.sh

如果啟動成功

ztf@xxx:$ jps
19424 Jps
14369 NameNode
14563 DataNode
14797 SecondaryNameNode

Flink代碼示例

pom.xml依賴

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Fink 連接Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.7</version>
        </dependency>

        <!-- 打印日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

 

全部代碼

import java.time.ZoneId;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class TestDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        
        // 添加Kafka數據源
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.252:9092");  // Kafka服務器IP和端
        properties.setProperty("zookeeper.connect", "192.168.1.252:2181");  // Zookeeper服務器IP和端口
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(myConsumer);
        
        // 添加hadoop輸出Sink
        stream.print();
        // 方式1:將數據導入Hadoop的文件夾
        //stream.writeAsText("hdfs://127.0.0.1:9000/flink/test");
        // 方式2:將數據導入Hadoop的文件夾
        BucketingSink<String> hadoopSink = new BucketingSink<>("hdfs://127.0.0.1:9000/flink");
        // 使用東八區時間格式"yyyy-MM-dd--HH"命名存儲區
        hadoopSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH", ZoneId.of("Asia/Shanghai")));
        // 下述兩種條件滿足其一時,創建新的塊文件
        // 條件1.設置塊大小為100MB
        hadoopSink.setBatchSize(1024 * 1024 * 100);
        // 條件2.設置時間間隔20min
        hadoopSink.setBatchRolloverInterval(20 * 60 * 1000);
        // 設置塊文件前綴
        hadoopSink.setPendingPrefix("");
        // 設置塊文件后綴
        hadoopSink.setPendingSuffix("");
        // 設置運行中的文件前綴
        hadoopSink.setInProgressPrefix(".");
        // 添加Hadoop-Sink,處理相應邏輯
        stream.addSink(hadoopSink);
        
        env.execute("WordCount from Kafka data");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM