socket(套接字)


基於tcp協議的socket

tcp是基於鏈接的,必須先啟動服務端,然后再啟動客戶端去鏈接服務端

server端

import socket
sk = socket.socket()
sk.bind(('127.0.0.1',8898))  #把地址綁定到套接字
sk.listen()          #監聽鏈接
conn,addr = sk.accept() #接受客戶端鏈接
ret = conn.recv(1024)  #接收客戶端信息
print(ret)       #打印客戶端信息
conn.send(b'hi')        #向客戶端發送信息
conn.close()       #關閉客戶端套接字
sk.close()        #關閉服務器套接字(可選)

client端

import socket
sk = socket.socket()           # 創建客戶套接字
sk.connect(('127.0.0.1',8898))    # 嘗試連接服務器
sk.send(b'hello!')
ret = sk.recv(1024)         # 對話(發送/接收)
print(ret)
sk.close()            # 關閉客戶套接字

問題:有時重啟服務端可能會遇到

解決辦法:

#加入一條socket配置,重用ip和端口
import socket
from socket import SOL_SOCKET,SO_REUSEADDR
sk = socket.socket()
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
sk.bind(('127.0.0.1',8898))  #把地址綁定到套接字
sk.listen()          #監聽鏈接
conn,addr = sk.accept() #接受客戶端鏈接
ret = conn.recv(1024)   #接收客戶端信息
print(ret)              #打印客戶端信息
conn.send(b'hi')        #向客戶端發送信息
conn.close()       #關閉客戶端套接字
sk.close()        #關閉服務器套接字(可選)

完成一個socket實現的小程序:

  1. 能夠實現和同桌之間的通信
  2. 能夠實現自己向發送的任意內容
  3. 能夠和你的同桌聊任意多句話,並設置退出

server端

import socket
sk = socket.socket()   # 創建一個對象
sk.bind(('127.0.0.1',9001))  # 綁定一個服務器的地址  192.168.16.46
sk.listen()  # 開始接受客戶端給我連接
conn,addr = sk.accept()  # 阻塞 直到有人連我
while True:
    conn.send(b'hello')  # 發送內容
    msg = conn.recv(1024) # 收信
    if msg.decode('utf-8') == 'tuichu':
        conn.send("tuichu".encode("utf-8"))
        break
        conn.close()  # 關閉連接
        sk.close()  # 關閉服務器
    print(msg.decode("utf-8"))

client端

import socket
sk = socket.socket() # 創建socket對象
sk.connect(('127.0.0.1',9001)) # 綁定連接server端的地址
while True:
    msg = sk.recv(1024)  # 接收服務器發來的信息
    print(msg.decode('utf-8'))  # 解碼並打印消息內容
    if msg.decode('utf-8') == 'tuichu':
        break
        sk.close()  # 關機
    choice = input('請輸入您要發送的內容>>>:')
    sk.send(choice.encode('utf-8'))

 基於udp協議的socket

udp是無鏈接的,啟動服務之后可以直接接受消息不需要提前建立鏈接

server端

import socket
udp_sk = socket.socket(type=socket.SOCK_DGRAM)   #創建一個服務器的套接字
udp_sk.bind(('127.0.0.1',9000))        #綁定服務器套接字
msg,addr = udp_sk.recvfrom(1024)
print(msg)
udp_sk.sendto(b'hi',addr)                 # 對話(接收與發送)
udp_sk.close()                         # 關閉服務器套接字

client端

import socket
ip_port=('127.0.0.1',9000)
udp_sk=socket.socket(type=socket.SOCK_DGRAM)
udp_sk.sendto(b'hello',ip_port)
back_msg,addr=udp_sk.recvfrom(1024)
print(back_msg.decode('utf-8'),addr)

 socket更多方法介紹

服務端套接字函數
s.bind()    綁定(主機,端口號)到套接字
s.listen()  開始TCP監聽
s.accept()  被動接受TCP客戶的連接,(阻塞式)等待連接的到來

客戶端套接字函數
s.connect()     主動初始化TCP服務器連接
s.connect_ex()  connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常

公共用途的套接字函數
s.recv()            接收TCP數據
s.send()            發送TCP數據
s.sendall()         發送TCP數據
s.recvfrom()        接收UDP數據
s.sendto()          發送UDP數據
s.getpeername()     連接到當前套接字的遠端的地址
s.getsockname()     當前套接字的地址
s.getsockopt()      返回指定套接字的參數
s.setsockopt()      設置指定套接字的參數
s.close()           關閉套接字

面向鎖的套接字方法
s.setblocking()     設置套接字的阻塞與非阻塞模式
s.settimeout()      設置阻塞套接字操作的超時時間
s.gettimeout()      得到阻塞套接字操作的超時時間

面向文件的套接字的函數
s.fileno()          套接字的文件描述符
s.makefile()        創建一個與該套接字相關的文件

 tcp驗證登錄

import socket
import hashlib
import json
def md5_func(user,pwd):
    md5 = hashlib.md5(user.encode('utf-8'))
    md5.update(pwd.encode('utf-8'))
    return md5.hexdigest()

username = input('用戶名')
password = input('密 碼')
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
res = {'opt':'login','username':username,'password':md5_func(username,password)}
ret = json.dumps(res)
sk.send(ret.encode('utf-8'))
msg = sk.recv(1024).decode('utf-8')  # {"opt": "login", "result": false}
a = json.loads(msg)
if a['result']:
    print('登陸成功')
else:
    print('登陸失敗')
sk.close()
server
# -*- coding: utf-8 -*-
# @Time    : 2019/4/12 15:33
import os
import json
import hmac
import socket
import struct
import hashlib
sk = socket.socket()

# 登錄密碼加密
def get_md5(user,pwd):
    md5 = hashlib.md5(user.encode('utf-8'))
    md5.update(pwd.encode('utf-8'))
    return md5.hexdigest()

# 認證加密
def get_hmac(secret_key,rand):
    hmac_t = hmac.new(secret_key, rand)
    res = hmac_t.digest()
    return res

# 發送數據
def pro_send(sk,dic,pro = True):
    bytes_dic = json.dumps(dic).encode('utf-8')
    if pro:
        len_bytes = struct.pack('i',len(bytes_dic))
        sk.send(len_bytes)
    sk.send(bytes_dic)

# 接收數據
def pro_recv(sk,pro=True,num=1024):
    if pro:
        num = sk.recv(4)
        num = struct.unpack('i', num)[0]
    str_dic = sk.recv(num).decode('utf-8')
    dic = json.loads(str_dic)
    return dic

# 客戶端認證
def auth():
    sk.connect(('127.0.0.1', 9000))
    secret_key = '宋治學'.encode('utf-8')
    rand = sk.recv(1024)
    res = get_hmac(secret_key,rand)
    sk.send(res)
    return sk   # 將對象return出來方便全局使用

# 下載文件
def write_file(filename):
    md5 = hashlib.md5()
    filesize = json.loads(sk.recv(1024).decode('utf-8'))
    with open(filename, 'wb') as f:
        print('正在傳輸')
        while filesize['file_size'] > 0:
            content = sk.recv(4096)
            md5.update(content)
            f.write(content)
            filesize['file_size'] -= len(content)
        print('傳輸完成')
    return md5.hexdigest()
# D:\PycharmProjects\s20\爬蟲學習\server.py
def download(sk):
    dic = {'operate': 'download'}
    pro_send(sk, dic)
    lis = pro_recv(sk, pro=False, num=1024)
    for index,i in enumerate(lis,1):
        print(index,i)
    choice = input('請選擇要下載的文件').strip()
    if lis[int(choice)-1]:
        sk.send(lis[int(choice)-1].encode('utf-8')) # 發送用戶選擇的文件名
        md5_file = write_file(lis[int(choice) - 1]) # 下載文件后得到的md5值
        ret_md5 = sk.recv(1024).decode('utf-8')   # 服務器傳過來的md5值
        if md5_file == ret_md5:
            print('文件校驗成功')
        else:
            print('文件校驗失敗')
    else:
        print('輸入有誤請重新輸入')

# 上傳文件
def upload(sk):
    file_path = input('請輸入文件路徑').strip()
    file_name = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)
    dic = {'filename': file_name, 'filesize': file_size, 'operate': 'upload'}
    pro_send(sk, dic)
    with open(file_path,'rb') as f:
        while file_size > 4096:
            content = f.read(4096)
            sk.send(content)
            file_size -= len(content)
        else:
            content = f.read()
            sk.send(content)
# 退出
def exsit(sk=None):
    exit()

# 登錄
def login(user,pwd):
    sk = auth()
    dic = {'user': user, 'passwd': get_md5(user,pwd), 'operate': 'login'}
    pro_send(sk,dic)
    ret = pro_recv(sk)
    return sk,ret

# 用戶登錄成功選擇操作
def choice(sk):
    while True:
        operate = [('上傳', upload), ('下載', download),('退出',exsit)]
        for index,opt in enumerate(operate,1):
            print(index,opt[0])
        num = input('請選擇操作:>>>').strip()
        if num.isdigit() and int(num) in range(1,len(operate)+1):
            operate[int(num)-1][1](sk)
        else:print('輸入有誤請重新輸入')

while True:
    username = input('username:').strip()
    password = input('password:').strip()
    sk,ret = login(username,password)
    if ret['flag']:
        print('登錄成功')
        choice(sk)
    else:
        print('登陸失敗')
        sk.close()
        break
client

udp多人聊天

# -*- coding: utf-8 -*-
# @Time    : 2019/4/9 16:35

import socket

server_socket = socket.socket(type=socket.SOCK_DGRAM)

server_socket.bind(('127.0.0.1',9000))

while True:
    conn,addr = server_socket.recvfrom(1024)
    print('消息來自%s:%s %s'%(addr[0],addr[1],conn.decode('utf-8')))
    msg = input('回復消息>>>:')
    server_socket.sendto(msg.encode('utf-8'),addr)
server
import socket
client_socket = socket.socket(type=socket.SOCK_DGRAM)
dic = {
    '華哥':('127.0.0.1',9000),
    '黑哥':('127.0.0.1',9000),
    '李健':('127.0.0.1',9000),
    '錢羽':('127.0.0.1',9000),
    '雄哥':('127.0.0.1',9000),
    '王月陽':('127.0.0.1',9000),
}

while True:
    name = input('請選擇聊天對象').strip()
    name = dic[name]
    while True:
        msg = input('>>>:').strip()
        client_socket.sendto(msg.encode('utf-8'),name)
        a,b = client_socket.recvfrom(1024)
        print(a.decode('utf-8'))
client

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM