# URL Anomaly Detection


Isolation Forest (unsupervised) can be viewed as an extension of the random forest idea.
iTree construction: randomly pick a feature, then randomly pick a split value for that feature, partition the samples into two, and repeat.
Once an iTree is built, prediction simply walks a test record down the tree to see which leaf it lands in. The assumption that makes iTrees effective at anomaly detection is that anomalies are rare and therefore get isolated into leaf nodes very quickly, so the path length h(x) from the root to the record's leaf can be used to judge whether record x is an anomaly.
The anomaly score is s(x, n) = 2^(-E(h(x)) / c(n)), where E(h(x)) is the average path length of x over all iTrees and c(n) = 2H(n-1) - 2(n-1)/n (with H(i) ≈ ln(i) + 0.5772) is the average path length of an unsuccessful binary-search-tree search over n samples, used as a normalisation term.

A score close to 1 means the point is very likely an anomaly;
a score close to 0 means the point is very likely normal;
if most training samples get an s(x, n) around 0.5, the data has no obvious anomalies.
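
As a quick illustration (my own sketch, not part of the original post), the score can be computed directly from an average path length:

import numpy as np

def c(n):
    # normalisation term: average path length of an unsuccessful
    # binary-search-tree search over n samples
    if n <= 1:
        return 0.0
    euler_gamma = 0.5772156649
    harmonic = np.log(n - 1) + euler_gamma   # approximation of H(n-1)
    return 2.0 * harmonic - 2.0 * (n - 1) / n

def anomaly_score(avg_path_length, n):
    # s(x, n) = 2^(-E(h(x)) / c(n)); close to 1 => anomaly, around 0.5 or below => normal
    return 2.0 ** (-avg_path_length / c(n))

print(anomaly_score(3.0, 256))    # isolated after ~3 splits -> about 0.82, suspicious
print(anomaly_score(12.0, 256))   # needs ~12 splits -> about 0.44, looks normal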

iForest construction: each tree is built on a random subsample of the data, which keeps the trees diverse.
Differences from random forest (a short sketch follows the list):
1. Random forest draws a bootstrap sample as large as the training set; iForest also samples at random, but with fewer records than the training set, because for anomaly detection a subsample is usually enough to separate the outliers.
2. iForest picks the split feature at random and the split threshold at random; random forest chooses the feature and threshold using information gain or the gain ratio.
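
Point 1 corresponds to the max_samples parameter of sklearn's IsolationForest; a minimal sketch on made-up toy data (values are illustrative only):

import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.RandomState(42)
X_normal = rng.normal(loc=0.0, scale=1.0, size=(1000, 2))   # dense cluster of normal points
X_outliers = rng.uniform(low=6.0, high=8.0, size=(10, 2))   # a few far-away points
X = np.vstack([X_normal, X_outliers])

# each of the 100 trees is grown on a random subsample of 256 records,
# far fewer than the 1010 training records
clf = IsolationForest(n_estimators=100, max_samples=256, random_state=42)
clf.fit(X)

pred = clf.predict(X)              # 1 = normal, -1 = anomaly
print((pred[-10:] == -1).mean())   # fraction of the injected outliers that get flagged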

## URL Anomaly Detection (IForest)

The data source is here.
Normal request data:

/103886/
/rcanimal/
/458010b88d9ce/
/cclogovs/
/using-localization/
/121006_dakotacwpressconf/

Malicious request data:

/top.php?stuff='uname >q36497765 #
/h21y8w52.nsf?
/ca000001.pl?action=showcart&hop=">&path=acatalog/
/scripts/edit_image.php?dn=1&userfile=/etc/passwd&userfile_name= ;id;

This step requires some text-vectorisation background; we use sklearn's CountVectorizer:

from sklearn.feature_extraction.text import CountVectorizer  
vectorizer=CountVectorizer()
corpus=["I come to China to travel", 
    "This is a car polupar in China",          
    "I love tea and Apple ",   
    "The work is to write some papers in science"] 
print(vectorizer.fit_transform(corpus))
'''(document index, token index)  count'''
	 (0, 16)	     1
	 (0, 3)	         1
	 (0, 15)	     2
print(vectorizer.get_feature_names())  # 'I' and 'a' are dropped by the default token pattern (single-character tokens); print the learned vocabulary
['and', 'apple', 'car', 'china', 'come', 'in', 'is', 'love', 'papers', 'polupar', 'science', 'some', 'tea', 'the', 'this', 'to', 'travel', 'work', 'write']
print(vectorizer.fit_transform(corpus).toarray())  # print as a dense matrix: four rows for the four sentences, each column counting a token's occurrences; these 19-dimensional count features are the input for text classification

TF-IDF is a statistical method for evaluating how important a term is to one document within a collection or corpus.
TF (Term Frequency): the number of times (or frequency with which) a term appears in a document.
IDF (Inverse Document Frequency): a measure of a term's weight; the more common a term is across documents, the lower its IDF.
The IDF formula is IDF(x) = log(N / N(x)), where N is the total number of documents in the corpus and N(x) is the number of documents that contain the term x (sklearn actually uses the smoothed form log((N + 1) / (N(x) + 1)) + 1). For example, in the four-sentence corpus below, 'china' appears in 2 of the N = 4 documents, so its unsmoothed IDF is log(4/2) ≈ 0.69, lower than that of a term that appears in only one document.

from sklearn.feature_extraction.text import TfidfTransformer  
from sklearn.feature_extraction.text import CountVectorizer  

corpus=["I come to China to travel", 
    "This is a car polupar in China",          
    "I love tea and Apple ",   
    "The work is to write some papers in science"] 

vectorizer=CountVectorizer()

transformer = TfidfTransformer()
tfidf = transformer.fit_transform(vectorizer.fit_transform(corpus))  
print(tfidf)

This gives triples of (document index, token index, TF-IDF value).
Reference for this part:
https://www.cnblogs.com/pinard/p/6693230.html

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest
from sklearn.metrics import confusion_matrix
import itertools
from sklearn.metrics import accuracy_score
#data
good_data=pd.read_csv('goodqueries.txt',names=['url'])
good_data['label']=0
data=good_data
data.head()
##feature
vectorizer = TfidfVectorizer(min_df = 0.0, analyzer="char", sublinear_tf=True, ngram_range=(1,3)) #converting data to vectors
X = vectorizer.fit_transform(data['url'].values.astype('U')) # TF-IDF vectorisation
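
To see what this extracts, the vectorizer's analyzer can be applied to a single path (a quick check, not part of the original post):

analyzer = vectorizer.build_analyzer()
print(analyzer("/rcanimal/")[:12])
# ['/', 'r', 'c', 'a', 'n', 'i', 'm', 'a', 'l', '/', '/r', 'rc'] -- 1-grams first, then 2-grams, then 3-grams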

Split into a training set and a test set:

X_train, X_test, y_train, y_test = train_test_split(X, data['label'].values, test_size=0.2, random_state=42) #splitting data
print(X_train)  # all labels are 0, i.e. everything loaded so far is normal data
clf=IsolationForest()
clf.fit(X_train)
y_pre = clf.predict(X_test)
ny_pre = np.asarray(y_pre)
ny_pre[ny_pre==1] = 0  # IsolationForest outputs 1 for normal and -1 for anomalies; remap to 0 = normal, 1 = anomaly
ny_pre[ny_pre==-1] = 1

ny_test = np.asarray(y_test)  # y_test is all zeros because only goodqueries.txt was loaded
accuracy_score(ny_test,ny_pre)
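
The accuracy above only measures how many normal URLs are kept. As a sanity check (my addition, assuming badqueries.txt sits next to goodqueries.txt in the same one-URL-per-line format), the malicious queries can be scored with the same fitted vectorizer and forest:

bad_data = pd.read_csv('badqueries.txt', names=['url'])
X_bad = vectorizer.transform(bad_data['url'].values.astype('U'))   # reuse the fitted TF-IDF vocabulary

y_bad_pre = clf.predict(X_bad)                  # 1 = normal, -1 = anomaly
detection_rate = (y_bad_pre == -1).mean()
print("malicious queries flagged as anomalies: {:.2%}".format(detection_rate))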


## URL Anomaly Detection (LSTM)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/6/4 9:25
# @Author  : afanti

import sys
import os
import json
import pandas as pd
import numpy
import optparse
from keras.callbacks import TensorBoard
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.layers.embeddings import Embedding
from keras.preprocessing import sequence
from keras.preprocessing.text import Tokenizer
from collections import OrderedDict
from keras.models import load_model
from keras.models import model_from_json
def model():
    dataframe = pd.read_csv('goodqueries.txt', names=['url'])
    dataframe['label']=0
    # dataframe.head()
    dataframe1 = pd.read_csv('badqueries.txt', names=['url'])
    dataframe1['label']=1
    # dataframe1.head()
    dataset=pd.concat([dataframe,dataframe1])
    dataset=dataset.sample(frac=1).values
    X = dataset[:,0]
    Y = dataset[:,1]
    for i in range(len(X)):
        if type(X[i])==float:
            X[i]=str(X[i])
    tokenizer = Tokenizer(filters='\t\n', char_level=True)
    tokenizer.fit_on_texts(X)
    X = tokenizer.texts_to_sequences(X)  # a list of integer sequences, one per input text
    word_dict_file = 'build/word-dictionary.json'

    if not os.path.exists(os.path.dirname(word_dict_file)):
        os.makedirs(os.path.dirname(word_dict_file))

    with open(word_dict_file, 'w',encoding='utf-8') as outfile:
        json.dump(tokenizer.word_index, outfile, ensure_ascii=False)  # maps each token (character) to its rank/index

    num_words = len(tokenizer.word_index)+1 #174

    max_log_length = 100
    train_size = int(len(dataset) * .75)

    # padding
    X_processed = sequence.pad_sequences(X, maxlen=max_log_length)
    # split into train/test sets
    X_train, X_test = X_processed[0:train_size], X_processed[train_size:len(X_processed)]
    Y_train, Y_test = Y[0:train_size], Y[train_size:len(Y)]

    model = Sequential()
    model.add(Embedding(num_words, 32, input_length=max_log_length))
    model.add(Dropout(0.5))
    model.add(LSTM(64, recurrent_dropout=0.5))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    model.summary()
    model.fit(X_train, Y_train, validation_split=0.25, epochs=3, batch_size=128)
    # Evaluate model
    score, acc = model.evaluate(X_test, Y_test, verbose=1, batch_size=128)
    print("Model Accuracy: {:0.2f}%".format(acc * 100))
    # Save model
    model.save_weights('securitai-lstm-weights.h5')
    model.save('securitai-lstm-model.h5')
    with open('securitai-lstm-model.json', 'w') as outfile:
        outfile.write(model.to_json())

    df_black = pd.read_csv('badqueries.txt', names=['url'], nrows=20000)
    df_black['label'] = 1
    X_waf = df_black['url'].values.astype('str')
    Y_waf = df_black['label'].values.astype('float')  # numeric labels, as required by evaluate() with binary_crossentropy
    X_sequences = tokenizer.texts_to_sequences(X_waf)
    X_processed = sequence.pad_sequences(X_sequences, maxlen=max_log_length)
    score, acc = model.evaluate(X_processed, Y_waf, verbose=1, batch_size=128)
    print("Model Accuracy: {:0.2f}%".format(acc * 100))

This uses Keras's Tokenizer:

from keras.preprocessing.text import Tokenizer
import keras
tokenizer = Tokenizer(char_level=True)
text = ["/javascript/nets.png", "/javascript/legacy.swf"]
tokenizer.fit_on_texts(text)
# tokenizer.word_counts  # with char_level=False, texts_to_sequences(["nets swf"]) would give [[2, 5]]
tokenizer.texts_to_sequences(["nets swf"]) 
tokenizer.word_index

With char_level=True the tokenizer counts character frequencies; in the resulting word_index for this example, 'a' is the most frequent character and is assigned index 1.

Output: each character of "nets swf" is looked up in the word_index above:

# [[11, 12, 6, 3, 3, 17, 18]]  (char_level=True: word-level semantics are lost)

The input data is padded so that every sequence has the same length, maxlen = max_log_length = 100 (see the toy check after the snippet):

# padding
X_processed = sequence.pad_sequences(X, maxlen=max_log_length)
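
For example (a toy check, not part of the original post), shorter sequences are left-padded with zeros up to maxlen:

from keras.preprocessing import sequence

print(sequence.pad_sequences([[1, 2, 3], [4]], maxlen=5))
# [[0 0 1 2 3]
#  [0 0 0 0 4]]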

More on the Tokenizer here:
https://blog.csdn.net/wcy23580/article/details/84885734
https://blog.csdn.net/wcy23580/article/details/84957471
Before training, the Embedding layer performs an embedding step: word (here, character) embeddings represent tokens and documents as dense vectors. output_dim is the size of the vector space the tokens are embedded into; input_dim is the number of possible token values in the text, here num_words = 174 (173 distinct characters plus the padding index 0); input_length is the length of each input sequence, e.g. if every document were 1000 tokens long, input_length would be 1000.
Below we define an embedding layer for this 174-entry vocabulary (characters integer-encoded from 1 to 173), a 32-dimensional embedding space, and input sequences of max_log_length = 100 characters each.
The embedding layer has learnable weights; if the model is saved to a file, the embedding weights are saved with it.

model = Sequential()
model.add(Embedding(input_dim=num_words, output_dim=32, input_length=max_log_length))
model.add(Dropout(0.5))
model.add(LSTM(64, recurrent_dropout=0.5))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In short, the pipeline is char-level processing + count-based vectorisation + word embedding + an LSTM neural network.

Related reference: https://juejin.im/entry/5acc23f26fb9a028d1416bb3

## URL Anomaly Detection (LR)

Import the data:

# -*- coding:utf8 -*-
from sklearn.externals import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
import os
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
import urllib.parse
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt

def loadFile(name):
    directory = str(os.getcwd())
    filepath = os.path.join(directory, name)
    with open(filepath,'r',encoding="utf8") as f:
        data = f.readlines()
    data = list(set(data))
    result = []
    for d in data:
        d = str(urllib.parse.unquote(d))   #converting url encoded data to simple string
        result.append(d)
    return result

badQueries = loadFile('badqueries.txt')
validQueries = loadFile('goodqueries.txt')

badQueries = list(set(badQueries))
validQueries = list(set(validQueries))
allQueries = badQueries + validQueries
yBad = [1 for i in range(0, len(badQueries))]  #labels, 1 for malicious and 0 for clean
yGood = [0 for i in range(0, len(validQueries))]
y = yBad + yGood
queries = allQueries

Split into training and test sets:

X_train, X_test, y_train, y_test = train_test_split(queries, y, test_size=0.2, random_state=42) #splitting data
badCount = len(badQueries) #44532
validCount = len(validQueries) #1265974

A Pipeline chains several steps, e.g. feature extraction, normalisation, and classification, into a single machine-learning workflow.
tfidf: the TfidfVectorizer parameter analyzer selects word or character n-gram features.
sublinear_tf applies sublinear TF scaling, i.e. replaces tf with 1 + log(tf).
ngram_range gives the lower and upper bounds on n for the extracted n-grams. A class_weight such as {0: 0.9, 1: 0.1} would weight class 0 at 90% and class 1 at 10%; here class_weight="balanced" is used instead.

pipe_lr = Pipeline([('tfidf', TfidfVectorizer(min_df = 0.0, analyzer="char", sublinear_tf=True, ngram_range=(1,3))),
                    ('clf', LogisticRegression(class_weight="balanced"))
                    ])
pipe_lr.fit(X_train, y_train)

predicted = pipe_lr.predict(X_test)
predicted=list(predicted)
fpr, tpr, _ = metrics.roc_curve(y_test, (pipe_lr.predict_proba(X_test)[:, 1]))
auc = metrics.auc(fpr, tpr)

print("Bad samples: %d" % badCount)
print("Good samples: %d" % validCount)
print("Baseline Constant negative: %.6f" % (validCount / (validCount + badCount)))
print("------------")
print("Accuracy: %f" % pipe_lr.score(X_test, y_test))  #checking the accuracy
print("Precision: %f" % metrics.precision_score(y_test, predicted))
print("Recall: %f" % metrics.recall_score(y_test, predicted))
print("F1-Score: %f" % metrics.f1_score(y_test, predicted))
print("AUC: %f" % auc)
joblib.dump(pipe_lr,"lr.pkl")

Testing:

from urllib.parse import urlparse
from sklearn.externals import joblib
lr=joblib.load("lr.pkl")
def url(url):
    try:
        parsed_url=urlparse(url)
        paths=parsed_url.path+parsed_url.query
        result=lr.predict([paths])
        
        if result==[0]:
            return False
        else:
            return True
    except Exception as err:
        #print(str(err))
        pass

result=url('http://br-ofertasimperdiveis.epizy.com/examples/jsp/cal/feedsplitter.php?format=../../../../../../../../../../etc/passwd\x00&debug=1')
result1=url('http://br-ofertasimperdiveis.epizy.com/produto.php?linkcompleto=iphone-6-plus-apple-64gb-cinza')
result2=url('http://br-ofertasimperdiveis.epizy.com/?q=select * from x')


A char-level/word-level + n-gram + TF-IDF pipeline covers a surprising amount of ground, for both longer and shorter texts, and much of the key data in security can be treated as long or short text: domain requests, malicious code, malicious files.

https://www.freebuf.com/articles/network/131279.html
https://github.com/exp-db/AI-Driven-WAF/blob/master/waf.py
https://www.kdnuggets.com/2017/02/machine-learning-driven-firewall.html
Understanding the N-Gram model
Main reference:
https://xz.aliyun.com/t/5288#toc-4

