企業安全監控中的大數據技術



最近在做企業安全建設,企業安全建設中最常見的一項就是做監控,監控的種類多種多樣,但是底層的技術棧卻基本是一致的————大數據技術,下面我記錄一下我最近學習到的一些大數據技術,下文只是描述個脈絡而已。

大數據的技術棧,以及對應的上下依賴圖如下:

看完這個圖,是不是覺得和之前學習過的網絡協議、框架都非常相識,無非就是把里面的名詞替換了一下而已。我感覺軟件產品的設計思路都是要分模塊化、解耦合,你看TCP/IP協議層,每層都各司其職,每層里面的每個功能也是按照這個總體思路繼續向下設計。解耦合的好處很多,不一一列舉。

我個人覺得,里面比較有難度的就是Flink那塊,因為對數據的分析、計算處理都是在這一塊中完成的,Flink也可以用storm替換,不過性能沒有flink好。
當將計算結果存儲到ES之后,就可以做很多事了,比如做自動告警功能了。

數據源

數據源可以是任何數據,不過現在采集最多的是日志類數據,其次就是網絡流量了。

Filebeat

采集器是最容易理解的,主要是用來匯總日志然后轉發的,采集器的技術方案也有很多,這里舉例filebeat。

Filebeat主要由兩個組件構成:prospector(探測器)harvester(收集器),這兩類組件一起協作完成Filebeat的工作。

Filebeat的工作流程如下:
當開啟Filebeat程序的時候,它會啟動一個或多個探測器去檢測指定的日志目錄或文件,對於探測器找出的每一個日志文件,Filebeat會啟動收集進程,每一個收集進程讀取一個日志文件的內容,然后將這些日志數據發送到后台處理程序,后台處理程序會集合這些事件,最后發送集合的數據到output指定的目的地。

Filebeat在有數據源的機器安裝好之后,要做的就是寫一下配置,
主要配置讀取文件的路徑,以及輸出流的位置以及相應的性能參數等,以Kafka消息中間件作為緩沖,所有的日志收集器都向Kafka輸送日志流。

定義日志信息輸出格式:

<Properties>
        //存放日志的文件夾名稱
        <Property name="LOG_HOME">logs</Property>
        //日志文件名稱
        <property name="FILE_NAME">collector</property>
        //日志格式
        //[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] 日志輸入時間,東八區
        //[%level{length=5}]    日志級別,debug、info、warn、error
        //[%thread-%tid]    當前線程信息
        //[%logger] 當前日志信息所屬類全路徑
        //[%X{hostName}]    當前節點主機名。需要通過MDC來自定義。
        //[%X{ip}]  當前節點ip。需要通過MDC來自定義。
        //[%X{applicationName}] 當前應用程序名。需要通過MDC來自定義。
        //[%F,%L,%C,%M] %F:當前日志信息所屬的文件(類)名,%L:日志信息在所屬文件中的行號,%C:當前日志所屬文件的全類名,%M:當前日志所屬的方法名
        //[%m]  日志詳情
        //%ex   異常信息
        //%n    換行
        <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
            [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
</property>

Filebeat配置參考信息:

  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    # pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   
    # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串)
    pattern: '^\['                                # 指定匹配的表達式(匹配以 "{ 開頭的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合並到上一行的末尾
    max_lines: 2000                         # 最大的行數
    timeout: 2s                                 # 如果在規定時間沒有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## 按服務划分用作kafka topic
    evn: dev
    
output.kafka:
  enabled: true
  hosts: ["192.168.204.139:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

Kafka

Apache kafka是消息中間件的一種,功能是高吞吐量的分布式發布訂閱消息系統

Kafka特點:
kafka中的消息不是kafka主動去拉去的,而必須有生產者往kafka寫消息。
kafka是不會主動往消費者發布消息的,而必須有消費者主動從kafka拉取消息。

kafka名詞解釋:
kafka的幾個名詞需要知道一下,比如topic、producer、consumer、broker,下面用最俗的方式解釋

  • producer:生產者,就是它來生產“雞蛋”的。
  • consumer:消費者,生出的“雞蛋”它來消費。
  • topic:你把它理解為標簽,生產者每生產出來一個雞蛋就貼上一個標簽(topic),消費者可不是誰生產的“雞蛋”都吃的,這樣不同的生產者生產出來的“雞蛋”,消費者就可以選擇性的“吃”了。
  • broker:相當於菜市場的小販,小販從生產者手里收購了雞蛋,然后一直存儲在商店中,等待消費者來購買。他在中間作雞蛋的存儲、轉發、接受顧客問價(請求)和回答(響應)等功能。
    一個單獨的Kafka Server就是一個Broker。在一般的生產環境中,一個Broker獨占一台物理服務器。Broker的主要工作就是接收生產者發過來的消息,分配offset,之后保存到磁盤中。同時,接收消費者、其他Broker的請求,根據請求類型進行相應處理並返回響應。

kafka的單節點基本操作:
生產者

# 創建一個主題(標簽),Hello-Kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
# 生產者將等待來自stdin的輸入並發布到Kafka集群。 默認情況下,每個新行都作為新消息發布,然后在 config / producer.properties 文件中指定默認生產者屬性。 

# 在終端中鍵入幾行消息
egg1
egg2

消費者

# 與生產者類似,在 config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個新終端並鍵入以下消息消息語法。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

# 自動出現
egg1
egg2

Flink

Flink核心是一個流式的數據流執行引擎,其針對數據流的分布式計算提供了數據分布、數據通信以及容錯機制等功。
簡單的說就是,Flink可以對數據流進行轉換、計算、聚合等功能。如果你采集的數據需要做告警功能,那么就需要用Flink或者storm,如果只是將采集的數據進行存儲,然后展示,那么就不需要用到Flink這種技術。

比如在企業安全建設中,做監控平台就需要有告警功能,采集到的監控數據會直接往 kafka 里塞,然后告警這邊需要從 kafka topic 里面實時讀取到監控數據,並將讀取到的監控數據做一些 轉換、計算、聚合等操作,然后將計算后的結果與告警規則的閾值進行比較,然后做出相應的告警措施(釘釘群、郵件、短信、電話等)。畫了個簡單的圖如下:

flink處理靜態sql的代理流程:

這個sql只能是寫死在代碼里面,如果是想要動態的修改sql,那么就要重啟flink服務才能生效。

但是有個需求,就像下圖這樣,sql語句來之外部,因為需要讓安全人員來描述規則,他們跟進安全態勢來修改,並且需要常常更新規則來挖掘出最新安全事件,

那么就出現一個問題了,像上面的flink只能處理靜態sql,想動態處理怎么辦?

使用 flink-siddhi 來處理動態sql:
SIDDHI 是一款功能強大的open source CEP(Complex Event Processing)引擎引擎,具有自己的DSL,豐富的模式匹配功能和可擴展性,

使用Siddhi 引擎的好處就是,里面的sql語句可以任意修改,修改sql后,也不需要重啟flink服務。
siddhi引擎我最近也是剛開始學習,這里就不過多筆墨了,后面會出siddhi的專項文章。

ES

ES = Elasticsearch
Elasticsearch可以簡單的理解成NoSQL數據庫產品。

Kibana

Kibana是一款數據可視化工具,通過簡單的配置就能結合Elasticg公司的其他產品進行使用。

參考

https://www.cnblogs.com/monument/p/12944718.html
https://www.jianshu.com/p/a8b66f586fd4
http://kafka.apachecn.org/
https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
https://blog.csdn.net/leanaoo/article/details/84310604
https://ci.apache.org/projects/flink/flink-docs-release-1.4/
https://www.cnblogs.com/fxjwind/p/5048583.html
https://baijiahao.baidu.com/s?id=1623279487849430246&wfr=spider&for=pc


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM