我的魔法被公司防火牆限制了,所以我只能讓我的代理流量從我自己的服務器過一遍,但是服務器上面的客戶端只能允許本機使用,不能開放公網訪問,所以就想到了端口轉發
但是網絡上面找到的端口轉發工具需要各種配置,,,太麻煩了。所以在下班之前就開發了一個端口轉發的工具 (Python牛逼! (破音)
最近看了不少的優秀代碼,下面代碼也用到了一些開發方法。貼出代碼,希望能幫到有需要的人
# -*- coding:utf-8 -*
import socket
import threading
import sys,getopt
## 這個代碼是用來做端口轉發的 emmm 你問有什么用?沒用,刪了他吧 (攤手
# 錯誤
def l_w(s):
print("[!] {}".format(s))
# 信息
def l_i(s):
print("[+] {}".format(s))
# 毀滅性錯誤
def l_a(s):
print("[-] {}".format(s))
exit()
def help():
print("""
-h 提供幫助,就是你看到的這個
-s 指定監聽的本地ip,默認為 127.0.0.1
-l 指定監聽的端口號,必須指定
-d 指定轉發的目的主機,默認為 127.0.0.1
-p 指定轉發的目的端口,必須指定
本程序具有超級牛力! moo~~ moo~~~
""")
exit()
s_host = "127.0.0.1"
s_port = 0
d_host = "127.0.0.1"
d_port = 0
# 將來自s套接字的數據轉發到d套接字(函數名 forward)
def fw(s,d):
try:
while True:
buf = s.recv(4096)
l_i("{} ====> {} {} 字節".format(s.getpeername(),d.getpeername(),len(buf)))
if(len(buf) == 0):
l_w("{} 斷開連接".format(s.getpeername()))
return
d.send(buf)
except:
return
# 處理請求,每一個連接對應一個 (函數名 request thread)
def rt(request_socket):
des_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
des_socket.connect((d_host,int(d_port)))
except Exception as e:
l_a("連接目標 {}:{} 失敗!err: {}".format(d_host,d_port,str(e)))
threading.Thread(target=fw,args=(request_socket,des_socket)).start()
fw(des_socket,request_socket)
l_i("這個程序由 Startu 開發 o_o | ^_^ ~~~")
# 獲取參數
try:
opts,args = getopt.getopt(sys.argv[1:],"hs:l:d:p:")
except getopt.GetoptError:
l_w("參數不正確")
help()
for opt,arg in opts:
if opt == '-h':
help()
elif opt == '-s':
s_host = arg
elif opt == '-l':
s_port = arg
elif opt == '-d':
d_host = arg
elif opt == '-p':
d_port = arg
if s_host == "127.0.0.1":
l_w("將會監聽: 127.0.0.1") # 因為可能你不想綁定 127.0.0.1 所以作出提示 (也許是0.0.0.0呢)
if s_port == 0 or d_port == 0:
l_a("沒有指定端口號或指定為0 通過 -h 參數看幫助")
l_i("監聽 {}:{}".format(s_host,s_port))
l_i("將會連接 {}:{}".format(d_host,d_port))
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server_socket.bind((s_host,int(s_port)))
except Exception as e:
l_a("綁定 {}:{} 端口失敗 {}".format(s_host,s_port,str(e)))
server_socket.listen(50) # 應該同時連接不會超過 50 個吧...
l_i("准備就緒")
while True:
request_socket,addr = server_socket.accept()
l_i("{} 已連接".format(request_socket.getpeername()))
threading.Thread(target=rt,args=(request_socket,)).start()
核心代碼實際上只有十幾行而已。。。