https://www.freebuf.com/special/127172.html
https://www.freebuf.com/special/127264.html
https://www.freebuf.com/articles/network/127988.html
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹下如何使用開源軟件搭建企業的SIEM系統,數據深度分析在下篇。
SIEM的發展
對比Gartner2009年和2016年的全球SIEM廠商排名,可以清楚看出,基於大數據架構的廠商Splunk迅速崛起,傳統四強依托完整的安全產品線和成熟市場渠道,依然占據領導者象限,其他較小的廠商逐漸離開領導者象限。最重要的存儲架構也由盤櫃(可選)+商業數據庫逐漸轉變為可橫向擴展的大數據架構,支持雲環境也成為趨勢。
開源SIEM領域,比較典型的就是ossim和Opensoc,ossim存儲架構是mysql,支持多種日志格式,包括鼎鼎大名的Snort、Nmap、 Nessus以及Ntop等,對於數據規模不大的情況是個不錯的選擇,新版界面很酷炫。
完整的SIEM至少會包括以下功能:
- 漏洞管理
- 資產發現
- 入侵檢測
- 行為分析
- 日志存儲、檢索
- 報警管理
- 酷炫報表
其中最核心的我認為是入侵檢測、行為分析和日志存儲檢索,本文重點集中討論支撐上面三個功能的技術架構。
Opensoc簡介
Opensoc是思科2014年在BroCon大會上公布的開源項目,但是沒有真正開源其源代碼,只是發布了其技術框架。我們參考了Opensoc發布的架構,結合公司實際落地了一套方案。Opensoc完全基於開源的大數據框架kafka、storm、spark和es等,天生具有強大的橫向擴展能力,本文重點講解的也是基於Opensoc的siem搭建。
上圖是Opensoc給出的框架,初次看非常費解,我們以數據存儲與數據處理兩個緯度來細化,以常見的linux服務器ssh登錄日志搜集為例。
數據搜集緯度
數據搜集緯度需求是搜集原始數據,存儲,提供用戶交互式檢索的UI接口,典型場景就是出現安全事件后,通過檢索日志回溯攻擊行為,定損。
logtash其實可以直接把數據寫es,但是考慮到storm也要數據處理,所以把數據切分放到logstash,切分后的數據發送kafka,提供給storm處理和logstash寫入es。數據檢索可以直接使用kibana,非常方便。數據切分也可以在storm里面完成。這個就是大名鼎鼎的ELK架構。es比較適合存儲較短時間的熱數據的實時檢索查詢,對於需要長期存儲,並且希望使用hadoop或者spark進行大時間跨度的離線分析時,還需要存儲到hdfs上,所以比較常見的數據流程圖為:
數據處理緯度
這里以數據實時流式處理為例,storm從kafka中訂閱切分過的ssh登錄日志,匹配檢測規則,檢測結果的寫入mysql或者es。
在這個例子中,孤立看一條登錄日志難以識別安全問題,最多識別非跳板機登錄,真正運行還需要參考知識庫中的常見登錄IP、時間、IP情報等以及臨時存儲處理狀態的狀態庫中最近該IP的登錄成功與失敗情況。比較接近實際運行情況的流程如下:
具體判斷邏輯舉例如下,實際中使用大量代理IP同時暴力破解,打一槍換一個地方那種無法覆蓋,這里只是個舉例:
擴展數據源
生產環境中,處理安全事件,分析入侵行為,只有ssh登錄日志肯定是不夠,我們需要盡可能多的搜集數據源,以下作為參考:
- linux/window系統安全日志/操作日志
- web服務器訪問日志
- 數據庫SQL日志
- 網絡流量日志
簡化后的系統架構如下,報警也存es主要是查看報警也可以通過kibana,人力不足界面都不用開發了:
storm拓撲
storm拓撲支持python開發,以處理SQL日志為例子:
假設SQL日志的格式
"Feb 16 06:32:50 " "127.0.0.1" "root@localhost" "select * from user where id=1"
一般storm的拓撲結構
簡化后spout是通用的從kafka讀取數據的,就一個bolt處理SQL日志,匹配規則,命中策略即輸出”alert”:”原始SQL日志”
核心bolt代碼doSQLCheckBolt偽碼
import storm class doSQLCheckBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") sql = word[3] if re.match(規則,sql): storm.emit(["sqli",tup.values[0]]) doSQLCheckBolt().run()
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sqlLog", new kafkaSpout(), 10); builder.setBolt("sqliAlert", new doSQLCheckBolt(), 3) .shuffleGrouping("sqlLog");
拓撲提交示例
Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("doSQL", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("doSQL"); cluster.shutdown();
logstash
在本文環節中,logstash的配置量甚至超過了storm的拓撲腳本開發量,下面講下比較重點的幾個點,切割日志與檢索需求有關系,非常個性化,這里就不展開了。
從文件讀取
input
file {
path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
從kafka中訂閱
input {
kafka {
zk_connect => "localhost:2181" group_id => "logstash" topic_id => "test" reset_beginning => false # boolean (optional), default: false consumer_threads => 5 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false } }
寫kafka
output { kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none" } }
寫hdfs
output {
hadoop_webhdfs {
workers => 2 server => "localhost:14000" user => "flume" path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log" flush_size => 500 compress => "snappy" idle_flush_time => 10 retry_interval => 0.5 } }
寫es
output {
elasticsearch {
host => "localhost" protocol => "http" index => "logstash-%{type}-%{+YYYY.MM.dd}" index_type => "%{type}" workers => 5 template_overwrite => true } }
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用攻擊建模的方式識別攻擊行為。
回顧系統架構
以數據庫為例,通過logstash搜集mysql的查詢日志,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。
數據庫日志搜集
常見的數據日志搜集方式有三種:
鏡像方式
大多數數據庫審計產品都支持這種模式,通過分析數據庫流量,解碼數據庫協議,識別SQL預計,抽取出SQL日志
代理方式
比較典型的就是db-proxy方式,目前百度、搜狐、美團、京東等都有相關開源產品,前端通過db-proxy訪問后端的真實數據庫服務器。SQL日志可以直接在db-proxy上搜集。
客戶端方式
通過在數據庫服務器安裝客戶端搜集SQL日志,比較典型的方式就是通過logstash來搜集,本文以客戶端方式進行講解,其余方式本質上也是類似的。
logstash配置
安裝
下載logstash https://www.elastic.co/downloads/logstash 目前最新版本5.2.1版
開啟mysql查詢日志
mysql查詢日志
配置logstash
input {
file {
type => "mysql_sql_file"
path => "/var/log/mysql/mysql.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
output {
kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy"
運行logstash
bin/logstash -f mysql.conf
日志舉例
2017-02-16T23:29:00.813Z localhost 170216 19:10:15 37 Connect
debian-sys-maint@localhost on
2017-02-16T23:29:00.813Z localhost 37 Quit
2017-02-16T23:29:00.813Z localhost 38 Connect debian-sys-maint@localhost on
2017-02-16T23:29:00.813Z localhost 38 Query SHOW VARIABLES LIKE 'pid_file'
切詞
最簡化操作是不用進行切詞,如果喜歡自動切分出數據庫名,時間等字段,請參考:
grok語法
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
grok語法調試
http://grokdebug.herokuapp.com/
常見攻擊特征
以常見的wavsep搭建靶場環境,請參考我的另外一篇文章《基於WAVSEP的靶場搭建指南》
使用SQL掃描鏈接
分析攻擊特征,下列列舉兩個,更多攻擊特征請大家自行總結
特征一
2017-02-16T23:29:00.993Z localhost 170216 19:19:12 46 Query SELECT username, password FROM users WHERE username='textvalue' UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL#' AND password='textvalue2'
使用聯合查詢枚舉數據時會產生大量的NULL字段
特征二、三
枚舉數據庫結構時會使用INFORMATION_SCHEMA,另外個別掃描器會使用GROUP BY x)a)
2017-02-16T23:29:00.998Z localhost 46 Query SELECT username, password FROM users WHERE username='textvalue' AND (SELECT 7473 FROM(SELECT COUNT(*),CONCAT(0x7171716271,(SELECT (CASE WHEN (8199= 8199) THEN 1 ELSE 0 END)),0x717a627871,FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.PLUGINS GROUP BY x)a)-- LFpQ' AND password='textvalue2'
hadoop離線處理
hadoop是基於map,reduce模型
簡化理解就是:
cat data.txt | ./map | ./reduce
最簡化期間,我們可以只開發map程序,在map中逐行處理日志數據,匹配攻擊行為。
以perl腳本開發,python類似
在hadoop下運行即可。
生產環境
生產環境中的規則會比這復雜很多,需要你不斷補充,這里只是舉例;
單純只編寫map會有大量的重復報警,需要開發reduce用於聚合;
應急響應時需要知道SQL注入的是那個庫,使用的是哪個賬戶,這個需要在logstash切割字段時補充;
應急響應時最好可以知道SQL注入對應的鏈接,這個需要將web的accesslog與SQL日志關聯分析,比較成熟的方案是基於機器學習,學習出基於時間的關聯矩陣;
客戶端直接搜集SQL數據要求mysql也開啟查詢日志,這個對服務器性能有較大影響,我知道的大型公司以db-prxoy方式接入為主,建議可以在db-proxy上搜集;
基於規則識別SQL注入存在瓶頸,雖然相對web日志層面以及流量層面有一定進步,SQL語義成為必然之路。
后繼
前言
SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用算法挖掘未知攻擊行為。上集傳送門
回顧系統架構
以WEB服務器日志為例,通過logstash搜集WEB服務器的查詢日志,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。
自定義日志格式
開啟httpd自定義日志格式,記錄User-Agen以及Referer
<IfModule logio_module>
# You need to enable mod_logio.c to use %I and %O LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio </IfModule> CustomLog "logs/access_log" combined
日志舉例
180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" 180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/wp-json/ HTTP/1.1" 200 51789 "-" "print `env`" 180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/wp-admin/load-styles.php?c=0&dir=ltr&load[]=dashicons,buttons,forms,l10n,login&ver=Li4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vZXRjL3Bhc3N3ZAAucG5n HTTP/1.1" 200 35841 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" 180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/ HTTP/1.1" 200 17442 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
測試環境
在wordpress目錄下添加測試代碼1.php,內容為phpinfo
針對1.php的訪問日志
[root@instance-8lp4smgv logs]# cat access_log | grep 'wp-admin/1.php' 125.33.206.140 - - [26/Feb/2017:13:09:47 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 125.33.206.140 - - [26/Feb/2017:13:11:19 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 125.33.206.140 - - [26/Feb/2017:13:13:44 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" 127.0.0.1 - - [26/Feb/2017:13:14:19 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" 127.0.0.1 - - [26/Feb/2017:13:16:04 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 107519 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" 125.33.206.140 - - [26/Feb/2017:13:16:12 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 27499 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" [root@instance-8lp4smgv logs]#
hadoop離線處理
hadoop是基於map,reduce模型
map腳本
localhost:work maidou$ cat mapper-graph.pl
#!/usr/bin/perl -w #180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21" my $line=""; while($line=<>) { if( $line=~/"GET (\S+) HTTP\/1.[01]" 2\d+ \d+ "(\S+)"/ ) { my $path=$1; my $ref=$2; if( $path=~/(\S+)\?(\S+)/ ) { $path=$1; } if( $ref=~/(\S+)\?(\S+)/ ) { $ref=$1; } if( ($ref=~/^http:\/\/180/)||( "-" eq $ref ) ) { my $line=$ref."::".$path."\n"; #printf("$ref::$path\n"); print($line); } } }
reducer腳本
localhost:work maidou$ cat reducer-graph.pl
#!/usr/bin/perl -w my %result; my $line=""; while($line=<>) { if( $line=~/(\S+)\:\:(\S+)/ ) { unless( exists($result{$line}) ) { $result{$line}=1; } } } foreach $key (sort keys %result) { if( $key=~/(\S+)\:\:(\S+)/ ) { my $ref=$1; my $path=$2;#這里是舉例,過濾你關注的webshell文件后綴,常見的有php、jsp,白名單形式過濾存在漏報風險;也可以以黑名單形式過濾你忽略的文件類型 if( $path=~/(\.php)$/ ) { my $output=$ref." -> ".$path."\n"; print($output); } } }
生成結果示例為:
- -> http://180.76.190.79/wordpress/wp-admin/1.php - -> http://180.76.190.79/wordpress/wp-admin/admin-ajax.php - -> http://180.76.190.79/wordpress/wp-admin/customize.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/edit-comments.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/profile.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-login.php http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/xmlrpc.php
圖算法
講生成數據導入圖數據庫neo4j,滿足webshell特征的為:
入度出度均為0
入度出度均為1且自己指向自己
neo4j
neo4j是一個高性能的,NOSQL圖形數據庫,它將結構化數據存儲在網絡上而不是表中,因其嵌入式、高性能、輕量級等優勢,越來越受到關注。
neo4j安裝
https://neo4j.com/ 上下載安裝包安裝,默認配置即可
ne04j啟動
以我的mac為例子,通過gui啟動即可,默認密碼為ne04j/ne04j,第一次登錄會要求更改密碼
GUI管理界面
python api庫安裝
sudo pip install neo4j-driver
下載JPype
https://pypi.python.org/pypi/JPype1
安裝JPype
tar -zxvf JPype1-0.6.2.tar.gz cd JPype1-0.6.2 sudo python setup.py install
將數據導入圖數據庫代碼如下:
B0000000B60544:freebuf liu.yan$ cat load-graph.py import re from neo4j.v1 import GraphDatabase, basic_auth nodes={} index=1 driver = GraphDatabase.driver("bolt://localhost:7687",auth=basic_auth("neo4j","maidou")) session = driver.session() file_object = open('r-graph.txt', 'r') try: for line in file_object: matchObj = re.match( r'(\S+) -> (\S+)', line, re.M|re.I) if matchObj: path = matchObj.group(1); ref = matchObj.group(2); if path in nodes.keys(): path_node = nodes[path] else: path_node = "Page%d" % index nodes[path]=path_node sql = "create (%s:Page {url:\"%s\" , id:\"%d\",in:0,out:0})" %(path_node,path,index) index=index+1 session.run(sql) #print sql if ref in nodes.keys(): ref_node = nodes[ref] else: ref_node = "Page%d" % index nodes[ref]=ref_node sql = "create (%s:Page {url:\"%s\",id:\"%d\",in:0,out:0})" %(ref_node,ref,index) index=index+1 session.run(sql) #print sql sql = "create (%s)-[:IN]->(%s)" %(path_node,ref_node) session.run(sql) #print sql sql = "match (n:Page {url:\"%s\"}) SET n.out=n.out+1" % path session.run(sql) #print sql sql = "match (n:Page {url:\"%s\"}) SET n.in=n.in+1" % ref session.run(sql) #print sql finally: file_object.close( ) session.close()
生成有向圖如下
查詢入度為1出度均為0的結點或者查詢入度出度均為1且指向自己的結點,由於把ref為空的情況也識別為”-”結點,所以入度為1出度均為0。
優化點
生產環境實際使用中,我們遇到誤報分為以下幾種:
主頁,各種index頁面(第一個誤報就是這種)
phpmyadmin、zabbix等運維管理后台
hadoop、elk等開源軟件的控制台
API接口
這些通過短期加白可以有效解決,比較麻煩的是掃描器對結果的影響(第二個誤報就是這種),這部分需要通過掃描器指紋或者使用高大上的人機算法來去掉干擾。
后記
使用算法來挖掘未知攻擊行為是目前非常流行的一個研究方向,本文只是介紹了其中比較好理解和實現的一種算法,該算法並非我首創,不少安全公司也都或多或少有過實踐。篇幅有限,我將陸續在企業安全建設專題其他文章中由淺入深介紹其他算法。算法或者說機器學習本質是科學規律在大數據集集合上趨勢體現,所以很難做到精准報警,目前階段還是需要通過各種規則和模型來輔助,不過對於挖掘未知攻擊行為確實是一支奇兵。