XSS Detection with HMM

Reposted from: http://www.freebuf.com/articles/web/133909.html

Preface

In the previous article we covered the basics of HMMs and a common parameter-based anomaly-detection approach. This time we take a different tack: treat the machine as a white hat who has just entered the field, train it on the grammar of XSS attacks, and then have it comb through access logs for suspected attack entries that match that grammar.

[Figure: overall detection architecture]

Through lexical segmentation, an attack payload can be serialized into an observation sequence, for example:

[Figure: serializing a payload into an observation sequence]

Set-of-Words / Bag-of-Words Models

The set-of-words and bag-of-words models are data-processing models used widely in machine learning to turn string data into features. The general approach: tokenize the samples, count each token's frequency (its term frequency), select all or some of the tokens as keys of a hash table as needed, and number the table entries sequentially; that table can then be used to encode any string.

  • Set-of-words model: the set formed by the words; as with any set, each element appears exactly once, i.e. each word occurs at most once in the word set
  • Bag-of-words model: if a word appears in a document more than once, its number of occurrences is counted as well

The set-of-words model is sufficient for this chapter.

Suppose we have the following dataset:

    dataset = [['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
               ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
               ['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
               ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
               ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
               ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]

First, build the vocabulary:

vocabSet = set()
for doc in dataset:
    vocabSet |= set(doc)
vocabList = list(vocabSet)

Then build the word-set vectors from the vocabulary:

# set-of-words model
SOW = []
for doc in dataset:
    vec = [0] * len(vocabList)
    for i, word in enumerate(vocabList):
        if word in doc:
            vec[i] = 1
    SOW.append(vec)  # append the encoded vector
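For contrast, a bag-of-words encoding counts occurrences instead of just flagging presence. A minimal sketch, using a made-up two-document dataset so that a repeated word actually occurs:

```python
# Bag-of-words sketch: count occurrences instead of marking presence.
dataset = [['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
           ['stop', 'posting', 'stupid', 'worthless', 'garbage', 'stupid']]

vocabList = sorted({word for doc in dataset for word in doc})

BOW = []
for doc in dataset:
    vec = [0] * len(vocabList)
    for word in doc:
        vec[vocabList.index(word)] += 1  # increment on every occurrence
    BOW.append(vec)

# 'stupid' appears twice in the second document, so its slot holds 2
print(BOW[1][vocabList.index('stupid')])  # 2
```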


The core of the simplified set-of-words code is:

fredist = nltk.FreqDist(tokens_list)  # term frequencies for this file
keys = fredist.keys()
keys = keys[:max]  # keep only the N most frequent words; everything else generalizes to 0
for localkey in keys:  # iterate over the deduplicated word set
    if localkey in wordbag.keys():  # skip words already in the word set
        continue
    else:
        wordbag[localkey] = index_wordbag
        index_wordbag += 1

Data Processing and Feature Extraction

Some common XSS attack payloads:

<script>alert('XSS')</script>
%3cscript%3ealert('XSS')%3c/script%3e
%22%3e%3cscript%3ealert('XSS')%3c/script%3e
<IMG SRC="javascript:alert('XSS');">
<IMG SRC=javascript:alert("XSS")>
<IMG SRC=javascript:alert('XSS')>
<img src=xss onerror=alert(1)>
<IMG """><SCRIPT>alert("XSS")</SCRIPT>">
<IMG SRC=javascript:alert(String.fromCharCode(88,83,83))>
<IMG SRC="jav ascript:alert('XSS');">
<IMG SRC="jav ascript:alert('XSS');">
<BODY BACKGROUND="javascript:alert('XSS')">
<BODY ONLOAD=alert('XSS')>

The tokenizer needs to support the following splitting rules:

  • content wrapped in single or double quotes: 'XSS'
  • http/https links: http://xi.baidu.com/xss.js
  • complete <> tags: <script>
  • opening <> tag fragments: <BODY
  • attribute tags: ONLOAD=
  • closing brackets: >
  • function bodies: "javascript:alert('XSS');"
  • character and number literals

An example implementation:

tokens_pattern = r'''(?x)
 "[^"]+"
|http://\S+
|</\w+>
|<\w+>
|<\w+
|\w+=
|>
|\w+\([^<]+\)  # function calls, e.g. alert(String.fromCharCode(88,83,83))
|\w+
'''
words = nltk.regexp_tokenize(line, tokens_pattern)
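As a quick check of the pattern, `re.findall` with the same verbose regex can stand in for `nltk.regexp_tokenize` (the pattern contains no capture groups, so both return the matched tokens). This reproduces the article's example token stream:

```python
import re

# Same verbose pattern as above; re.findall returns whole matches
# because the pattern has no capture groups.
tokens_pattern = r'''(?x)
 "[^"]+"
|http://\S+
|</\w+>
|<\w+>
|<\w+
|\w+=
|>
|\w+\([^<]+\)  # function calls, e.g. alert(String.fromCharCode(88,83,83))
|\w+
'''

payload = '<img src=x onerror=prompt(8)>'
print(re.findall(tokens_pattern, payload))
# ['<img', 'src=', 'x', 'onerror=', 'prompt(8)', '>']
```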

In addition, to shrink the vector space, numbers, characters, and hyperlinks need to be generalized. The rules are:

# replace numeric constants with 8
line, number = re.subn(r'\d+', "8", line)
# replace URLs with http://u
line, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?]+', "http://u", line)
# strip /*...*/ comments
line, number = re.subn(r'\/\*.?\*\/', "", line)

Tokenization after generalization looks like this:

#Original parameter value: "><img src=x onerror=prompt(0)>)
#Tokens: ['>', '<img', 'src=', 'x', 'onerror=', 'prompt(8)', '>']


#Original parameter value: <iframe src="x-javascript:alert(document.domain);"></iframe>)
#Tokens: ['<iframe', 'src=', '"x-javascript:alert(document.domain);"', '>', '</iframe>']

#Original parameter value: <marquee><h1>XSS by xss</h1></marquee> )
#Tokens: ['<marquee>', '<h8>', 'XSS', 'by', 'xss', '</h8>', '</marquee>']

#Original parameter value: <script>-=alert;-(1)</script> "onmouseover="confirm(document.domain);"" </script>)
#Tokens: ['<script>', 'alert', '8', '</script>', '"onmouseover="', 'confirm(document.domain)', '</script>']

#Original parameter value: <script>alert(2)</script> "><img src=x onerror=prompt(document.domain)>)
#Tokens: ['<script>', 'alert(8)', '</script>', '>', '<img', 'src=', 'x', 'onerror=', 'prompt(document.domain)', '>']
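Wrapped as a small helper, the three substitution rules can be sketched as follows (the function name `normalize` is mine, not the article's):

```python
import re

def normalize(line):
    # numeric constants -> 8
    line, _ = re.subn(r'\d+', "8", line)
    # URLs -> http://u
    line, _ = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?]+', "http://u", line)
    # strip /*...*/ comments
    line, _ = re.subn(r'\/\*.?\*\/', "", line)
    return line

print(normalize('"><img src=x onerror=prompt(0)>'))  # "><img src=x onerror=prompt(8)>
print(normalize('s=http://xi.baidu.com/xss.js'))     # s=http://u
```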

Combined with the set-of-words model, the complete pipeline looks like this:

[Figure: set-of-words processing pipeline]

Training the Model

Feed in the generalized vectors X along with the corresponding length array X_lens. X_lens is required because the parameter samples vary in length, so the per-sequence lengths must be supplied separately.
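The packing itself is mechanical; here is a minimal sketch with made-up encoded sequences (the token indices are arbitrary):

```python
import numpy as np

# Three hypothetical encoded observation sequences, one integer index per token.
seqs = [[3, 1, 7], [2, 2], [5, 1, 1, 4]]

# hmmlearn expects a single (n_samples, 1) column of observations
# plus the length of each individual sequence.
X = np.concatenate([np.array(s).reshape(-1, 1) for s in seqs])
X_lens = [len(s) for s in seqs]

print(X.shape)  # (9, 1)
print(X_lens)   # [3, 2, 4]
```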

remodel = hmm.GaussianHMM(n_components=3, covariance_type="full", n_iter=100)
remodel.fit(X, X_lens)

Validating the Model

The whole system runs as follows:

[Figure: system run flow]

In the validation phase, the trained HMM model takes an observation sequence as input and returns a probability, which determines whether the sequence is legitimate. The training set is 1,000 typical XSS attack log entries; after tokenization and word-set computation, 200 features were extracted, and all samples were encoded and serialized using those 200 features. Tested against 20,000 normal log entries and 20,000 XSS attacks (encodings such as JSFuck are not yet supported), accuracy exceeds 90%. The core of the validation code:

with open(filename) as f:
    for line in f:
        line = line.strip('\n')
        line = urllib.unquote(line)
        h = HTMLParser.HTMLParser()
        line = h.unescape(line)
        if len(line) >= MIN_LEN:
            line, number = re.subn(r'\d+', "8", line)
            line, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:]+', "http://u", line)
            line, number = re.subn(r'\/\*.?\*\/', "", line)
            words = do_str(line)
            vers = []
            for word in words:
                if word in wordbag.keys():
                    vers.append([wordbag[word]])
                else:
                    vers.append([-1])
            np_vers = np.array(vers)
            pro = remodel.score(np_vers)
            if pro >= T:
                print "SCORE:(%d) XSS_URL:(%s) " % (pro, line)

The more complete code is as follows:

# -*- coding:utf-8 -*-

import sys
import urllib
import urlparse
import re
from hmmlearn import hmm
import numpy as np
from sklearn.externals import joblib
import HTMLParser
import nltk


#minimum length of a parameter value worth processing
MIN_LEN=10

#number of hidden states
N=5
#maximum-likelihood (log-probability) threshold
T=-200
#letters
#digits: 1
#<>,:"'
#other characters: 2
SEN=['<','>',',',':','\'','/',';','"','{','}','(',')']

index_wordbag=1 #word-bag index
wordbag={} #word bag

#</script><script>alert(String.fromCharCode(88,83,83))</script>
#<IMG SRC=x onchange="alert(String.fromCharCode(88,83,83))">
#<;IFRAME SRC=http://ha.ckers.org/scriptlet.html <;
#';alert(String.fromCharCode(88,83,83))//\';alert(String.fromCharCode(88,83,83))//";alert(String.fromCharCode(88,83,83))
# //\";alert(String.fromCharCode(88,83,83))//--></SCRIPT>">'><SCRIPT>alert(String.fromCharCode(88,83,83))</SCRIPT>
tokens_pattern = r'''(?x)
 "[^"]+"
|http://\S+
|</\w+>
|<\w+>
|<\w+
|\w+=
|>
|\w+\([^<]+\) #函數 比如alert(String.fromCharCode(88,83,83))
|\w+
'''

def ischeck(str):
    if re.match(r'^(http)',str):
        return False
    # only handle ASCII 31-127, to filter out Chinese and other noise
    for i, c in enumerate(str):
        if ord(c) > 127 or ord(c) < 31:
            return False
        if c in SEN:
            return True
    return False


def do_str(line):
    words=nltk.regexp_tokenize(line, tokens_pattern)
    #print  words
    return words

def load_wordbag(filename,max=100):
    X = [[0]]
    X_lens = [1]
    tokens_list=[]
    global wordbag
    global index_wordbag

    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            #URL-decode
            line=urllib.unquote(line)
            #unescape HTML entities
            h = HTMLParser.HTMLParser()
            line=h.unescape(line)
            if len(line) >= MIN_LEN:
                #print "Learning xss query param:(%s)" % line
                #replace numeric constants with 8
                line, number = re.subn(r'\d+', "8", line)
                #replace URLs with http://u
                line, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:=]+', "http://u", line)
                #strip comments
                line, number = re.subn(r'\/\*.?\*\/', "", line)
                #print "Learning xss query etl param:(%s) " % line
                tokens_list+=do_str(line)

            #X=np.concatenate( [X,vers])
            #X_lens.append(len(vers))


    fredist = nltk.FreqDist(tokens_list)  # term frequencies over all samples
    keys=fredist.keys()
    keys=keys[:max]
    for localkey in keys:  # iterate over the deduplicated word set
        if localkey in wordbag.keys():  # skip words already in the word bag
            continue
        else:
            wordbag[localkey] = index_wordbag
            index_wordbag += 1

    print "GET wordbag size(%d)" % index_wordbag


def main(filename):
    # seed X with one dummy observation so the first np.concatenate has a (1, 1) array to extend
    X = [[-1]]
    X_lens = [1]
    global wordbag
    global index_wordbag

    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            #URL-decode
            line=urllib.unquote(line)
            #unescape HTML entities
            h = HTMLParser.HTMLParser()
            line=h.unescape(line)
            vers=[]
            if len(line) >= MIN_LEN:
                #print "Learning xss query param:(%s)" % line
                #replace numeric constants with 8
                line, number = re.subn(r'\d+', "8", line)
                #replace URLs with http://u
                line, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:]+', "http://u", line)
                #strip comments
                line, number = re.subn(r'\/\*.?\*\/', "", line)
                #print "Learning xss query etl param:(%s) " % line
                words=do_str(line)
                for word in words:
                    if word in wordbag.keys():
                        vers.append([wordbag[word]])
                    else:
                        vers.append([-1])
            if vers:  # skip lines with no tokens; concatenating an empty array would fail
                np_vers = np.array(vers)
                X=np.concatenate([X,np_vers])
                X_lens.append(len(np_vers))



    remodel = hmm.GaussianHMM(n_components=N, covariance_type="full", n_iter=100)
    remodel.fit(X,X_lens)
    joblib.dump(remodel, "xss-train.pkl")

    return remodel

def test(remodel,filename):
    with open(filename) as f:
        for line in f:
            line = line.strip('\n')
            # URL-decode
            line = urllib.unquote(line)
            # unescape HTML entities
            h = HTMLParser.HTMLParser()
            line = h.unescape(line)

            if len(line) >= MIN_LEN:
                #print "CHK XSS_URL:(%s) " % (line)
                # replace numeric constants with 8
                line, number = re.subn(r'\d+', "8", line)
                # replace URLs with http://u
                line, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:]+', "http://u", line)
                # strip comments
                line, number = re.subn(r'\/\*.?\*\/', "", line)
                words = do_str(line)
                #print "GET Tokens (%s)" % words
                vers = []
                for word in words:
                    # print "ADD %s" % word
                    if word in wordbag.keys():
                        vers.append([wordbag[word]])
                    else:
                        vers.append([-1])
                np_vers = np.array(vers)
                pro = remodel.score(np_vers)

                if pro >= T:
                    print "SCORE:(%d) XSS_URL:(%s) " % (pro,line)

def test_normal(remodel,filename):
    with open(filename) as f:
        for line in f:
            # split the URL into components
            result = urlparse.urlparse(line)
            # URL-decode the query string
            query = urllib.unquote(result.query)
            params = urlparse.parse_qsl(query, True)

            for k, v in params:
                v=v.strip('\n')
                #print "CHECK v:%s LINE:%s " % (v, line)

                if len(v) >= MIN_LEN:
                    # replace numeric constants with 8
                    v, number = re.subn(r'\d+', "8", v)
                    # replace URLs with http://u
                    v, number = re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?:]+', "http://u", v)
                    # strip comments
                    v, number = re.subn(r'\/\*.?\*\/', "", v)
                    words = do_str(v)
                    # print "GET Tokens (%s)" % words
                    vers = []
                    for word in words:
                        # print "ADD %s" % word
                        if word in wordbag.keys():
                            vers.append([wordbag[word]])
                        else:
                            vers.append([-1])

                    np_vers = np.array(vers)
                    pro = remodel.score(np_vers)
                    print "CHK SCORE:(%d) QUERY_PARAM:(%s)" % (pro, v)
                    #if pro >= T:
                        #print "SCORE:(%d) XSS_URL:(%s) " % (pro, v)

if __name__ == '__main__':
    #test(remodel,sys.argv[2])
    load_wordbag(sys.argv[1],2000)
    #print  wordbag.keys()
    remodel = main(sys.argv[1])
    #test_normal(remodel, sys.argv[2])
    test(remodel, sys.argv[2])
