漏洞簡介
簡單來說,redis是一個數據庫。在默認的配置下,redis綁定在0.0.0.0:6379,也就是說,如果服務器有公網ip,可以通過訪問其公網ip的6379端口來操作redis。最為致命的是,redis默認是沒有密碼驗證的,可以免密碼登錄操作,攻擊者可以通過操作redis進一步控制服務器。
漏洞的危害
- 無密碼驗證登錄redis后,可讀取、刪除、更改數據
- 攻擊者可以通過redis讀寫文件,植入后門
- 如果redis以root權限運行,攻擊者可以寫入ssh公鑰文件,然后即可遠程ssh登錄服務器
...
漏洞修復
修復方案大概有以下幾種:
- 把redis綁定在127.0.0.1即本地上
- 配置登錄驗證
- 防火牆設置白名單,拒絕不信任的連接
...
本文主要講解驗證腳本的編寫,故不再過多闡述漏洞原理、利用等細節。
驗證方式
登錄redis后,執行info命令,可以獲得類似下面的信息:
# Server
redis_version:5.0.3
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:68e47d9309ff01ae
redis_mode:standalone
...
如果登錄失敗,是不可以執行命令的,所以我們可以向目標ip的6379(redis默認端口)發起連接,發送info命令,只要得到的響應中存在上面信息中的某些獨特的字符串,如redis_version
,我們就認為目標存在redis未授權訪問漏洞。
代碼如下:
...
sock = socket.socket() # 創建套接字
try:
sock.connect((ip, 6379)) # 連接
sock.send(payload) # 發送info命令
response = sock.recv(1024).decode() # 接收響應數據
if 'redis_version' in response:
result = True # 存在漏洞
else:
result = False # 不存在漏洞
except (socket.error, socket.timeout):
# 連接失敗,可能端口6379未開放,或者被攔截,此時認為漏洞不存在
result = False
...
好了,現在的關鍵就在:如何發送info命令?
python有操作redis的第三方庫,可以很方便的操作redis。然而,我們並不使用這些第三方庫,歸根結底,發送info命令其實是發送了一個可以讓redis服務識別的特定的數據而已,只要我們知道這個數據是什么,我們就可以使用info命令了。
下面我們就來分析,redis是如何發送info命令的。
截獲info命令
我們需要搭建一個redis環境,使用抓包工具來截獲使用info命令時redis發送的數據,為了方便,我使用了linux系統的命令netcat、tee來充當抓包工具,讀者可以自己在linux系統下搭建redis環境嘗試。
我們使用netcat連接到本地的redis服務,然后使用另一個netcat進程監聽127.0.0.1:9000,將接受的連接發來的數據,重定向至連接到redis服務的netcat進程的輸入,即可完成連接的轉發,我們在這兩個netcat經常之間,使用tee來截獲數據,流程大致如下:
在實際的操作中,我們還需要使用命名管道來實現雙向通信,否則客戶端無法接受到redis的登錄響應就會阻塞,無法發生命令。
具體操作如下:
- 啟動redis服務
- 創建兩個管道文件:pipe1、pipe2
$ mkfifo pipe1 $ mkfifo pipe2
- 啟動一個netcat進程監聽在本地的9000端口上,以pipe1作為輸入,輸出重定向到tee進程,tee進程負責將數據輸出到pipe2和屏幕上:
$ ncat -l 127.0.0.1 9000 < pipe1 | tee pipe2
- 啟動另一個終端,啟動netcat進程,負責連接redis服務,以pipe2作為輸入,輸出重定向到pipe1中:
$ ncat 127.0.0.1 6379 < pipe2 > pipe1
- 用redis客戶端連接本地9000端口:
$ redis-cli -h 127.0.0.1 -p 9000
- 在redis-cli中發送info命令,tee進程在終端上的輸出即為整個過程需要發送給redis的數據
附上動圖:
可以知道payload為:
*1
$4
info
寫成python字節串就是:b'*1\r\n$4\r\ninfo\r\n'
('\r\n'
是換行符)
所以我們只需使用socket發送以上字節串即可達到同樣的效果
編寫驗證poc
驗證漏洞的代碼如下:
def poc(url):
url = url2ip(url) # 將url轉換成ip地址
if url:
port = int(url.split(':', -1)) if ':' in url else 6379 # redis默認端口是6379
host = url.split(':')[0]
payload = b'*1\r\n$4\r\ninfo\r\n' # 發送的數據
s = socket.socket()
socket.setdefaulttimeout(3) # 設置超時時間
try:
s.connect((host, port))
s.send(payload) # 發送info命令
response = s.recv(1024).decode()
s.close()
if response and 'redis_version' in data:
return True,'%s:%s'%(host,port)
except (socket.error, socket.timeout):
pass
return False, None
其中url轉換成ip地址的函數如下:
def url2ip(url):
"""
url轉換成ip
argument: url
return: 形如www.a.com:80格式的字符串 若轉換失敗則返回None
"""
try:
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://' + url
ip = urlparse(url).netloc
return ip
except (ValueError, socket.gaierror):
pass
return None
處理輸入
我們把要驗證漏洞的目標放在一個文件里,每一行為一個目標,現在來編寫一個函數,讀取文件,將所有目標放到一個隊列里,代碼如下:
def create_queue(file_name):
"""
創建數據隊列
argument: file_name -> 輸入文件名
return: data,total 數據隊列,數據總數
"""
total = 0
data = Queue()
for line in open(file_name):
url = line.strip()
if url:
# 跳過空白的行
data.put(url)
total += 1
data.put(None) # 結束標記
return data,total
創建多個線程
我們的start_jobs函數用於啟動多個線程來驗證目標,其代碼如下:
def start_jobs(data, num):
"""
啟動所有工作線程
argument: data -> 數據隊列 num -> 線程數
"""
is_alive = [True]
def job():
"""工作線程"""
while is_alive[0]:
try:
url = data.get()
if url == None:
# 遇到結束標記
break
code, result = poc(url) # 驗證漏洞
if code:
print(result) # 存在漏洞
except:
is_alive[0] = False
data.put(None) # 結束標記
jobs = [ Thread(target=job) for i in range(num) ] # 創建多個線程
for j in jobs:
j.setDaemon(True)
j.start() # 啟動線程
for j in jobs:
j.join() # 等待線程退出
編寫主程序框架
現在我們需要一個主函數來控制整個流程,代碼很簡單:
def main():
import sys
if len(sys.argv) != 3:
print('Usage: python %s inputFile numOfThread' % sys.argv[0])
return
file_name = sys.argv[1] # 輸入文件
num = int(sys.argv[2]) # 線程數
data, total = create_queue(file_name) # 創建數據隊列
print('total: %s' % total)
begin = time()
start_jobs(data, num) # 啟動工作線程
end = time()
print('spent %ss' % str(end-begin))
if __name__ == '__main__':
main()
使用方法
現在假設輸入文件名為input.txt,腳本文件名為redis_unauth.py,使用16個線程來批量驗證漏洞,我們可以啟動以下命令:
$ python redis_unauth.py input.txt 16
完整代碼
只是一個小腳本,就沒必要放到github上了,這里直接貼出,需要的讀者可以復制:
#!/usr/python3
'''
created by feather
'''
import socket
from threading import Thread
from queue import Queue
from time import sleep,time
from urllib.parse import urlparse
def poc(url):
url = url2ip(url) # 將url轉換成ip地址
if url:
port = int(url.split(':', -1)) if ':' in url else 6379 # redis默認端口是6379
host = url.split(':')[0]
payload = b'*1\r\n$4\r\ninfo\r\n' # 發送的數據
s = socket.socket()
socket.setdefaulttimeout(3) # 設置超時時間
try:
s.connect((host, port))
s.send(payload) # 發送info命令
response = s.recv(1024).decode()
s.close()
if response and 'redis_version' in response:
return True,'%s:%s'%(host,port)
except (socket.error, socket.timeout):
pass
return False, None
def url2ip(url):
"""
url轉換成ip
argument: url
return: 形如www.a.com:80格式的字符串 若轉換失敗則返回None
"""
try:
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://' + url
ip = urlparse(url).netloc
return ip
except (ValueError, socket.gaierror):
pass
return None
def create_queue(file_name):
"""
創建數據隊列
argument: file_name -> 輸入文件名
return: data,total 數據隊列,數據總數
"""
total = 0
data = Queue()
for line in open(file_name):
url = line.strip()
if url:
# 跳過空白的行
data.put(url)
total += 1
data.put(None) # 結束標記
return data,total
def start_jobs(data, num):
"""
啟動所有工作線程
argument: data -> 數據隊列 num -> 線程數
"""
is_alive = [True]
def job():
"""工作線程"""
while is_alive[0]:
try:
url = data.get()
if url == None:
# 遇到結束標記
break
code, result = poc(url) # 驗證漏洞
if code:
print(result) # 存在漏洞
except:
is_alive[0] = False
data.put(None) # 結束標記
jobs = [ Thread(target=job) for i in range(num) ] # 創建多個線程
for j in jobs:
j.setDaemon(True)
j.start() # 啟動線程
for j in jobs:
j.join() # 等待線程退出
def main():
import sys
if len(sys.argv) != 3:
print('Usage: python %s inputFile numOfThread' % sys.argv[0])
return
file_name = sys.argv[1] # 輸入文件
num = int(sys.argv[2]) # 線程數
data, total = create_queue(file_name) # 創建數據隊列
print('total: %s' % total)
begin = time()
start_jobs(data, num) # 啟動工作線程
end = time()
print('spent %ss' % str(end-begin))
if __name__ == '__main__':
main()