企業安全建設之搭建開源SIEM平台


https://www.freebuf.com/special/127172.html

https://www.freebuf.com/special/127264.html

https://www.freebuf.com/articles/network/127988.html

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹下如何使用開源軟件搭建企業的SIEM系統,數據深度分析在下篇。

SIEM的發展

對比Gartner2009年和2016年的全球SIEM廠商排名,可以清楚看出,基於大數據架構的廠商Splunk迅速崛起,傳統四強依托完整的安全產品線和成熟市場渠道,依然占據領導者象限,其他較小的廠商逐漸離開領導者象限。最重要的存儲架構也由盤櫃(可選)+商業數據庫逐漸轉變為可橫向擴展的大數據架構,支持雲環境也成為趨勢。

1.png

2.jpg

開源SIEM領域,比較典型的就是ossim和Opensoc,ossim存儲架構是mysql,支持多種日志格式,包括鼎鼎大名的Snort、Nmap、 Nessus以及Ntop等,對於數據規模不大的情況是個不錯的選擇,新版界面很酷炫。

3.png

完整的SIEM至少會包括以下功能:

  • 漏洞管理
  • 資產發現
  • 入侵檢測
  • 行為分析
  • 日志存儲、檢索
  • 報警管理
  • 酷炫報表

其中最核心的我認為是入侵檢測、行為分析和日志存儲檢索,本文重點集中討論支撐上面三個功能的技術架構。

Opensoc簡介

Opensoc是思科2014年在BroCon大會上公布的開源項目,但是沒有真正開源其源代碼,只是發布了其技術框架。我們參考了Opensoc發布的架構,結合公司實際落地了一套方案。Opensoc完全基於開源的大數據框架kafka、storm、spark和es等,天生具有強大的橫向擴展能力,本文重點講解的也是基於Opensoc的siem搭建。

4.png

上圖是Opensoc給出的框架,初次看非常費解,我們以數據存儲與數據處理兩個緯度來細化,以常見的linux服務器ssh登錄日志搜集為例。

數據搜集緯度

數據搜集緯度需求是搜集原始數據,存儲,提供用戶交互式檢索的UI接口,典型場景就是出現安全事件后,通過檢索日志回溯攻擊行為,定損。

11.png

logtash其實可以直接把數據寫es,但是考慮到storm也要數據處理,所以把數據切分放到logstash,切分后的數據發送kafka,提供給storm處理和logstash寫入es。數據檢索可以直接使用kibana,非常方便。數據切分也可以在storm里面完成。這個就是大名鼎鼎的ELK架構。es比較適合存儲較短時間的熱數據的實時檢索查詢,對於需要長期存儲,並且希望使用hadoop或者spark進行大時間跨度的離線分析時,還需要存儲到hdfs上,所以比較常見的數據流程圖為:

6.png

數據處理緯度

這里以數據實時流式處理為例,storm從kafka中訂閱切分過的ssh登錄日志,匹配檢測規則,檢測結果的寫入mysql或者es。

12.png

在這個例子中,孤立看一條登錄日志難以識別安全問題,最多識別非跳板機登錄,真正運行還需要參考知識庫中的常見登錄IP、時間、IP情報等以及臨時存儲處理狀態的狀態庫中最近該IP的登錄成功與失敗情況。比較接近實際運行情況的流程如下:

14.png

具體判斷邏輯舉例如下,實際中使用大量代理IP同時暴力破解,打一槍換一個地方那種無法覆蓋,這里只是個舉例:

9.png

擴展數據源

生產環境中,處理安全事件,分析入侵行為,只有ssh登錄日志肯定是不夠,我們需要盡可能多的搜集數據源,以下作為參考:

  • linux/window系統安全日志/操作日志
  • web服務器訪問日志
  • 數據庫SQL日志
  • 網絡流量日志

簡化后的系統架構如下,報警也存es主要是查看報警也可以通過kibana,人力不足界面都不用開發了:

10.png

storm拓撲

storm拓撲支持python開發,以處理SQL日志為例子:

假設SQL日志的格式

 

"Feb 16 06:32:50 " "127.0.0.1" "root@localhost" "select * from user where id=1"

 

一般storm的拓撲結構

截圖.png

簡化后spout是通用的從kafka讀取數據的,就一個bolt處理SQL日志,匹配規則,命中策略即輸出”alert”:”原始SQL日志”

核心bolt代碼doSQLCheckBolt偽碼

import storm class doSQLCheckBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") sql = word[3] if re.match(規則,sql): storm.emit(["sqli",tup.values[0]]) doSQLCheckBolt().run() 
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sqlLog", new kafkaSpout(), 10); builder.setBolt("sqliAlert", new doSQLCheckBolt(), 3) .shuffleGrouping("sqlLog");

拓撲提交示例

Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("doSQL", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("doSQL"); cluster.shutdown();

logstash

在本文環節中,logstash的配置量甚至超過了storm的拓撲腳本開發量,下面講下比較重點的幾個點,切割日志與檢索需求有關系,非常個性化,這里就不展開了。

從文件讀取

 

input
    file {
        path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }

 

從kafka中訂閱

input {
    kafka {
        zk_connect => "localhost:2181" group_id => "logstash" topic_id => "test" reset_beginning => false # boolean (optional), default: false consumer_threads => 5 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false } }

寫kafka

 

output { kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none" } }

 

寫hdfs

 

output {
    hadoop_webhdfs {
        workers => 2 server => "localhost:14000" user => "flume" path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log" flush_size => 500 compress => "snappy" idle_flush_time => 10 retry_interval => 0.5 } }

 

寫es

output {
    elasticsearch {
        host => "localhost" protocol => "http" index => "logstash-%{type}-%{+YYYY.MM.dd}" index_type => "%{type}" workers => 5 template_overwrite => true } }

 

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用攻擊建模的方式識別攻擊行為。

回顧系統架構

截圖 (1).png

以數據庫為例,通過logstash搜集mysql的查詢日志,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。

數據庫日志搜集

常見的數據日志搜集方式有三種:

鏡像方式

大多數數據庫審計產品都支持這種模式,通過分析數據庫流量,解碼數據庫協議,識別SQL預計,抽取出SQL日志

截圖 (2).png

代理方式

比較典型的就是db-proxy方式,目前百度、搜狐、美團、京東等都有相關開源產品,前端通過db-proxy訪問后端的真實數據庫服務器。SQL日志可以直接在db-proxy上搜集。

截圖 (3).png

客戶端方式

通過在數據庫服務器安裝客戶端搜集SQL日志,比較典型的方式就是通過logstash來搜集,本文以客戶端方式進行講解,其余方式本質上也是類似的。

logstash配置

安裝

下載logstash https://www.elastic.co/downloads/logstash 目前最新版本5.2.1版

開啟mysql查詢日志

截圖 (4).png

mysql查詢日志

截圖 (5).png

配置logstash

 

input {

file {

type => "mysql_sql_file"

path => "/var/log/mysql/mysql.log"

start_position => "beginning"

sincedb_path => "/dev/null"

}

}

output {

kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none" }

}

 

運行logstash

 

bin/logstash -f mysql.conf

日志舉例

 

2017-02-16T23:29:00.813Z localhost 170216 19:10:15 37 Connect

debian-sys-maint@localhost on

2017-02-16T23:29:00.813Z localhost 37 Quit

2017-02-16T23:29:00.813Z localhost 38 Connect debian-sys-maint@localhost on

2017-02-16T23:29:00.813Z localhost 38 Query SHOW VARIABLES LIKE 'pid_file'

 

 

切詞

最簡化操作是不用進行切詞,如果喜歡自動切分出數據庫名,時間等字段,請參考:

grok語法

https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

grok語法調試

http://grokdebug.herokuapp.com/

常見攻擊特征

以常見的wavsep搭建靶場環境,請參考我的另外一篇文章《基於WAVSEP的靶場搭建指南

使用SQL掃描鏈接

截圖 (6).png

分析攻擊特征,下列列舉兩個,更多攻擊特征請大家自行總結

特征一

2017-02-16T23:29:00.993Z localhost 170216 19:19:12   46 Query SELECT username, password FROM users WHERE username='textvalue' UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL#' AND password='textvalue2'

使用聯合查詢枚舉數據時會產生大量的NULL字段

特征二、三

枚舉數據庫結構時會使用INFORMATION_SCHEMA,另外個別掃描器會使用GROUP BY x)a)

2017-02-16T23:29:00.998Z localhost    46 Query SELECT username, password FROM users WHERE username='textvalue' AND (SELECT 7473 FROM(SELECT COUNT(*),CONCAT(0x7171716271,(SELECT (CASE WHEN (8199= 8199) THEN 1 ELSE 0 END)),0x717a627871,FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.PLUGINS GROUP BY x)a)-- LFpQ' AND password='textvalue2'

hadoop離線處理

hadoop是基於map,reduce模型

截圖 (7).png

簡化理解就是:

 

cat data.txt | ./map | ./reduce

 

最簡化期間,我們可以只開發map程序,在map中逐行處理日志數據,匹配攻擊行為。

以perl腳本開發,python類似

 

#!/usr/bin/perl -w

my $rule="(null,){3,}|information_schema|GROUP BY x\\)a\\)";

my $line="";

while($line=<>)

{

if( $line=~/$rule/i )

{

printf($line);

}

}

 

在hadoop下運行即可。

生產環境

生產環境中的規則會比這復雜很多,需要你不斷補充,這里只是舉例;

單純只編寫map會有大量的重復報警,需要開發reduce用於聚合;

應急響應時需要知道SQL注入的是那個庫,使用的是哪個賬戶,這個需要在logstash切割字段時補充;

應急響應時最好可以知道SQL注入對應的鏈接,這個需要將web的accesslog與SQL日志關聯分析,比較成熟的方案是基於機器學習,學習出基於時間的關聯矩陣;

客戶端直接搜集SQL數據要求mysql也開啟查詢日志,這個對服務器性能有較大影響,我知道的大型公司以db-prxoy方式接入為主,建議可以在db-proxy上搜集;

基於規則識別SQL注入存在瓶頸,雖然相對web日志層面以及流量層面有一定進步,SQL語義成為必然之路。

后繼

 

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用算法挖掘未知攻擊行為。上集傳送門

回顧系統架構

截圖.png

以WEB服務器日志為例,通過logstash搜集WEB服務器的查詢日志,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。

自定義日志格式

開啟httpd自定義日志格式,記錄User-Agen以及Referer

 

<IfModule logio_module>
      # You need to enable mod_logio.c to use %I and %O LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio </IfModule> CustomLog "logs/access_log" combined

日志舉例

 

180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" 180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/wp-json/ HTTP/1.1" 200 51789 "-" "print `env`" 180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/wp-admin/load-styles.php?c=0&dir=ltr&load[]=dashicons,buttons,forms,l10n,login&ver=Li4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vZXRjL3Bhc3N3ZAAucG5n HTTP/1.1" 200 35841 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" 180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/ HTTP/1.1" 200 17442 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"

 

測試環境

在wordpress目錄下添加測試代碼1.php,內容為phpinfo

截圖 (1).png

針對1.php的訪問日志

 

[root@instance-8lp4smgv logs]# cat access_log | grep 'wp-admin/1.php' 125.33.206.140 - - [26/Feb/2017:13:09:47 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 125.33.206.140 - - [26/Feb/2017:13:11:19 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 125.33.206.140 - - [26/Feb/2017:13:13:44 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 127.0.0.1 - - [26/Feb/2017:13:14:19 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" 127.0.0.1 - - [26/Feb/2017:13:16:04 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 107519 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" 125.33.206.140 - - [26/Feb/2017:13:16:12 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 27499 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" [root@instance-8lp4smgv logs]#

 

hadoop離線處理

hadoop是基於map,reduce模型

map腳本

 

localhost:work maidou$ cat mapper-graph.pl 
#!/usr/bin/perl -w #180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" my $line=""; while($line=<>) { if( $line=~/"GET (\S+) HTTP\/1.[01]" 2\d+ \d+ "(\S+)"/ ) { my $path=$1; my $ref=$2; if( $path=~/(\S+)\?(\S+)/ ) { $path=$1; } if( $ref=~/(\S+)\?(\S+)/ ) { $ref=$1; } if( ($ref=~/^http:\/\/180/)||( "-" eq $ref ) ) { my $line=$ref."::".$path."\n"; #printf("$ref::$path\n"); print($line); } } }

 

reducer腳本

 

localhost:work maidou$ cat reducer-graph.pl 
#!/usr/bin/perl -w my %result; my $line=""; while($line=<>) { if( $line=~/(\S+)\:\:(\S+)/ ) { unless( exists($result{$line}) ) { $result{$line}=1; } } } foreach $key (sort keys %result) { if( $key=~/(\S+)\:\:(\S+)/ ) { my $ref=$1; my $path=$2;#這里是舉例,過濾你關注的webshell文件后綴,常見的有php、jsp,白名單形式過濾存在漏報風險;也可以以黑名單形式過濾你忽略的文件類型 if( $path=~/(\.php)$/ ) { my $output=$ref." -> ".$path."\n"; print($output); } } }

 

 

生成結果示例為:

 

- -> http://180.76.190.79/wordpress/wp-admin/1.php - -> http://180.76.190.79/wordpress/wp-admin/admin-ajax.php - -> http://180.76.190.79/wordpress/wp-admin/customize.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/edit-comments.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/profile.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-login.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/xmlrpc.php

圖算法

講生成數據導入圖數據庫neo4j,滿足webshell特征的為:

入度出度均為0

入度出度均為1且自己指向自己

neo4j

neo4j是一個高性能的,NOSQL圖形數據庫,它將結構化數據存儲在網絡上而不是表中,因其嵌入式、高性能、輕量級等優勢,越來越受到關注。

截圖 (2).png

neo4j安裝

https://neo4j.com/ 上下載安裝包安裝,默認配置即可

ne04j啟動

以我的mac為例子,通過gui啟動即可,默認密碼為ne04j/ne04j,第一次登錄會要求更改密碼

截圖 (3).png

GUI管理界面

截圖 (4).png

python api庫安裝

sudo pip install neo4j-driver

下載JPype

https://pypi.python.org/pypi/JPype1

安裝JPype

 

tar -zxvf JPype1-0.6.2.tar.gz cd JPype1-0.6.2 sudo python setup.py install

 

將數據導入圖數據庫代碼如下:

 

B0000000B60544:freebuf liu.yan$ cat load-graph.py import re from neo4j.v1 import GraphDatabase, basic_auth nodes={} index=1 driver = GraphDatabase.driver("bolt://localhost:7687",auth=basic_auth("neo4j","maidou")) session = driver.session() file_object = open('r-graph.txt', 'r') try: for line in file_object: matchObj = re.match( r'(\S+) -> (\S+)', line, re.M|re.I) if matchObj: path = matchObj.group(1); ref = matchObj.group(2); if path in nodes.keys(): path_node = nodes[path] else: path_node = "Page%d" % index nodes[path]=path_node sql = "create (%s:Page {url:\"%s\" , id:\"%d\",in:0,out:0})" %(path_node,path,index) index=index+1 session.run(sql) #print sql if ref in nodes.keys(): ref_node = nodes[ref] else: ref_node = "Page%d" % index nodes[ref]=ref_node sql = "create (%s:Page {url:\"%s\",id:\"%d\",in:0,out:0})" %(ref_node,ref,index) index=index+1 session.run(sql) #print sql sql = "create (%s)-[:IN]->(%s)" %(path_node,ref_node) session.run(sql) #print sql sql = "match (n:Page {url:\"%s\"}) SET n.out=n.out+1" % path session.run(sql) #print sql sql = "match (n:Page {url:\"%s\"}) SET n.in=n.in+1" % ref session.run(sql) #print sql finally: file_object.close( ) session.close()

 

生成有向圖如下

截圖 (5).png

截圖 (6).png

查詢入度為1出度均為0的結點或者查詢入度出度均為1且指向自己的結點,由於把ref為空的情況也識別為”-”結點,所以入度為1出度均為0。

截圖 (7).png

優化點

生產環境實際使用中,我們遇到誤報分為以下幾種:

主頁,各種index頁面(第一個誤報就是這種)

phpmyadmin、zabbix等運維管理后台

hadoop、elk等開源軟件的控制台

API接口

這些通過短期加白可以有效解決,比較麻煩的是掃描器對結果的影響(第二個誤報就是這種),這部分需要通過掃描器指紋或者使用高大上的人機算法來去掉干擾。

后記

使用算法來挖掘未知攻擊行為是目前非常流行的一個研究方向,本文只是介紹了其中比較好理解和實現的一種算法,該算法並非我首創,不少安全公司也都或多或少有過實踐。篇幅有限,我將陸續在企業安全建設專題其他文章中由淺入深介紹其他算法。算法或者說機器學習本質是科學規律在大數據集集合上趨勢體現,所以很難做到精准報警,目前階段還是需要通過各種規則和模型來輔助,不過對於挖掘未知攻擊行為確實是一支奇兵。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM