掃描器開發 - 1


背景

由於手工測試過於繁瑣,而且基本上常見的漏洞判斷都是重復動作。

一般在滲透測試挖掘漏洞的基本流程如下:

數據包解析

數據包解析一般包括 請求方法解析、參數解析、http/s協議識別,這里我偷個懶使用burpsuite的接口,然后將header、method、參數組合為一個字典

然后通過socket 傳送到掃描端,就省去了自己去解析參數。

    # PARAM_URL 0 , PARAM_BODY 1
    def getParamaters(self, params, ptype):
        params_dict = {}
        for i in params:
            if i.getType() == ptype:
                # params_dict[i.getName()] = json.loads(self._helpers.urlDecode(i.getValue()))
                params_dict[i.getName()] = self._helpers.urlDecode(i.getValue())
        return params_dict

    def parseRequest(self, messageInfo):
        httpService = messageInfo.getHttpService()
        analyzeRequest = self._helpers.analyzeRequest(messageInfo)
        host = httpService.getHost()
        port = httpService.getPort()
        protocol = httpService.getProtocol()
        method = analyzeRequest.getMethod()
        full_url = analyzeRequest.getUrl().toString()
        bp_headers = analyzeRequest.getHeaders()
        content_type = analyzeRequest.getContentType()
        # self.stdout.println(host + str(port) + protocol)
        reqUri, bp1_headers = '\r\n'.join(bp_headers).split('\r\n', 1)
        headers = dict(re.findall(r"(?P<name>.*?): (?P<value>.*?)\r\n", bp1_headers + '\r\n'))
        # self.stdout.println(headers)
        body = messageInfo.getRequest()[analyzeRequest.getBodyOffset():].tostring() if messageInfo.getRequest()[
                                                                                       analyzeRequest.getBodyOffset():].tostring() else '{}'
        params = analyzeRequest.getParameters()
        paramsINURL = self.getParamaters(params, 0)
        paramsINBODY = self.getParamaters(params, 1)
        send_data = {}
        send_data['host'] = host
        send_data['port'] = port
        send_data['protocol'] = protocol
        send_data['method'] = method
        send_data['full_url'] = full_url
        send_data['headers'] = headers
        send_data['content_type'] = content_type
        send_data['body'] = body
        send_data['param_in_url'] = paramsINURL
        send_data['param_in_body'] = paramsINBODY

發送的數據為:

{'headers': {u'Accept': u'*/*', u'PDD-CONFIG': u'V4:002.059900', u'User-Agent': u'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ', u'Connection': u'close', u'Host': u'101.35.212.35', u'Accept-Encoding': u'gzip, deflate', u'vip': u'101.35.212.35'}, 'method': u'GET', 'full_url': u'http://101.35.212.35:80/d?id=25196&ttl=1&dn=1', 'param_in_body': {}, 'body': '{}', 'protocol': u'http', 'content_type': 0, 'port': 80, 'host': u'101.35.212.35', 'param_in_url': {u'dn': u'1', u'ttl': u'1', u'id': u'25196'}}

http參數處理

平時我們在測試漏洞一般過程為 替換參數value,然后發送請求,根據響應或者dnslog的一些返回來判斷漏洞是否存在。所以我們開發自動化漏洞掃描器就是要模擬手工測試行為。

上面一步我們已經將參數都解析出來並生成一個dict。

常見參數形式包括:

GET 或 POST application/x-www-form-urlencoded

a=1
a={"x":123}
a={"x":[1,2,3]}
a={"x":{"y":"bbb"}}
a={"x":{"y":["bbb","ccc"]}}
a={"x":{"y":{"bbb":"ccc"}}}
a={"x":{"y":{"bbb":[1,2]}}}

POST application/json

{"x":123}
{"x":[1,2,3]}
{"x":{"y":"bbb"}}
{"x":{"y":["bbb","ccc"]}}
{"x":{"y":{"bbb":"ccc"}}}
{"x":{"y":{"bbb":[1,2]}}}

還有多層嵌套json 的結構。

這里需要分別對每個參數值替換或者追加payload。這里直接采用了 https://github.com/w-digital-scanner/w13scan/blob/cd6935719edec9ad8131561a2a93bbf07024cf72/W13SCAN/lib/core/common.py#L430 的 updateJsonObjectFromStr 方法,對此做了一些微小的改動,可以支持無限嵌套dict、list的解析和payloa替換追加。

    def updateJsonObjectFromStr(self, base_obj, update_str: str, mode: int):
        """
        為數據中的value 添加 、替換為 update_str
        :param base_obj:
        :param update_str:
        :param mode: 0, 替換  1 追加  2 ssrf
        :return: 返回帶有update_str的字典
        """
        assert (type(base_obj) in (list, dict))
        base_obj = copy.deepcopy(base_obj)
        # 存儲上一個value是str的對象,為的是更新當前值之前,將上一個值還原
        last_obj = None
        # 如果last_obj是dict,則為字符串,如果是list,則為int,為的是last_obj[last_key]執行合法
        last_key = None
        last_value = None
        # 存儲當前層的對象,只有list或者dict類型的對象,才會被添加進來
        curr_list = [base_obj]
        # 只要當前層還存在dict或list類型的對象,就會一直循環下去
        while len(curr_list) > 0:
            # 用於臨時存儲當前層的子層的list和dict對象,用來替換下一輪的當前層
            tmp_list = []
            for obj in curr_list:
                # 對於字典的情況
                if type(obj) is dict:
                    for k, v in obj.items():
                        if k not in self.black_params_list:
                            # 如果不是list, dict, str類型,直接跳過  {"action":"xx","data":{"isPreview":false}}  這里不會替換isPreview, 他是bool類型
                            if type(v) not in (list, dict, str, int):
                                continue
                            # list, dict類型,直接存儲,放到下一輪
                            if type(v) in (list, dict):
                                tmp_list.append(v)
                            # 字符串類型的處理
                            else:
                                # 如果上一個對象不是None的,先更新回上個對象的值
                                if last_obj is not None:
                                    last_obj[last_key] = last_value
                                # 重新綁定上一個對象的信息
                                last_obj = obj
                                last_key, last_value = k, v
                                # 執行更新
                                if mode == 0:
                                    obj[k] = update_str
                                elif mode == 1:
                                    obj[k] = str(v) + update_str
                                elif mode == 2:
                                    obj[k] = self.generate_ssrf_payload(update_str)
                                # 生成器的形式,返回整個字典
                                yield base_obj

                # 列表類型和字典差不多
                elif type(obj) is list:
                    for i in range(len(obj)):
                        # 為了和字典的邏輯統一,也寫成k,v的形式,下面就和字典的邏輯一樣了,可以把下面的邏輯抽象成函數
                        k, v = i, obj[i]
                        if v not in self.black_params_list:
                            if type(v) not in (list, dict, str, int):
                                continue
                            if type(v) in (list, dict):
                                tmp_list.append(v)
                            else:
                                if last_obj is not None:
                                    last_obj[last_key] = last_value
                                last_obj = obj
                                last_key, last_value = k, v
                                if mode == 0:
                                    obj[k] = update_str
                                elif mode == 1:
                                    obj[k] = str(v) + update_str
                                elif mode == 2:
                                    obj[k] = self.generate_ssrf_payload(update_str)
                                yield base_obj
            curr_list = tmp_list

生成的數據如下:每一個http請求都為一個字典

[{
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': [1, 2, 3],
		'ttl': 'PAYLOAD',
		'x': {
			'a': {
				'b': 'y'
			}
		},
		'id': 25196
	}
}, {
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': [1, 2, 3],
		'ttl': 1,
		'x': {
			'a': {
				'b': 'y'
			}
		},
		'id': 'PAYLOAD'
	}
}, {
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': ['PAYLOAD', 2, 3],
		'ttl': 1,
		'x': {
			'a': {
				'b': 'y'
			}
		},
		'id': 25196
	}
}, {
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': [1, 'PAYLOAD', 3],
		'ttl': 1,
		'x': {
			'a': {
				'b': 'y'
			}
		},
		'id': 25196
	}
}, {
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': [1, 2, 'PAYLOAD'],
		'ttl': 1,
		'x': {
			'a': {
				'b': 'y'
			}
		},
		'id': 25196
	}
}, {
	'headers': {
		'Accept': '*/*',
		'PDD-CONFIG': 'V4:002.059900',
		'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) ',
		'Connection': 'close',
		'Host': '101.35.212.35',
		'Accept-Encoding': 'gzip, deflate',
		'vip': '101.35.212.35'
	},
	'method': 'GET',
	'full_url': 'http://101.35.212.35:80/d?id=25196&ttl=1&dn=[1,2,3]&x={"a":{"b":"y"}}',
	'param_in_body': {},
	'body': '{}',
	'protocol': 'http',
	'content_type': 0,
	'port': 80,
	'host': '101.35.212.35',
	'param_in_url': {
		'dn': [1, 2, 3],
		'ttl': 1,
		'x': {
			'a': {
				'b': 'PAYLOAD'
			}
		},
		'id': 25196
	}
}]

http重放所需的元素生成完成就需要進行重放,這里采用了 requests 庫。

    def assemble_parameter(self, d):
		"""
        組裝參數為字符串
        """
        return '&'.join([k if v is None else '{0}={1}'.format(k, json.dumps(v, separators=(',', ':')) if isinstance(v, (dict,list)) else v) for k, v in d.items()])


    def sendGetRequest(self, url, p, h, protocol):
        """
        發送get請求數據
        :param url:  url
        :param p: get參數
        :param h:  請求頭
        :param protocol: http or https
        :return:
        """
        if self.use_proxy == 'YES':
            if protocol == 'https':
                return requests.get(url=self.parseUrl(url), params=self.assemble_parameter(p), headers=h, proxies=self.proxy, verify=False,
                             allow_redirects=self.redirect)
            else:
                return requests.get(url=self.parseUrl(url), params=self.assemble_parameter(p), headers=h, proxies=self.proxy,
                             allow_redirects=self.redirect)
        else:
            if protocol == 'https':
                return requests.get(url=self.parseUrl(url), params=self.assemble_parameter(p), headers=h, verify=False, allow_redirects=self.redirect)
            else:
                return requests.get(url=self.parseUrl(url), params=self.assemble_parameter(p), headers=h, allow_redirects=self.redirect)

    def sendPostRequest(self, url, p, d, h, protocol):
        """
        發送post請求
        :param url: url
        :param p: get參數
        :param d:  post data
        :param h:  請求頭
        :param protocol: http or https
        :return:
        """
        if self.use_proxy == 'YES':
            if protocol == 'https':
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=self.assemble_parameter(d), headers=h, proxies=self.proxy, verify=False,
                              allow_redirects=self.redirect)
            else:
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=self.assemble_parameter(d), headers=h, proxies=self.proxy,
                              allow_redirects=self.redirect)
        else:
            if protocol == 'https':
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=self.assemble_parameter(d), headers=h, verify=False,
                              allow_redirects=self.redirect)
            else:
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=self.assemble_parameter(d), headers=h, allow_redirects=self.redirect)

    def sendPostJsonRequest(self, url, p, d, h, protocol):
        """
        發送 application/json 數據
        :param url:
        :param p:
        :param d:
        :param h:
        :param protocol:
        :return:
        """
        if self.use_proxy == 'YES':
            if protocol == 'https':
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=json.dumps(d, separators=(',', ':')), headers=h, proxies=self.proxy,
                              verify=False, allow_redirects=self.redirect)
            else:
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=json.dumps(d, separators=(',', ':')), headers=h, proxies=self.proxy,
                              allow_redirects=self.redirect)
        else:
            if protocol == 'https':
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=json.dumps(d, separators=(',', ':')), headers=h, verify=False,
                              allow_redirects=self.redirect)
            else:
                return requests.post(url=self.parseUrl(url), params=self.assemble_parameter(p), data=json.dumps(d, separators=(',', ':')), headers=h,
                              allow_redirects=self.redirect)

    def processRequest(self, request_data):
        """
        重放http/s 數據
        :param request_data:
        :return:
        """
        h = self.pop_black_headers(request_data['headers'])
        protocol = request_data['protocol']
        method = request_data['method']
        content_type = request_data['content_type']
        url = request_data['full_url']
        param_in_url = request_data['param_in_url']
        param_in_body = request_data['param_in_body']
        body = request_data['body']
        if method == 'GET' and param_in_url:
            return self.sendGetRequest(url, param_in_url, h, protocol)
        elif method == 'POST' and content_type == 1:
            return self.sendPostRequest(url, param_in_url, param_in_body, h, protocol)
        elif method == 'POST' and content_type == 4:
            return self.sendPostJsonRequest(url, param_in_url, body, h, protocol)

注意在重放前需要 忽略一些請求頭和自定義忽略參數

content-length
if-modified-since
if-none-match
pragma
cache-control

SQL注入識別

注入可分為 報錯注入、盲注,由於現在waf比較多,所以考慮用盡量不觸發waf的基礎上來進行探測注入。

這里主要探討盲注的探測方式。
這里使用了余弦相似度算法

self.bool_str_tuple = ('\'', '\'\'')
self.bool_str_tuple_second = ("'||'x", "'||'")
self.bool_str_tuple_third = ("'+'x", "'+'") if self.content_type == 4 else ("'%2b'x", "'%2b'")
self.bool_int_tuple = ('-x', '-0', '-false')
self.bool_order_tuple = (",1-x", ",1",",true")

1、頁面不存在隨機值的時候

  • str 類型注入判斷流程

  • int類型注入判斷流程

  • order by 類型注入判斷流程

2、 頁面存在隨機值干擾

可參考:https://mp.weixin.qq.com/s/iX8_C53QKGCL0XjqdrqbPQ,會存在一定誤報和漏報。

SSRF漏洞探測

這個比較簡單批量替換參數為dnslog地址。

有時候dnslog有延時,所以我們可考慮將ssrf探測請求全加入到數據庫。

生成唯一的ssrf地址

    def generate_uuid(self):
        """
        生成唯一字符串
        :return:
        """
        return ''.join(str(uuid.uuid4()).split('-'))[0:10]

    def generate_ssrf_payload(self, s):
        """
        生成SSRF dnslog 域名
        :return:
        """
        poc = self.generate_uuid() + '.'+ s + '.' + self.ssrfpayload
        self.ssrf_list.append(poc)
        return "http://" + poc

socket 服務端接受請求

class MyUDPServer(ThreadingMixIn, UDPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, queue=None):
        self.queue = queue
        UDPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)


class MyUDPHandler(socketserver.BaseRequestHandler):
    def __init__(self, request, client_address, server):
        self.queue = server.queue
        BaseRequestHandler.__init__(self, request, client_address, server)

    def parse(self,p):
        x = {}
        for k,v in p.items():
            try:
                v1 = json.loads(v)
            except:
                v1 = v
            x[k] = v1
        return x

    def handle(self):  # 必須要有handle方法;所有處理必須通過handle方法實現
        # self.request is the Udp socket connected to the client
        self.data = self.request[0].strip()
        data_dict = eval(self.data.decode('utf-8'))
        data_dict['param_in_url'] = self.parse(data_dict['param_in_url'])
        data_dict['param_in_body'] = self.parse(data_dict['param_in_body'])
        self.queue.put(data_dict)

if __name__ == "__main__":
    logger = CommonLog(__name__).getlog()
    HOST, PORT = "127.0.0.1", 8883
    queue = queue.Queue()
    model = CosineSimilarity()
    server = MyUDPServer((HOST, PORT), MyUDPHandler, queue=queue)  # 實例化一個多線程UDPServer
    server.max_packet_size = 8192 * 20
    # Start the server
    SERVER_THREAD = threading.Thread(target=server.serve_forever)
    SERVER_THREAD.daemon = True
    SERVER_THREAD.start()
    logger.info('----- udp server start at 127.0.0.1:8083 ----')
    while True:
        while not queue.empty():
            data = queue.get()
            http = HttpWappalyzer()
            content_type = data['content_type']
            sqlbool = SQLBool(http, model, content_type)
            sqlboolThread = threading.Thread(target=sqlbool.scan, args=(copy.deepcopy(data),))
            sqlboolThread.start()
            sqlboolThread.join()

使用 https://www.vulnspy.com/dvwa-wooyun/ 靶場進行測試,基本能探測出所有的SQL注入漏洞。


參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM