一個基於python的即時通信程序


5月17日更新:

廣播信息、用戶列表、信息確認列表以及通信信息,從原來的用字符串存儲改為使用字典來存儲,使代碼更清晰,更容易擴展,具體更改的格式如下:

廣播信息(上線):
{
  'status': 信息狀態標志,
  'user_info': 本機的用戶名和主機名,
  'pub_key': 本機生成的公鑰,
}
廣播信息(下線):
{
  'status': 信息狀態標志,
  'user_info': 本機的用戶名和主機名,
}

用戶列表的元素:
{
  'user_info': 對應用戶的用戶名和主機名,
  'pub_key': 對應用戶的公鑰,
  'addr': 用戶對應的ip,
}

信息確認列表的元素:
{
  'confirm_seq': 信息序列號,
  'user': 發送信息的用戶的用戶名,
  'msg': 發送的信息,
  'addr': 信息的目的ip和端口,
}


通信信息:
{
  'status': 信息序列號,
  'user': 發送信息的用戶的用戶名,
  'msg': 發送的信息,
}

 

更新后的代碼如下:

!/usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/

import socket
import os
import threading
import traceback
import rsa

user_list = []
confirm_list = []
username = os.environ['USER']
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1()


class MyThread(threading.Thread):
    '''這個類用於創建新的線程'''

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        apply(self.func, self.args)


def broadcast(broADDR, status):
    '''發送廣播信息模塊
    用於發送廣播信息給其他主機,通知其他主機本主機上線\下線狀態,以及發送本機的信息給其他主機。
    這個模塊會在廣播信息前添加上status這個參數的值。在本程序中,當需要通知其他主機,本機已經上線時,
    會傳遞"online"給status,當需要通知其他主機本機即將下線時,會傳遞"offline"給status。
    '''
    global username, hostname, pub

    def broadcast_send(oMsg):
        udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        udpSock2.sendto(oMsg, broADDR)
    oMsg = {}
    if status == 'online':
        oMsg = {
            'status': status,
            'user_info': ' '.join([username, hostname]),
            'pub_key': pub,
        }
        broadcast_send(str(oMsg))
    elif status == 'offline':
        oMsg = {
            'status': status,
            'user_info': ' '.join([username, hostname]),
        }
        broadcast_send(str(oMsg))


def recv_msg(localADDR, BUFSIZ, udpSock, port):
    '''信息接收模塊
    這個模塊的主要功能是,跟據接收到的廣播信息更新用戶列表,以及處理對端發送過來信息
    '''
    global user_list, confirm_list, username, hostname, pub, privkey

    while True:
        try:
            data, addr = udpSock.recvfrom(BUFSIZ)
        except:
            break
        if data:
            data = eval(data)
        addr = addr[0]
        if data['status'] == 'online':
            user_list_info = {
                'user_info': data['user_info'],
                'pub_key': data['pub_key'],
                'addr': addr,
            }
            if user_list_info not in user_list:
                user_list.append(user_list_info)
                # 把對方添加上用戶列表的同時,還要把自己的信息發給對方,以便對方更新用戶列表
                respond_msg = {
                    'status': 'respon_online',
                    'user_info': ' '.join([username, hostname]),
                    'pub_key': pub,
                }
                udpSock.sendto(str(respond_msg), (addr, port))

        elif data['status'] == 'offline':
            user_list_info = {
                'user_info': data['user_info'],
            }
            for i in xrange(len(user_list)):
                for k, v in user_list[i].iteritems():
                    if user_list_info['user_info'] == v:
                        del user_list[i]

        elif data['status'] == 'respon_online':
            user_list_info = {
                'user_info': data['user_info'],
                'pub_key': data['pub_key'],
                'addr': addr,
            }
            if user_list_info not in user_list:
                user_list.append(user_list_info)

        elif data['status'] == 'quit':
            print "對方已斷開連接,請輸入'quit'或'q'返回主菜單"
            continue

        elif data['status'] == 'local_quit':
            continue

        else:
            confirm_msg = data['status']
            # 假如收到的確認標志和確認表中的某項匹配,刪除該項
            for i in xrange(len(confirm_list)):
                if confirm_list[i]['confirm_seq'] == confirm_msg:
                    del confirm_list[i]
            if not data['msg']:
                continue
            addr_list = []
            for x in user_list:
                # 提取出用戶表中所有用戶的地址,存到addr_list中:
                addr_list.append(x['addr'])

            # 檢查發送信息的用戶的地址是否在用戶列表當中:
            if addr in addr_list:
                # 反饋收到確認信息給對方:
                confirm_res = {'status': confirm_msg, 'msg': 0}
                udpSock.sendto(str(confirm_res), (addr, port))
                # 打印信息:
                data_user = data['user']
                try:
                    data_msg = rsa.decrypt((data['msg']), privkey)
                except DecryptionError:
                    print "解碼出現異常,請重新連接"
                    continue
                print data_user, ":", data_msg


def print_userlist():
    '''打印用戶列表模塊'''
    global user_list
    user_list_len = len(user_list)
    print "當前有%d個用戶在線:" % user_list_len
    for i in xrange(user_list_len):
        print "ID:", i+1, ":", user_list[i]['user_info'].strip('\n'), \
            "come from:", user_list[i]['addr']


def send_msg(udpSock, cli_addr, cli_pub_key, port):
    '''信息發送模塊'''
    import random
    global username, confirm_list
    quit_list = ['q', 'quit', 'exit']
    cli_pub_key_rip = rsa.PublicKey.load_pkcs1(cli_pub_key)

    while True:
        msg = raw_input("> ")
        if msg in quit_list:
            # quit_msg_to_local用於通知本機對話結束,回收socket
            quit_msg_to_local = {'status': 'local_quit'}
            quit_msg_to_cli = {'status': 'quit'}
            udpSock.sendto(str(quit_msg_to_local), ('localhost', port))
            udpSock.sendto(str(quit_msg_to_cli), cli_addr)
            break

        random_num = random.randint(0, 1000)
        msg = rsa.encrypt(msg, cli_pub_key_rip)
        output_msg = {
            'status': str(random_num),
            'user': username,
            'msg': msg,
        }
        confirm_list_member = {
            'confirm_seq': str(random_num),
            'user': username,
            'msg': msg,
            'addr': cli_addr,
        }
        confirm_list.append(confirm_list_member)

        udpSock.sendto(str(output_msg), cli_addr)


def confirm_successd(udpSock):
    '''確認信息到達模塊
    采用類似於最久未使用(LRU)算法,每隔5秒鍾檢查一下信息確認列表(confirm_list),當信息確認列表長度大於5時(
    也就是說未確認接收的信息大於5),把信息確認列表前一半的信息再一次發送。
    '''
    import time
    global confirm_list

    while True:
        confirm_list_len = len(confirm_list)
        if confirm_list_len > 5:
            for i in xrange(confirm_list_len/2):
                repeat_output_msg = {
                    'status': confirm_list[i]['confirm_seq'],
                    'user': confirm_list[i]['user'],
                    'msg': confirm_list[i]['msg'],
                }
                #msg = confirm_list[i][0]
                addr = confirm_list[i]['addr']
                udpSock.sendto(str(repeat_output_msg), addr)
            time.sleep(5)
        else:
            time.sleep(5)


def option(udpSock, BUFSIZ, broADDR, port):
    '''選項菜單模塊'''
    while True:
        print '''
        請輸入您的選項:
        1 顯示用戶列表
        2 連接到指定用戶,並開始對話
        3 退出
        '''
        action = raw_input("> ")
        if action is '1':
            print_userlist()

        elif action is '2':
            client_option = raw_input("您想連接到哪個用戶?,請輸入對應的id號:\n")
            try:
                # 獲取對端的地址
                cli_addr = (user_list[int(client_option)-1]['addr'], port)
                cli_pub_key = user_list[int(client_option)-1]['pub_key']
            except IndexError:
                print "沒有這個用戶,請重新選擇:"
                continue
            print "已建立好連接,可以開始對話,輸入quit或q可以結束會話"
            threads = []
            t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub_key, port), send_msg.__name__)
            threads.append(t2)
            t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
            threads.append(t3)
            for t in threads:
                t.setDaemon(True)
                t.start()
            t2.join()#send_msg中止之前,讓父線程一直在阻塞狀態
            print "連接中斷,返回主菜單"

        elif action is '3':
            broadcast(broADDR, 'offline')
            udpSock.close()
            print "再見!"
            break

        else:
            pass


def main():
    '''主函數'''
    host = ''
    port = 2425
    broADDR = ('<broadcast>', port)
    localADDR = (host, port)
    BUFSIZ = 1024
    try:
        broadcast(broADDR, 'online')
        udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock.bind(localADDR)
        t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
                       recv_msg.__name__)
        t1.setDaemon(True)
        t1.start()
        option(udpSock, BUFSIZ, broADDR, port)
    except (KeyboardInterrupt, SystemError):
        udpSock.close()
        raise
    except:
        traceback.print_exc

if __name__ == '__main__':
    main()
cjyfffIM_v0.3

 

4月23日更新:

已實現RSA加密功能

 

4月18日:

額。。。本來想用弄一個類似於“飛鴿傳書”那樣的軟件的,目前已經實現了一部分功能了,還有一部分功能沒有實現,暫時把這篇文章當作是開發文檔,以后添加了新功能后再修改這篇文章吧。有什么錯漏的地方請各位大牛提出哈。

目前已經實現了的功能:

1 自動發現局域網內也運行了本程序的機器,把該機器添加到客戶列表,假如對方下線的話,自動在客戶列表中刪除對應的信息。

2 具備確認機制,對方收到我方發送的信息后會反饋回來確認信息,沒有收到確認的信息將會在一段時間之后重新發送。

3 信息采用RSA加密

 

待實現的功能:

1 實現文件傳輸功能

2 優化代碼,使代碼的可讀性增強 

 

程序各個模塊的簡單邏輯關系如下圖:

 

 

各個模塊功能表述:

一、選項菜單模塊(option)

這個模塊有3個選項,分別是:

選項1 打印用戶列表。通過調用print_userlist()函數把當前用戶列表中的用戶打印出來。

選項2 與指定的用戶建立連接。根據用戶輸入的id號,與用戶列表中的指定用戶建立連接。

選項3 退出程序。在退出前首先會調用發送廣播信息模塊(broadcast),向局域網廣播一條信息通知本機即將下線,然后關閉socket,最后再退出程序。

 

二、發送廣播信息模塊(broadcast)

這個模塊的作用是在程序啟動(退出)時,向局域網內的其他機器發送廣播,通知其他機器在各自的用戶列表中添加(刪除)此用戶。

假設本機的用戶名是Mike,主機名是Mike‘PC

本機上線的廣播信息將是:online^Mike Mike’PC‘^Mike’PC的rsa公鑰

本機下線的廣播信息將是:offline^Mike Mike’PC’

 

三、信息發送模塊(send_msg)

這個模塊運行在一個循環當中,不斷的處理用戶的輸入。

假如用戶輸入退出指令('q', 'quit', 'exit'),這時候這個模塊首先向本機發送一個“local^quit”信息,讓本機的信息接收模塊(recv_msg)停止接收數據,同時發送一個“quit”給對方,通知對方連接即將中斷,然后退出循環,讓程序回到選項菜單模塊(option)。

假如用戶輸入的不是退出指令,那么就認為用戶將要發送的是正常信息。這里要提一下這個程序中確認機制的實現原理:本機在發送一個消息出去的時候,會在消息的頭部加上一個(0~9999)的隨機數作為確認標記,同時把這個消息添加到信息確認列表(confirm_list)。對端收到這條消息后,會把確認標記發送回來,然后本機就會根據所接收到的確認標記刪除信息確認列表(confirm_list)所對應的條目,這樣就認為一條消息對方已經成功接收。

回到具體實現的過程,這個模塊會在輸入的信息之前加上一個(0~9999)的隨機數作為標記,同時加上用戶名。例如本機Mike用戶向對端一個ip地址為192.168.1.10的用戶發送一個“Hello”,那么經這個模塊發送出去的信息可能是這樣:“1255^Mike^Hello”。同時這個模塊會在信息確認列表(confirm_list)中添加上“[1255^Mike^Hello,192.168.1.10]”這樣的一條記錄。

 

四、信息接收模塊(recv_msg)

這個模塊的主要功能是,跟據接收到的廣播信息更新用戶列表(confirm_list),以及處理對端發送過來信息。

假如收到以“online”開頭的信息,這個模塊會認為這是對端發送過來的通知上線的廣播信息,於是便會在信息中提取出用戶名以及主機名,再加上對端的ip地址和端口,添加到用戶列表中。並且以一條以“respon_online”開頭的信息反饋給對方本機的信息,以便對方也可以更新用戶列表。例如收到從192.168.1.11發送過來的一條“online^Kate Kate'PC'^Kate'PC'的rsa公鑰”這樣一條廣播信息后,本機將在用戶列表中添加上“[['Kate Kate'PC', Kate'PC'的rsa公鑰], ('192.168.1.11', 12345)]”(這個端口號是隨機分配的),同時本機返回一條這樣的信息給對方:respon_online^'Mike Mike'PC'^Mike'PC'的rsa公鑰。

假如是本機收到以“respon_online”開頭的信息的話,那就跟上面“online”的情況一樣,提取出用戶名、主機名、ip地址和端口,添加到用戶列表(confirm_list)上。

假如收到的是以“offline”開頭的信息,就提取出用戶名、主機名、ip地址和端口,檢查用戶列表(confirm_list)中有沒有對應的條目,假如有的話就刪除掉對應的條目。

假如收到的是“quit”信息,說明對端即將斷開連接,這個時候本模塊將提示用戶輸入退出命令,以便退出連接。

假如收到的是“local^quit”信息,說明本機即將斷開連接,這個時候本模塊將返回模塊的開頭,准備接收新的信息。

假如接收到的信息不滿足以上的條件,就會被認為是用戶間發送的正常消息:

首先要提取消息頭部的確認標志。如果收到的信息除了確認標志外沒有其他內容了,那么這條消息會被認為是對端在收到本機發送出去的信息后,反饋回來的確認信息,因此接下來的工作就是根據確認標志,查找信息確認列表(confirm_list)所對應的條目並刪除。

假如處理確認標志外還有其他內容,那么這條信息就是對端用戶所輸入的信息,於是首先提取出確認標志返回給對端,然后再本機上打印出對方所輸入的內容。

 

五、確認信息到達模塊(confirm_successd)

 這個模塊采用類似於最久未使用(LRU)算法,每隔5秒鍾檢查一下信息確認列表(confirm_list),當信息確認列表長度大於5時(也就是說未確認接收的信息大於5),把信息確認列表前一半的信息再一次發送。

 

最后是這個程序的代碼:

#! /usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/

import socket
import os
import pwd
import threading
import traceback
import random
import time
import rsa

user_list = []
confirm_list = []
username = pwd.getpwuid(os.getuid())[0]
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1()


class MyThread(threading.Thread):
    '''這個類用於創建新的線程'''

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        apply(self.func, self.args)


def broadcast(broADDR, status):
    '''發送廣播信息模塊
    用於發送廣播信息給其他主機,通知其他主機本主機上線\下線狀態,以及發送本機的信息給其他主機。
    這個模塊會在廣播信息前添加上status這個參數的值。在本程序中,當需要通知其他主機,本機已經上線時,
    會傳遞"online"給status,當需要通知其他主機本機即將下線時,會傳遞"offline"給status。
    '''
    global username, hostname, pub

    def broadcast_send(oMsg):
        udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        udpSock2.sendto(oMsg, broADDR)

    if status == 'online':
        oMsg = status + "^" + username + ' ' + hostname + "^" + pub
        broadcast_send(oMsg)
    elif status == 'offline':
        oMsg = status + "^" + username + ' ' + hostname
        broadcast_send(oMsg)


def recv_msg(localADDR, BUFSIZ, udpSock, port):
    '''信息接收模塊
    這個模塊的主要功能是,跟據接收到的廣播信息更新用戶列表,以及處理對端發送過來信息
    '''
    global user_list, confirm_list, username, hostname, pub, privkey

    while True:
        try:
            data, addr = udpSock.recvfrom(BUFSIZ)
        except:
            break

        if data.startswith('online'):
            data = data.split('^')[1:]
            if [data, addr] not in user_list:
                user_list.append([data, addr])
                # 把對方添加上用戶列表的同時,還要把自己的信息發給對方,以便對方把更新用戶列表
                res_msg = 'respon_online^' + username + ' ' + hostname + "^" + pub
                udpSock.sendto(res_msg, (addr[0], port))

        elif data.startswith('offline'):
            data = data.split('^')[1]
            for i in xrange(len(user_list)):
                if user_list[i][0][0] == data:
                    del user_list[i]

        elif data.startswith('respon_online'):
            data = data.split('^')[1:]
            if [data, addr] not in user_list:
                user_list.append([data, addr])

        elif data == 'quit':
            print "對方已斷開連接,請輸入'quit'或'q'返回主菜單"
            continue

        elif data == 'local^quit':
            continue

        else:
            confirm_recv = data.split('^')[0]
            # 假如收到的確認標志和確認表中的某項匹配,刪除該項
            for i in xrange(len(confirm_list)):
                if confirm_list[i][0].split('^')[0] == confirm_recv:
                    del confirm_list[i]
            data = data.split('^')[1:]
            if not data:
                continue
            addr_list = []
            for x in user_list:
                # 提取出用戶表中所有用戶的地址,存到addr_list中:
                addr_list.append(x[1][0])
            addr = addr[0]
            # 檢查發送信息的用戶的地址是否在用戶列表當中:
            if addr in addr_list:
                # 反饋收到確認信息給對方:
                udpSock.sendto(str(confirm_recv), (addr, port))
                # 打印信息:
                data_name = data[0]
                data_msg = rsa.decrypt((data[1]), privkey)
                print data_name, ":", data_msg


def print_userlist():
    '''打印用戶列表模塊'''
    global user_list
    print "當前有%d個用戶在線:" % len(user_list)
    for i in xrange(len(user_list)):
        print "ID: ", i+1, ":", user_list[i][0][0]


def send_msg(udpSock, cli_addr, cli_pub, port):
    '''信息發送模塊'''
    global username, user_list, confirm_list
    quit_list = ['q', 'quit', 'exit']
    cli_pubkey = rsa.PublicKey.load_pkcs1(cli_pub)

    while True:
        msg = raw_input("> ")
        if msg in quit_list:
            udpSock.sendto('local^quit', ('localhost', port))
            udpSock.sendto('quit', cli_addr)
            break

        random_num = random.randint(0, 1000)
        msg = rsa.encrypt(msg, cli_pubkey)
        out_msg = '%s' % random_num + '^' + username + '^' + msg
        confirm_list.append([out_msg, cli_addr])
        udpSock.sendto(out_msg, cli_addr)


def confirm_successd(udpSock):
    '''確認信息到達模塊
    采用類似於最久未使用(LRU)算法,每隔5秒鍾檢查一下信息確認列表(confirm_list),當信息確認列表長度大於5時(
    也就是說未確認接收的信息大於5),把信息確認列表前一半的信息再一次發送。
    '''
    global confirm_list

    while True:
        lenght = len(confirm_list)
        if lenght > 5:
            for i in xrange(lenght/2):
                msg = confirm_list[i][0]
                addr = confirm_list[i][1]
                udpSock.sendto(msg, addr)
            time.sleep(5)
        else:
            time.sleep(5)


def option(udpSock, BUFSIZ, broADDR, port):
    '''選項菜單模塊'''
    while True:
        print '''
        輸入您的選項:
顯示用戶列表
連接到指定用戶,並開始對話
退出
        '''
        action = raw_input("> ")
        if action is '1':
            print_userlist()

        elif action is '2':
            client_id = raw_input("您想連接到哪個用戶?,請輸入對應的id號:\n")
            try:
                # 獲取對端的地址
                cli_addr = (user_list[int(client_id)-1][1][0], port)
                cli_pub = user_list[int(client_id)-1][0][1]
            except IndexError:
                print "沒有這個用戶,請重新選擇:"
                continue
            print "已建立好連接,可以開始對話"
            threads = []
            t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub, port), send_msg.__name__)
            threads.append(t2)
            t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
            threads.append(t3)
            for t in threads:
                t.setDaemon(True)
                t.start()
            t2.join()#send_msg中止之前,讓父線程一直在阻塞狀態

            print "連接中斷,返回主菜單"

        elif action is '3':
            broadcast(broADDR, 'offline')
            udpSock.close()
            print "再見!"
            break

        else:
            pass


def main():
    '''主函數'''
    host = ''
    port = 2425
    broADDR = ('<broadcast>', port)
    localADDR = (host, port)
    BUFSIZ = 1024
    try:
        broadcast(broADDR, 'online')
        udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock.bind(localADDR)
        t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
                       recv_msg.__name__)
        t1.setDaemon(True)
        t1.start()
        option(udpSock, BUFSIZ, broADDR, port)
    except (KeyboardInterrupt, SystemError):
        udpSock.close()
        raise
    except:
        traceback.print_exc

if __name__ == '__main__':
    main()
cjyfffIM_v0.1

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM