Flume + Kafka + ZooKeeper + Spark + InfluxDB + Grafana + Kapacitor monitoring platform


 

Architecture diagram (kept simple because of limited resources)

 

Download the required packages (note that Kafka and Spark have JDK and Scala version requirements; check the official sites)

 

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.2.x86_64.rpm
yum localinstall influxdb-1.5.2.x86_64.rpm
wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.4.1.x86_64.rpm
yum localinstall kapacitor-1.4.1.x86_64.rpm
Because of network restrictions, all packages were downloaded in advance and then uploaded to the servers.

192.168.10.129  flume influxdb grafana kapacitor

192.168.10.130  kafka spark

Install Flume NG (requires JDK 1.8)

Flume ships as a binary tarball; just extract it.

cd conf; cp flume-env.sh.template flume-env.sh

echo -e 'export JAVA_HOME=/opt/jdk1.8\nexport JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"'>>flume-env.sh
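To confirm the tarball and JAVA_HOME are picked up, a quick sanity check (assuming Flume was extracted to /opt/flume-1.8.0, the path used in the configs below) is:

cd /opt/flume-1.8.0
bin/flume-ng version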

For a detailed introduction to Flume NG, see https://blog.csdn.net/zhaodedong/article/details/52541688

Sources in detail

Source type    Description
Avro Source    Supports the Avro protocol (actually Avro RPC); built in
Thrift Source    Supports the Thrift protocol; built in
Exec Source    Produces data from the standard output of a Unix command
JMS Source    Reads data from a JMS system (queues, topics); tested with ActiveMQ
Spooling Directory Source    Monitors a specified directory for new data
Twitter 1% firehose Source    Continuously downloads Twitter data through the API; experimental
Netcat Source    Listens on a port and turns each text line flowing through it into an Event
Sequence Generator Source    Sequence generator source; produces sequence data
Syslog Sources    Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source    Accepts data via HTTP POST or GET; supports JSON and BLOB representations
Legacy Sources    Compatible with Sources from the old Flume OG (0.9.x)

Channels in detail

Channel type    Description
Memory Channel    Event data is stored in memory
JDBC Channel    Event data is stored in a persistent store; Flume currently ships with built-in Derby support
File Channel    Event data is stored in files on disk
Spillable Memory Channel    Event data is stored in memory and on disk; when the in-memory queue fills up, events spill to disk files (currently experimental, not recommended for production)
Pseudo Transaction Channel    For testing only
Custom Channel    A user-defined Channel implementation

Sinks in detail

Sink type    Description
HDFS Sink    Writes data to HDFS
Logger Sink    Writes data to the log
Avro Sink    Data is converted into Avro Events and sent to the configured RPC port
Thrift Sink    Data is converted into Thrift Events and sent to the configured RPC port
IRC Sink    Data is replayed on IRC
File Roll Sink    Stores data on the local file system
Null Sink    Discards all data it receives
HBase Sink    Writes data to HBase
Morphline Solr Sink    Sends data to a Solr search server (or cluster)
ElasticSearch Sink    Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink    Writes data to a Kite Dataset; experimental
Custom Sink    A user-defined Sink implementation

flume.conf configuration

# "logser" is the name of this Flume agent; every agent consists of sources, channels and sinks
# (multiple sources/sinks can be listed, separated by spaces)
# sources are where data comes from, channels are the intermediate buffer, sinks are where data goes
logser.sources = src_dir
logser.sinks = sink_kfk
logser.channels = ch
# source
# source type
logser.sources.src_dir.type = TAILDIR
# file that records the read position of every monitored file
logser.sources.src_dir.positionFile = /opt/flume-1.8.0/logs/taildir_position.json
logser.sources.src_dir.filegroups = f1
logser.sources.src_dir.filegroups.f1 = /opt/logs/.*
logser.sources.src_dir.headers.f1.headerKey1 = tomcatAPP
logser.sources.src_dir.fileHeader = true
# channel
logser.channels.ch.type = memory
logser.channels.ch.capacity = 10000
logser.channels.ch.transactionCapacity = 1000
# kafka sink
# the sink type is Kafka, i.e. the logs are finally sent to Kafka
logser.sinks.sink_kfk.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka servers; separate multiple servers with commas
logser.sinks.sink_kfk.kafka.bootstrap.servers = 192.168.10.130:9092
logser.sinks.sink_kfk.kafka.topic = tomcatCom
# Bind the source and sink to the channel
logser.sources.src_dir.channels = ch
logser.sinks.sink_kfk.channel = ch

Start Flume (start Kafka first)

nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name logser -Dflume.root.logger=INFO,console >logs/flume-ng.log 2>&1 &

Going further: Java log4j integration

log4j.properties

log4j.rootLogger=INFO,stdout,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# hostname of the remote Flume agent's Avro source
log4j.appender.flume.Hostname = flume
# port the remote Flume agent's Avro source listens on
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

Note: the application jar needs this dependency:

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>same as your Flume version</version>
</dependency>
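As a minimal sketch of the application side (package, class and messages below are illustrative, not from the original post), any log4j 1.x logger call then ships its events to the Flume Avro source:

package com.sgm.demo // hypothetical package, for illustration only

import org.apache.log4j.Logger

object Log4jFlumeDemo {
  // with the flume appender above on the classpath, every log call
  // is forwarded to the remote Flume agent's Avro source (flume:41414)
  private val logger = Logger.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    var i = 0
    while (true) {
      logger.info("demo log line " + i) // becomes one Flume event, then one Kafka message
      i += 1
      Thread.sleep(1000)
    }
  }
}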

flume.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.10.129
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka topic
agent1.sinks.kafka-sink.kafka.topic=mytest
agent1.sinks.kafka-sink.kafka.bootstrap.servers=192.168.10.130:9092
# number of messages per batch; larger batches improve throughput (default 100)
agent1.sinks.kafka-sink.flumeBatchSize=20
# defaults to 1
agent1.sinks.kafka-sink.kafka.producer.acks=1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
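This agent is named agent1, so the start command shown earlier changes accordingly:

nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name agent1 -Dflume.root.logger=INFO,console >logs/flume-ng.log 2>&1 &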

 

Install Kafka

Install the Scala environment

yum localinstall scala-2.11.7.rpm 

Install the JDK environment

export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:/opt/jdk1.8/bin

Simply extract the Kafka tarball.

ZooKeeper is the instance bundled with Kafka.

zookeeper.properties

dataDir=/opt/kafka_2.11/zk
dataLogDir=/opt/kafka_2.11/zk/logs
clientPort=2181
maxClientCnxns=0

server.properties (single node, local test environment)

broker.id=0
# By default a Producer that sends a message to a non-existent topic creates that topic automatically.
# By default, deleting a topic only shows "marked for deletion"; the topic is merely flagged and the list command still shows it.
# Auto-creation is turned off here; deletion is left at the default for fear of accidental deletes.
auto.create.topics.enable=false
delete.topic.enable=false
port=9092
host.name=192.168.10.130
# The default is the hostname; from an external network, or without a hostname mapping, producers fail with
# org.apache.kafka.common.errors.TimeoutException: Batch Expired
advertised.host.name=192.168.10.130
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.11/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Producer and consumer configuration

#producer.properties
bootstrap.servers=192.168.10.130:9092
compression.type=none
#consumer.properties
bootstrap.servers=192.168.10.130:9092
group.id=test-consumer-group

Common commands

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties >> /opt/kafka_2.11/zk/logs/zk.log &
# start Kafka
bin/kafka-server-start.sh -daemon config/server.properties &
# create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.10.130:2181 --replication-factor 1 --partitions 1 --topic tomcatCom
# list / describe topics
bin/kafka-topics.sh --list --zookeeper 192.168.10.130:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.10.130:2181 --topic test
# console producer
bin/kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic test
# console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom --from-beginning --consumer.config config/consumer.properties
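With Flume and Kafka both running, a quick end-to-end check (assuming the TAILDIR source is watching /opt/logs/.* as configured above) is to append a line on the Flume host and watch it arrive in the console consumer:

# on 192.168.10.129 (Flume host)
echo "smoke test $(date)" >> /opt/logs/test.log
# on 192.168.10.130 (Kafka host) the line should show up in
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom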

Install Spark

Install Scala, the JDK and Python; see the official site for version requirements.

Resources are limited, so the Spark master runs locally in local mode.

Install InfluxDB + Grafana + Kapacitor

yum localinstall influxdb-1.5.2.x86_64.rpm  grafana-5.1.3-1.x86_64.rpm  kapacitor-1.4.1.x86_64.rpm 

influxdb 

service influxdb start

For the configuration file, see http://www.ywnds.com/?p=10763

For basic usage, see https://www.linuxdaxue.com/influxdb-study-series-manual.html

For authentication, see https://blog.csdn.net/caodanwang/article/details/51967393

influx -host '192.168.10.129' -port '8086'

influx -host '192.168.10.129' -port '8086' -username 'root' -password '123456'

curl -XPOST http://192.168.10.129:8086/query --data-urlencode "q=CREATE DATABASE mydb"
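Writing and reading over the HTTP API works the same way the Spark job does later; a quick check against the mydb database created above (the measurement and values here are just examples):

# write one point using the line protocol
curl -i -XPOST 'http://192.168.10.129:8086/write?db=mydb' --data-binary 'cpu_load,host=server01 value=0.64'
# read it back
curl -G 'http://192.168.10.129:8086/query?db=mydb' --data-urlencode 'q=SELECT * FROM cpu_load'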

Data retention policies

name -- policy name; in this example the name is default
duration -- how long data is kept; 0 means no limit
shardGroupDuration -- how long each shardGroup spans; a shardGroup is a basic storage structure in InfluxDB, and querying data older than this window should be somewhat less efficient
replicaN -- short for REPLICATION, the number of replicas
default -- whether this is the default policy
SHOW RETENTION POLICIES ON nginxlog_clear
CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT   -- create a default policy named 2_hours that keeps data for 2 hours --
ALTER RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 4h DEFAULT -- change it to 4 hours --
DROP RETENTION POLICY "2_hours" ON "nginxlog_clear" -- drop it; writes that still reference 2_hours will then fail with a "policy not found" error, so set autogen's default back to true --

Continuous queries

show continuous queries
CREATE CONTINUOUS QUERY cq_1m ON nginxlog_clear BEGIN SELECT count(count) AS count_404 INTO current_data.two_year.ten_min_count FROM nginxlog_clear.autogen.nginx WHERE status = '404' GROUP BY time(1m) END
-- cq_1m: the continuous query name; nginxlog_clear and current_data: databases --
-- two_year and autogen: retention policies --
-- ten_min_count and nginx: measurements --
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
-- drop a CQ; continuous queries cannot be modified, only dropped and recreated --
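Once the CQ has fired a few times, the downsampled counts can be checked directly in the target database (a sanity query, assuming the CQ above is active):

SELECT * FROM current_data.two_year.ten_min_count WHERE time > now() - 1h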

Selected influxdb.conf settings

bind-address = "192.168.10.129:8088"
[meta]
dir = "/opt/influxdb/meta"
[data]
dir = "/opt/influxdb/data"
wal-dir = "/opt/influxdb/wal"
[http]
enabled = true
bind-address = "192.168.10.129:8086"
access-log-path = "/opt/influxdb/data/logs"
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1m"  # set the check interval based on the shortest continuous query, e.g. GROUP BY time(1m)

Grafana configuration

[server]
domain = 192.168.10.129
[smtp]
enabled = true 
host = 192.168.10.129:25  # note: change postfix to inet_interfaces = 192.168.10.129
password = xian6630753  
from_address = admin@grafana.com
from_name = Grafana
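The InfluxDB datasource can be added in the Grafana UI (Configuration → Data Sources) or through Grafana's HTTP API; a sketch assuming the default admin:admin credentials and the nginxlog_clear database written to by the Spark job:

curl -s -u admin:admin -H "Content-Type: application/json" \
  -X POST http://192.168.10.129:3000/api/datasources \
  -d '{"name":"influxdb-nginx","type":"influxdb","access":"proxy","url":"http://192.168.10.129:8086","database":"nginxlog_clear"}'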

Configure the graphs

 

Configure the alert notification email

Spark

NginxClear  // regex matching combined with pattern matching would be cleaner here

 

package com.sgm.spark
import scala.util
import scalaj.http._
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object NginxClear {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <brokers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 30-second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("C:\\Users\\itlocal\\IdeaProjects\\nginxlog\\checkpoint")
    // Create a direct Kafka stream with the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // offsets recorded in the checkpoint are used on restart; without one, auto.offset.reset applies
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val logs = messages.map(_.value)
    /* alternative: regex matching combined with pattern matching
    val clearDate = logs.map(line => {
      val info = line.split("\n")
      val infoPattern = """(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r
      info.foreach(x => x match {
        case infoPattern(ip, none1, currentTime, none2, respCode, requestTime, url, none3, upstreamTIME) =>
          println(ip + "\t" + currentTime + "\t" + respCode + "\t" + requestTime + "\t" + url + "\t" + upstreamTIME + "\t")
        case _ => println("none")
      })
    }) */
    val cleanDate = logs.map(line => {
      val influxdb_url = "http://192.168.10.129:8086/write?db=nginxlog_clear"
      val infos = line.split(" ")
      if (infos.length > 10) {
        val actime = infos(3).split(" ")(0).split("\\[")(1)
        val random_num = (new util.Random).nextInt(999999)
        // InfluxDB timestamps are in nanoseconds while ours are in milliseconds;
        // without converting to nanoseconds the timestamp is not accepted
        val curent_timestamp = (DateUtil.getTime(actime).toString + "000000").toLong + random_num
        val urlPattern = "\"http.*".r
        val ipPattern = "^[1-5]{1}[0-9]{2}$".r  // actually matches the HTTP status code
        //infos.foreach(println)
        var initUrl = "\"none\""
        var initStatus = "\"none\""
        infos.foreach(info => urlPattern.findAllIn(info).foreach(x => initUrl = Some(x).get))
        infos.foreach(info => ipPattern.findAllIn(info).foreach(x => initStatus = x))
        //println(initStatus)
        val date = s"nginxLog,ip=${infos(0)},acess_time=${DateUtil.parseTime(actime)},status=$initStatus,upstream_time=${infos.last} send_bytes=${infos(9)},url=$initUrl,count=1 $curent_timestamp"
        println(date)
        Http(influxdb_url).postData(date).header("content-type", "application/json").asString.code
      }
    }) //.filter(clearlog => clearlog.statusCode != 200)
    cleanDate.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

Time format conversion

DateUtil

package com.sgm.spark


import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {
  val curent_day=FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  val targe_fomat=FastDateFormat.getInstance("yyyyMMddHHmmss")
  def getTime(time:String)={    // convert to a millisecond timestamp
    curent_day.parse(time).getTime
  }
  def parseTime(time:String)={
    targe_fomat.format(new Date(getTime(time)))
  }
  def main(args: Array[String]): Unit = {
    println(parseTime("04/MAY/2017:09:22:05"))
  }
}

For Maven build and packaging, see the notes on setting up the Scala development environment.

Submit the job

bin/spark-submit --master local[2] --class com.sgm.spark.NginxClear --name NginxClear /root/bigdata-1.0-SNAPSHOT.jar
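Once the job is running, the points written by NginxClear can be checked from the influx CLI (the measurement nginxLog and field count come from the line protocol built above):

influx -host '192.168.10.129' -port '8086'
> USE nginxlog_clear
> SELECT count(count) FROM nginxLog WHERE time > now() - 5m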

 

 

 


 

