redis未授權訪問批量驗證腳本編寫[python]


漏洞簡介

簡單來說,redis是一個數據庫。在默認的配置下,redis綁定在0.0.0.0:6379,也就是說,如果服務器有公網ip,可以通過訪問其公網ip的6379端口來操作redis。最為致命的是,redis默認是沒有密碼驗證的,可以免密碼登錄操作,攻擊者可以通過操作redis進一步控制服務器。

漏洞的危害

  1. 無密碼驗證登錄redis后,可讀取、刪除、更改數據
  2. 攻擊者可以通過redis讀寫文件,植入后門
  3. 如果redis以root權限運行,攻擊者可以寫入ssh公鑰文件,然后即可遠程ssh登錄服務器
    ...

漏洞修復

修復方案大概有以下幾種:

  1. 把redis綁定在127.0.0.1即本地上
  2. 配置登錄驗證
  3. 防火牆設置白名單,拒絕不信任的連接
    ...

本文主要講解驗證腳本的編寫,故不再過多闡述漏洞原理、利用等細節。

驗證方式

登錄redis后,執行info命令,可以獲得類似下面的信息:

# Server
redis_version:5.0.3
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:68e47d9309ff01ae
redis_mode:standalone
...

如果登錄失敗,是不可以執行命令的,所以我們可以向目標ip的6379(redis默認端口)發起連接,發送info命令,只要得到的響應中存在上面信息中的某些獨特的字符串,如redis_version,我們就認為目標存在redis未授權訪問漏洞。

代碼如下:

...
sock = socket.socket()  # 創建套接字
try:
    sock.connect((ip, 6379))  # 連接
    sock.send(payload)  # 發送info命令
    response = sock.recv(1024).decode()  # 接收響應數據
    if 'redis_version' in response:
        result = True  # 存在漏洞
    else:
        result = False  # 不存在漏洞
except (socket.error, socket.timeout):
    # 連接失敗,可能端口6379未開放,或者被攔截,此時認為漏洞不存在
    result = False
...

好了,現在的關鍵就在:如何發送info命令?

python有操作redis的第三方庫,可以很方便的操作redis。然而,我們並不使用這些第三方庫,歸根結底,發送info命令其實是發送了一個可以讓redis服務識別的特定的數據而已,只要我們知道這個數據是什么,我們就可以使用info命令了。

下面我們就來分析,redis是如何發送info命令的。

截獲info命令

我們需要搭建一個redis環境,使用抓包工具來截獲使用info命令時redis發送的數據,為了方便,我使用了linux系統的命令netcat、tee來充當抓包工具,讀者可以自己在linux系統下搭建redis環境嘗試。

我們使用netcat連接到本地的redis服務,然后使用另一個netcat進程監聽127.0.0.1:9000,將接受的連接發來的數據,重定向至連接到redis服務的netcat進程的輸入,即可完成連接的轉發,我們在這兩個netcat經常之間,使用tee來截獲數據,流程大致如下:

流程

在實際的操作中,我們還需要使用命名管道來實現雙向通信,否則客戶端無法接受到redis的登錄響應就會阻塞,無法發生命令。

具體操作如下:

  1. 啟動redis服務
  2. 創建兩個管道文件:pipe1、pipe2
    $ mkfifo pipe1
    $ mkfifo pipe2
    
  3. 啟動一個netcat進程監聽在本地的9000端口上,以pipe1作為輸入,輸出重定向到tee進程,tee進程負責將數據輸出到pipe2和屏幕上:
    $ ncat -l 127.0.0.1 9000 < pipe1 | tee pipe2
    
  4. 啟動另一個終端,啟動netcat進程,負責連接redis服務,以pipe2作為輸入,輸出重定向到pipe1中:
    $ ncat 127.0.0.1 6379 < pipe2 > pipe1
    
  5. 用redis客戶端連接本地9000端口:
    $ redis-cli -h 127.0.0.1 -p 9000
    
  6. 在redis-cli中發送info命令,tee進程在終端上的輸出即為整個過程需要發送給redis的數據

附上動圖:
獲取payload

可以知道payload為:

*1
$4
info

寫成python字節串就是:b'*1\r\n$4\r\ninfo\r\n''\r\n'是換行符)

所以我們只需使用socket發送以上字節串即可達到同樣的效果

編寫驗證poc

驗證漏洞的代碼如下:

def poc(url):
    url = url2ip(url)  # 將url轉換成ip地址
    if url:
        port = int(url.split(':', -1)) if ':' in url else 6379 # redis默認端口是6379
        host = url.split(':')[0]

        payload = b'*1\r\n$4\r\ninfo\r\n' # 發送的數據

        s = socket.socket()
        socket.setdefaulttimeout(3)  # 設置超時時間
        try:
            s.connect((host, port))
            s.send(payload)  # 發送info命令
            response = s.recv(1024).decode()
            s.close()

            if response and 'redis_version' in data:
                return True,'%s:%s'%(host,port)
        except (socket.error, socket.timeout):
            pass
    
    return False, None

其中url轉換成ip地址的函數如下:

def url2ip(url):
    """
    url轉換成ip
    argument: url
    return: 形如www.a.com:80格式的字符串 若轉換失敗則返回None
    """

    try:
        if not url.startswith('http://') and not url.startswith('https://'):
            url = 'http://' + url
        ip = urlparse(url).netloc
        return ip
    except (ValueError, socket.gaierror):
        pass

    return None

處理輸入

我們把要驗證漏洞的目標放在一個文件里,每一行為一個目標,現在來編寫一個函數,讀取文件,將所有目標放到一個隊列里,代碼如下:

def create_queue(file_name):
    """
    創建數據隊列
    argument: file_name -> 輸入文件名
    return: data,total 數據隊列,數據總數
    """
    total = 0
    data = Queue()
    for line in open(file_name):
        url = line.strip()
        if url:
            # 跳過空白的行
            data.put(url)
            total += 1

    data.put(None)  # 結束標記
    return data,total

創建多個線程

我們的start_jobs函數用於啟動多個線程來驗證目標,其代碼如下:

def start_jobs(data, num):
    """
    啟動所有工作線程
    argument: data -> 數據隊列 num -> 線程數
    """
    is_alive = [True]
    def job():
        """工作線程"""
        while is_alive[0]:
            try:
                url = data.get()
                if url == None:
                    # 遇到結束標記
                    break
                code, result = poc(url)  # 驗證漏洞
                if code:
                    print(result)  # 存在漏洞
            except:
                is_alive[0] = False
        data.put(None)  # 結束標記
                
    jobs = [ Thread(target=job) for i in range(num) ]  # 創建多個線程
    for j in jobs:
        j.setDaemon(True)
        j.start()  # 啟動線程

    for j in jobs:
        j.join()  # 等待線程退出

編寫主程序框架

現在我們需要一個主函數來控制整個流程,代碼很簡單:

def main():
    import sys
    if len(sys.argv) != 3:
        print('Usage: python %s inputFile numOfThread' % sys.argv[0])
        return
    file_name = sys.argv[1]  # 輸入文件
    num = int(sys.argv[2])  # 線程數
    data, total = create_queue(file_name)  # 創建數據隊列
    print('total: %s' % total)
    begin = time()
    start_jobs(data, num)  # 啟動工作線程
    end = time()
    print('spent %ss' % str(end-begin))
    

if __name__ == '__main__':
    main()

使用方法

現在假設輸入文件名為input.txt,腳本文件名為redis_unauth.py,使用16個線程來批量驗證漏洞,我們可以啟動以下命令:

$ python redis_unauth.py input.txt 16

完整代碼

只是一個小腳本,就沒必要放到github上了,這里直接貼出,需要的讀者可以復制:

#!/usr/python3

'''
created by feather
'''

import socket
from threading import Thread
from queue import Queue
from time import sleep,time
from urllib.parse import urlparse

def poc(url):
    url = url2ip(url)  # 將url轉換成ip地址
    if url:
        port = int(url.split(':', -1)) if ':' in url else 6379 # redis默認端口是6379
        host = url.split(':')[0]
        payload = b'*1\r\n$4\r\ninfo\r\n' # 發送的數據
        s = socket.socket()      
        socket.setdefaulttimeout(3)  # 設置超時時間
        try:
            s.connect((host, port))
            s.send(payload)  # 發送info命令
            response = s.recv(1024).decode()
            s.close()
        
            if response and 'redis_version' in response:
                return True,'%s:%s'%(host,port)
        except (socket.error, socket.timeout):
            pass
    
    return False, None

def url2ip(url):
    """
    url轉換成ip
    argument: url
    return: 形如www.a.com:80格式的字符串 若轉換失敗則返回None
    """

    try:
        if not url.startswith('http://') and not url.startswith('https://'):
            url = 'http://' + url
        ip = urlparse(url).netloc
        return ip
    except (ValueError, socket.gaierror):
        pass

    return None
 
def create_queue(file_name):
    """
    創建數據隊列
    argument: file_name -> 輸入文件名
    return: data,total 數據隊列,數據總數
    """
    total = 0
    data = Queue()
    for line in open(file_name):
        url = line.strip()
        if url:
            # 跳過空白的行
            data.put(url)
            total += 1

    data.put(None)  # 結束標記
    return data,total

def start_jobs(data, num):
    """
    啟動所有工作線程
    argument: data -> 數據隊列 num -> 線程數
    """
    is_alive = [True]
    def job():
        """工作線程"""
        while is_alive[0]:
            try:
                url = data.get()
                if url == None:
                    # 遇到結束標記
                    break
                code, result = poc(url)  # 驗證漏洞
                if code:
                    print(result)  # 存在漏洞
            except:
                is_alive[0] = False
        data.put(None)  # 結束標記
                
    jobs = [ Thread(target=job) for i in range(num) ]  # 創建多個線程
    for j in jobs:
        j.setDaemon(True)
        j.start()  # 啟動線程

    for j in jobs:
        j.join()  # 等待線程退出

def main():
    import sys
    if len(sys.argv) != 3:
        print('Usage: python %s inputFile numOfThread' % sys.argv[0])
        return
    file_name = sys.argv[1]  # 輸入文件
    num = int(sys.argv[2])  # 線程數
    data, total = create_queue(file_name)  # 創建數據隊列
    print('total: %s' % total)
    begin = time()
    start_jobs(data, num)  # 啟動工作線程
    end = time()
    print('spent %ss' % str(end-begin))
    

if __name__ == '__main__':
    main()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM