P2P學習(四)P2P編程實現


一:協議解析

(一)協議格式設計

(二)字段說明

Version(1Byte):版本信息,這里默認0即可

Status(1Byte):協議的狀態信息

#define PROTO_LOGIN_REQ                0x01    //登錄服務器的請求與響應
#define PROTO_LOGIN_ACK                0x81

#define PROTO_HEARTBEAT_REQ            0x02    //心跳包的請求與響應,防止P2P連接被NAT網關關閉
#define PROTO_HEARTBEAT_ACK            0x82

#define PROTO_CONNECT_REQ                0x11  //連接請求與響應,向服務端發送P2P連接請求----(服務器與本端)
#define PROTO_CONNECT_ACK                0x91

#define PROTO_NOTIFY_REQ                0x12   //服務端處理PROTO_CONNECT_REQ請求之后,發送PROTO_NOTIFY_REQ請求給對端----(服務器與對端)
#define PROTO_NOTIFY_ACK                0x92

#define PROTO_P2P_CONNECT_REQ            0x13  //對端接收到PROTO_NOTIFY_REQ請求之后,開始與本端建立P2P連接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK給對端,雙方狀態機變為P2P建立完成,可以進行P2P傳輸
#define PROTO_P2P_CONNECT_ACK            0x93

#define RPORO_MESSAGE_REQ                0x21  //原始數據到達(是添加了自定義的首部之后的數據)---包含服務端轉發和P2P發送!!!
#define RPORO_MESSAGE_ACK                0xA1

Length(2Bytes):數據的長度字段 = Message數據的長度 + 數據頭部長度

Self ID(4Bytes):本端的ID信息

Other ID(4Bytes):對端的ID信息

Message:存放原始數據

(三)P2P客戶端的狀態機和協議的狀態信息

typedef enum {
    STATUS_INIT,
    STATUS_LOGIN,
    STATUS_HEARTBEAT,
    STATUS_CONNECT,
    STATUS_NOTIFY,
    STATUS_P2P_CONNECT,
    STATUS_MESSAGE,
} STATUS_SET;

(四)客戶端流程圖

1.本機A默認狀態STATUS_INIT,當本機A創建Socket之后,准備與服務器建立連接,狀態變為STATUS_LOGIN

2.本機A與服務端通過PROTO_LOGIN_REQ請求建立聯系,服務端記錄本機的id和地址ip和端口信息,返回PROTO_LOGIN_ACK確認消息給本機

3.本機A收到PROTO_LOGIN_ACK確認消息后,狀態變為STATUS_CONNECT,開始為建立p2p連接做准備,發送PROTO_CONNECT_REQ請求給服務器,服務端接收到本A端PROTO_CONNECT_REQ消息后,服務器回送PROTO_CONNECT_ACK確認消息和對端的地址信息給本機A,本機A狀態變為STATUS_P2P_CONNECT狀態。

4.服務端接收到本A端PROTO_CONNECT_REQ消息后,發送PROTO_NOTIFY_REQ請求(保護本端的地址信息)到對端B。對端B接收到PROTO_NOTIFY_REQ請求后,回送PROTO_NOTIFY_ACK確認消息給服務器,此時對端B狀態變為STATUS_P2P_CONNECT。

注意:如果無法建立P2P連接,則雙方的狀態停留在STATUS_P2P_CONNECT狀態,可以通過服務器進行轉發。而不需要進行p2p通信!

5.對端狀態為STATUS_P2P_CONNECT后,發生PROTO_P2P_CONNECT_REQ請求消息給本機端,打通對端-(NAT端口)-->本機。

6.對端狀態為STATUS_P2P_CONNECT后,發生PROTO_P2P_CONNECT_REQ請求消息給對端,打通本機端(NAT端口)--->對端。

注意:5、6是異步存在的!!

7.當客戶端接收到PROTO_P2P_CONNECT_REQ或者PROTO_P2P_CONNECT_ACK消息,本機狀態的狀態變為STATUS_MESSAGE。

之后可以正常的進行p2p通信!!!

二:代碼實現P2P程序

(一)頭文件p2p.h實現(含公共函數)

#ifndef __P2P_H__
#define __P2P_H__

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>    //互聯網地址族

#include <time.h>

//---------------------------定義數據占用空間大小---------------------------
#define CLIENT_MAX 1024                //定義客戶端中與對方連接的數量
#define CLIENT_ADDR_LENGTH 6        //定義空間存放客戶端地址信息,IP占4字節,端口占2字節
#define BUFFER_LENGTH 512            //定義發送和接收的緩沖區大小,512字節
#define NUMBER_ID_LENGTH 4            //定義客戶端ID的長度,占4字節

//---------------------------定義協議的狀態:注意響應比請求大於0x80,方便計算---------------------------
#define PROTO_LOGIN_REQ     0x01    //登錄服務器的請求與響應
#define PROTO_LOGIN_ACK     0x81

#define PROTO_HEARTBEAT_REQ 0x02    //心跳包的請求與響應,防止P2P連接被NAT網關關閉
#define PROTO_HEARTBEAT_ACK 0x82
 
#define PROTO_CONNECT_REQ   0x11     //連接請求與響應,向服務端發送P2P連接請求----(服務器與本端)
#define PROTO_CONNECT_ACK   0x91

#define PROTO_NOTIFY_REQ    0x12       //服務端處理PROTO_CONNECT_REQ請求之后,發送PROTO_NOTIFY_REQ請求給對端----(服務器與對端)
#define PROTO_NOTIFY_ACK    0x92

#define PROTO_P2P_CONNECT_REQ 0x13  //對端接收到PROTO_NOTIFY_REQ請求之后,開始與本端建立P2P連接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK給對端,雙方狀態機變為P2P建立完成,可以進行P2P傳輸
#define PROTO_P2P_CONNECT_ACK 0x93

#define PROTO_MESSAGE_REQ      0x21  //原始數據到達(是添加了自定義的首部之后的數據)---包含服務端轉發和P2P發送!!!
#define PROTO_MESSAGE_ACK     0xA1

//---------------------------定義協議的索引,和各個協議狀態對應的索引位置---------------------------
#define PROTO_BUFFER_VERSION_IDX       0        //版本字段位置索引,索引0,占1個字節
#define PROTO_BUFFER_STATUS_IDX        1        //協議的狀態信息,索引1,占1個字節

#define PROTO_BUFFER_LENGTH_IDX        (PROTO_BUFFER_STATUS_IDX+1)    //協議的長度字段,索引2,占2個字節
#define PROTO_BUFFER_SELFID_IDX        (PROTO_BUFFER_LENGTH_IDX+2)    //協議的本端的ID信息字段,索引4,占4個字節

//login
#define PROTO_LOGIN_SELFID_IDX         PROTO_BUFFER_SELFID_IDX        //登錄時,需要添加本機的id到協議中去,在self id字段中,索引為4

//login ack
#define PROTO_LOGIN_ACK_SELFID_IDX     PROTO_BUFFER_SELFID_IDX        //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4

//heartbeat
#define PROTO_HEARTBEAT_SELFID_IDX     PROTO_BUFFER_SELFID_IDX        //心跳檢測,需要添加本機的id到協議中去,在self id字段中,索引為4

//heartbeat ack
#define PROTO_HEARTBEAT_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX        //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4

//connect
#define PROTO_CONNECT_SELFID_IDX       PROTO_BUFFER_SELFID_IDX        //連接相關,需要添加本端和對端的id信息,而本端的id放入self id字段,索引4
#define PROTO_CONNECT_OTHERID_IDX      (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)        //對端的id放入other id字段,索引為8

//connect ack
#define PROTO_CONNECT_ACK_SELFID_IDX   PROTO_BUFFER_SELFID_IDX          //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4
#define PROTO_CONNECT_ACK_OTHERID_IDX  (PROTO_CONNECT_ACK_SELFID_IDX+NUMBER_ID_LENGTH)  //對端的id放入other id字段,索引為8
#define PROTO_CONNECT_MESSAGE_ADDR_IDX (PROTO_CONNECT_ACK_OTHERID_IDX+NUMBER_ID_LENGTH)    //這里開始存放地址數據,索引12。占6個字節,存放地址信息!!!---本機需要獲取到的地址信息,才能發送p2p請求,而之前並沒有獲取過這個數據,所以最好攜帶過去

//notify
#define PROTO_NOTIFY_SELFID_IDX        PROTO_BUFFER_SELFID_IDX          //通知對端字段,需要添加本端Id信息放入self id字段,索引為4
#define PROTO_NOTIFY_OTHERID_IDX       (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)        //對端的id放入other id字段,索引為8
#define PROTO_NOTIFY_MESSAGE_ADDR_IDX  (PROTO_NOTIFY_OTHERID_IDX+NUMBER_ID_LENGTH)         //這里開始存放地址數據,索引12。占6個字節,存放地址信息!!!---對端需要獲取到本機的地址信息,才能發送p2p請求,而之前並沒有獲取過這個數據,所以最好攜帶過去

//notify ack
#define PROTO_NOTIFY_ACK_SELFID_IDX       PROTO_BUFFER_SELFID_IDX          //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4

//p2p connect
#define PROTO_P2P_CONNECT_SELFID_IDX    PROTO_BUFFER_SELFID_IDX       //P2P連接請求時,需要加入本端的Id信息放入self id這段,索引為4

//p2p connect ack
#define PROTO_P2P_CONNECT_ACK_SELFID_IDX    PROTO_BUFFER_SELFID_IDX   //P2P連接響應時,需要加入本端的Id信息放入self id這段,索引為4

//message
#define PROTO_MESSAGE_SELFID_IDX        PROTO_BUFFER_SELFID_IDX       //開始發送數據,需要添加本端Id信息,放入self id字段,索引為4
#define PROTO_MESSAGE_OTHERID_IDX       (PROTO_MESSAGE_SELFID_IDX+NUMBER_ID_LENGTH)    //需要加入對端ID信息到other id字段中,索引為8
#define PROTO_MESSAGE_CONTENT_IDX       (PROTO_MESSAGE_OTHERID_IDX+NUMBER_ID_LENGTH)   //從這里開始添加數據,索引為12

//message ack
#define PROTO_MESSAGE_ACK_SELFID_IDX    PROTO_BUFFER_SELFID_IDX       //數據發送結束,需要進行響應,索引為4
#define PROTO_MESSAGE_ACK_OTHERID_IDX   (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH)     //數據發送結束,需要進行響應,索引為4


typedef unsigned int U32;
typedef unsigned short U16;
typedef unsigned char U8;

//volatile的學習:https://www.runoob.com/w3cnote/c-volatile-keyword.html
typedef volatile long UATOMIC;    //當要求使用 volatile 聲明的變量的值的時候,系統總是重新從它所在的內存讀取數據,即使它前面的指令剛剛從該處讀取過數據。
//可以用於實現原語操作

//定義回調函數
typedef void* (*CALLBACK)(void* arg);    

//定義返回狀態
typedef enum{
    RESULT_FAILED = -1,
    RESULT_SUCCESS = 0
}RESULT;

//---------------------------定義客戶端狀態---------------------------
typedef enum {
    STATUS_INIT,
    STATUS_LOGIN,
    STATUS_HEARTBEAT,
    STATUS_CONNECT,
    STATUS_NOTIFY,
    STATUS_P2P_CONNECT,
    STATUS_MESSAGE
} STATUS_SET;

//---------------------------定義一個映射結構體,id==>地址和時間戳信息---------------------------
typedef struct __CLIENT_TABLE
{
    U8 addr[CLIENT_ADDR_LENGTH];    //6字節存放地址信息
    U32    client_id;                    //4字節存放客戶端id
    long stamp;                        //存放時間戳信息
}client_table;

//---------------------------服務器端數據結構---------------------------
int client_count = 0;
client_table table[CLIENT_MAX] = {0};

//---------------------------客戶端端數據結構---------------------------

//---------------------------服務器端函數---------------------------
/*
cmpxchg(void* ptr, int old, int new)
如果ptr和old的值一樣,則把new寫到ptr內存,
否則寫入ptr的值到old中
整個操作是原子的。
res返回值為0(失敗)或1(成功)表明cas(對比和替換)操作是否成功.
下面__asm__學習:https://www.jianshu.com/p/fa6d9d9c63b4
-----------`x++`是否是原子的?
不是,是3個指令,`取x,x+1,存入x`。
>在單處理器上,如果執行x++時,禁止多線程調度,就可以實現原子。因為單處理的多線程並發是偽並發。
在多處理器上,需要借助cpu提供的Lock功能。
鎖總線。讀取內存值,修改,寫回內存三步期間禁止別的CPU訪問總線。
同時我估計使用Lock指令鎖總線的時候,OS也不會把當前線程調度走了。要是調走了,那就麻煩了。
*/
static unsigned long cmpxchg(UATOMIC* addr,unsigned long _old,unsigned long _new){
    U8 res;
    //"__asm__"表示后面的代碼為內嵌匯編
    //"__volatile__"表示編譯器不要優化代碼,后面的指令保留原樣,"volatile"是它的別名
    __asm__ volatile (
        "lock; cmpxchg %3, %1;sete %0"            //加鎖以及比較和替換原子操作,按后面順序ret 0 , addr 1 , old 2, new 3
        : "=a" (res)                            //"=a"是說要把__asm__操作結果寫到__ret中
        : "m" (*addr), "a" (_old), "r" (_new)    //各個值存放的位置
        : "cc", "memory");

    return res;    //返回結果,0(失敗)或1(成功)
}

//返回時間戳信息
static long time_generator(){
    static long lTimeStamp = 0;                    //局部靜態變量
    static long timeStampMutex = 0;                //局部靜態變量

    if(cmpxchg(&timeStampMutex,0,1)){            //注意:只有TimeStampMutex原子操作成功才行進入下面語句
        lTimeStamp = time(NULL);                //生成時間戳,精度為s
        timeStampMutex = 0;
    }

    return lTimeStamp;                            //返回時間戳信息
}


//將sockaddr地址轉為array格式
static void addr_to_array(U8 *array, struct sockaddr_in *p_addr){
    //存放IP和端口,需要6個字節
    int i = 0;
    for(i = 0; i < 4; i++){
        array[i] = *((unsigned char*)(&(p_addr->sin_addr.s_addr))+i);        //獲取IP,順序存儲
    }

    for(i = 0; i < 2; i++){
        array[4+i] = *((unsigned char*)(&(p_addr->sin_port))+i);            //獲取Port信息
    }
}

//將array數組轉為sockaddr地址格式
static void array_to_addr(U8 *array,struct sockaddr_in *p_addr){
    int i=0;
    for(i = 0;i < 4;i++){
        *((unsigned char*)(&p_addr->sin_addr.s_addr)+i) = array[i];            //獲取IP,存放到sockaddr_in格式
    }
    for(i = 0;i < 2;i++){
        *((unsigned char*)(&p_addr->sin_port)+i) = array[4+i];                //獲取Port,存放到sockaddr_in格式
    }
}


static int get_index_by_clientid(int client_id){
    int i = 0;
    int now_count = client_count;
    for(i = 1;i<=now_count;i++){
        if(table[i].client_id == client_id)
            return i;
    }
    return RESULT_FAILED;
}

static int deal_connect_req(int sockfd,int client_id,int other_id){
    U8 buffer[BUFFER_LENGTH] = {0};
    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_NOTIFY_REQ;                      //發送PROTO_NOTIFY_REQ請求
    buffer[PROTO_NOTIFY_SELFID_IDX] = client_id;
    buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id;

    int index = get_index_by_clientid(client_id);                            //獲取本端信息,一會發送給對端
    //填充數據,6字節的IP和端口信息
    memcpy(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH);    

    index = get_index_by_clientid(other_id);                                //獲取對端信息,開始發送
    //獲取sockaddr信息
    struct sockaddr_in c_addr;
    c_addr.sin_family = AF_INET;
    array_to_addr(table[index].addr,&c_addr);

    int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH;                //18字節,12的頭部,6字節的數據
    len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
    if(len < 0){
        printf("Failed in deal_connect_req, send to other peer:%d\n",other_id);
        return RESULT_FAILED;
    }

    return RESULT_SUCCESS;
}


static int deal_connect_ack(int sockfd,int client_id,int other_id){            //可以和deal_connect_req合並
    //printf("call deal_connect_ack!\n");
    U8 buffer[BUFFER_LENGTH] = {0};
    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_ACK;                    //回送PROTO_CONNECT_ACK
    buffer[PROTO_NOTIFY_SELFID_IDX] = client_id;
    buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id;

    int index = get_index_by_clientid(other_id);                            //獲取本端信息,一會發送給對端
    //填充數據,6字節的IP和端口信息
    memcpy(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH);    

    index = get_index_by_clientid(client_id);                                //獲取對端信息,開始發送
    //獲取sockaddr信息
    struct sockaddr_in c_addr;
    c_addr.sin_family = AF_INET;
    array_to_addr(table[index].addr,&c_addr);

    int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH;                //18字節,12的頭部,6字節的數據
    len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
    if(len < 0){
        printf("Failed in deal_connect_ack, send to client peer:%d\n",client_id);
        return RESULT_FAILED;
    }

    return RESULT_SUCCESS;
}

static int deal_message_req(int sockfd,int other_id,U8 *buffer,int length){
    int index = get_index_by_clientid(other_id);                                //獲取對端信息,開始發送
    //獲取sockaddr信息
    struct sockaddr_in c_addr;
    c_addr.sin_family = AF_INET;
    array_to_addr(table[index].addr,&c_addr);
    //printf("send to peer: %d.%d.%d.%d:%d\n",table[index].addr[0],table[index].addr[1],table[index].addr[2],table[index].addr[3],c_addr.sin_port);
    int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)&c_addr,sizeof(c_addr));
    if(n < 0){
        printf("Failed in deal_message_req!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}


static int deal_ack(int sockfd,struct sockaddr_in *c_addr,U8 *buffer,int length){        //處理通用ACK消息,原來協議+0x80即可
    buffer[PROTO_BUFFER_STATUS_IDX] += 0x80;
    int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)c_addr,sizeof(*c_addr));
    if(n < 0){
        printf("Failed in deal_ack!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}


//---------------------------客戶端函數---------------------------
static int send_login_req(int sockfd,int client_id,struct sockaddr_in *ser_addr){
    U8 buffer[BUFFER_LENGTH] = {0};            //buffer長度512

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_LOGIN_REQ;
    *(int *)(buffer+PROTO_LOGIN_SELFID_IDX) = client_id;

    int n = PROTO_LOGIN_SELFID_IDX + NUMBER_ID_LENGTH;
    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in));

    if(n < 0){
        printf("Failed to login server!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}

static int get_other_id(U8 *buffer,int *other_id){
    int id=0,i;
    for(i=2;buffer[i]!=':'&&buffer[i]!='\0';i++){        //還可以進行其他嚴格處理    
        id += id*10 + buffer[i]-'0';
    }
    *other_id = id;
    return i;                                //返回索引
}

static int send_connect_req(int sockfd,int client_id,int other_id,struct sockaddr_in *ser_addr){
    U8 buffer[BUFFER_LENGTH] = {0};            //buffer長度512

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_REQ;
    *(int *)(buffer+PROTO_CONNECT_SELFID_IDX) = client_id;
    *(int *)(buffer+PROTO_CONNECT_OTHERID_IDX) = other_id;

    int n = PROTO_CONNECT_OTHERID_IDX + NUMBER_ID_LENGTH;
    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in));

    if(n < 0){
        printf("Failed to login server!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}

static int send_message(int sockfd,int client_id,int other_id,struct sockaddr_in *addr,U8 *msg,int length){
    U8 buffer[BUFFER_LENGTH] = {0};

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_REQ;    //處理消息
    *(int*)(buffer+PROTO_MESSAGE_SELFID_IDX) = client_id;
    *(int*)(buffer+PROTO_MESSAGE_OTHERID_IDX) = other_id;

    memcpy(buffer + PROTO_MESSAGE_CONTENT_IDX,msg,length);    //初始化數據部分

    int n = PROTO_MESSAGE_CONTENT_IDX + length;
    *(U16*)(buffer+PROTO_BUFFER_LENGTH_IDX) = (U16)n;        //存放數據長度

    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)addr,sizeof(struct sockaddr_in));
    if(n < 0){
        printf("Failed to send message to peer!\n");
        return RESULT_FAILED;
    } 
    return RESULT_SUCCESS;
}

static int send_p2pconnect(int sockfd,int client_id,struct sockaddr_in *p_addr){
    U8 buffer[BUFFER_LENGTH] = {0};

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_REQ;
    *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id;
    
    int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH;

    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
    if(n<0){
        printf("Failed to send p2p connect req!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}

static int send_p2pconnect_ack(int sockfd,int client_id,struct sockaddr_in *p_addr){
    U8 buffer[BUFFER_LENGTH] = {0};

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_ACK;
    *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id;

    int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH;
    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
    if(n < 0){
        printf("Failed to send p2p connect ack!\n");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}


static int send_message_ack(int sockfd,int client_id,int other_id,struct sockaddr_in *p_addr){
    U8 buffer[BUFFER_LENGTH] = {0};

    buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_ACK;
    *(int*)(buffer+PROTO_MESSAGE_ACK_SELFID_IDX) = client_id;
    *(int*)(buffer+PROTO_MESSAGE_ACK_OTHERID_IDX) = other_id;

    int n=PROTO_MESSAGE_ACK_OTHERID_IDX + NUMBER_ID_LENGTH;
    n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in));
    if(n < 0){
        printf("Failed to send message ack");
        return RESULT_FAILED;
    }
    return RESULT_SUCCESS;
}
#endif

(二)服務端p2p_server.c實現(簡單通信)

#include "p2p.h"


int recv_buffer_parser(int sockfd,U8 *buffer,U32 length,struct sockaddr_in *c_addr){    //length是傳遞過來的數據長度
    U8 status = buffer[PROTO_BUFFER_STATUS_IDX];                  //解析狀態
    //printf("recv_buffer_parser --->status: %d\n",status);
    int client_id,other_id,index;
    int old,now;
    U8 *msg;

    switch(status){
        case PROTO_LOGIN_REQ:                                      //處理登錄請求
            printf("recv login req!\n");
            old = client_count;
            now = old + 1;

            if(0 == cmpxchg((UATOMIC*)&client_count,old,now)){    //使用原子操作賦值
                printf("client_count --> %d,old:%d,now:%d\n", client_count,old,now);
                return RESULT_FAILED;
            }

            //開始登錄新用戶的信息
            U8 array[CLIENT_ADDR_LENGTH] = {0};                    //6字節存放地址IP:Port信息
            addr_to_array(array,c_addr);

            client_id = *(U32*)(buffer+PROTO_BUFFER_SELFID_IDX);

            printf("now:%d client:[%d],login ---> %d.%d.%d.%d:%d\n",now,client_id,
                *(unsigned char*)(&c_addr->sin_addr.s_addr), *((unsigned char*)(&c_addr->sin_addr.s_addr)+1),                                                    
                *((unsigned char*)(&c_addr->sin_addr.s_addr)+2), *((unsigned char*)(&c_addr->sin_addr.s_addr)+3),                      
                c_addr->sin_port);

            table[now].client_id = client_id;                    //獲取4字節長度的用戶id信息
            memcpy(table[now].addr,array,CLIENT_ADDR_LENGTH);    //獲取用戶的Addr地址信息

            //需要回送確認消息-----------
            deal_ack(sockfd,c_addr,buffer,length);
            break;
        case PROTO_HEARTBEAT_REQ:                                //處理心跳包請求
            printf("recv heartbeat req!\n");
            client_id = *(unsigned int*)(buffer+PROTO_HEARTBEAT_SELFID_IDX);
            index = get_index_by_clientid(client_id);

            table[index].stamp = time_generator();

            //需要回送確認消息-----------
            deal_ack(sockfd,c_addr,buffer,length);
            break;
        case PROTO_CONNECT_REQ:                                    //處理連接請求
            client_id = *(unsigned int*)(buffer+PROTO_CONNECT_SELFID_IDX);            //獲取本機id
            other_id = *(unsigned int*)(buffer+PROTO_CONNECT_OTHERID_IDX);            //獲取對端id
            printf("recv connect req from %d to %d!\n",client_id,other_id);

            deal_connect_req(sockfd,client_id,other_id);        //處理連接請求,1.向對端發送信息
            deal_connect_ack(sockfd,client_id,other_id);        //2.回送確認消息
            break;
        case PROTO_NOTIFY_ACK:                                    //處理對端發送回來的確認消息,無用
            printf("recv other notify ack message\n");
            break;
        case PROTO_MESSAGE_REQ:                              //處理要經過服務器轉發的數據和p2p無法建立的時候使用
            printf("recv message req!\n");
            msg = buffer + PROTO_MESSAGE_CONTENT_IDX;        //獲取要發送的數據
            client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX);
            other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX);

            printf("Client[%d] send to Other[%d]:%s\n",client_id,other_id,msg);
            deal_message_req(sockfd,other_id,buffer,length);    //進行轉發
            break;
        case PROTO_MESSAGE_ACK:                                    //轉發確認消息
            printf("recv message ack!\n");
            client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX);
            other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX);
            printf("Client[%d] send ack to Other[%d]\n",client_id,other_id);
            deal_message_req(sockfd,other_id,buffer,length);
            break;
    }

    return RESULT_SUCCESS;
}

int main(int argc,char *argv[]){
    int sockfd;
    int n,length;
    char buffer[BUFFER_LENGTH] = {0};
    struct sockaddr_in addr,c_addr;

    printf("UDP Server......\n");

    if(argc != 2){
        printf("Usage: %s port\n",argv[0]);
        exit(0);
    }
    
    sockfd = socket(AF_INET,SOCK_DGRAM,0);                            //獲取通信socket
    if(sockfd < 0){
        printf("Failed to open udp socket!\n");
        exit(0);
    }

    addr.sin_family = AF_INET;
    addr.sin_port = htons(atoi(argv[1]));                            //獲取端口信息
    addr.sin_addr.s_addr = htonl(INADDR_ANY);                        //允許接收所有網卡的到達數據

    length = sizeof(addr);

    if(bind(sockfd,(struct sockaddr*)&addr,length) < 0){
        printf("Failed to bind udp socket with ip port");
        exit(0);
    }

    while(1){
        n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&c_addr,&length);
        if(n > 0){
            buffer[n] = 0x0;                                        //設置結束符號
            /*
            printf("%d.%d.%d.%d:%d say:%s\n", *(unsigned char*)(&c_addr.sin_addr.s_addr),*((unsigned char*)(&c_addr.sin_addr.s_addr)+1),
                *((unsigned char*)(&c_addr.sin_addr.s_addr)+2),*((unsigned char*)(&c_addr.sin_addr.s_addr)+3),
                c_addr.sin_port, buffer);                            //打印接收到的數據信息
            */
            int ret = recv_buffer_parser(sockfd,buffer,n,&c_addr);    //解析接收的數據,存儲相關信息
            if(ret == RESULT_FAILED)
                continue;

        }else if(n == 0){
            printf("client closed!\n");
        }else{
            printf("recv error\n");
            break;
        }
    }

    return 0;
}

(三)客戶端代碼實現(狀態機轉換,p2p通信)

#include "p2p.h"
#include <pthread.h>

static int status_machine = STATUS_INIT;    //狀態機
static int client_selfid = 0x0;                //默認本端的id,需要在main方法中輸入

struct sockaddr_in server_addr;                //服務端的信息

client_table p2p_clients[CLIENT_MAX];        //可以連接的P2P對端最大數量
static int p2p_count = 0;

static int buffer_parser(int sockfd,U8 *buffer,int length,struct sockaddr_in *addr){
    U8 status = buffer[PROTO_BUFFER_STATUS_IDX];    //獲取狀態
    U8 *msg;
    struct sockaddr_in p_addr;        //獲取對端的地址信息
    //printf("buffer_parser...%d\n",status);
    switch(status){
        case PROTO_LOGIN_ACK:                //處理登錄確認
            printf(" Connect Server Success\n");
            status_machine = STATUS_CONNECT;        //狀態轉移
            break;
        case PROTO_HEARTBEAT_ACK:
            //printf("recv heartbeat ack!\n");
            break;
        case PROTO_NOTIFY_REQ:                //處理服務端發送的NOTIFY請求
            //printf("recv notify req!\n");
            //獲取對端的數據信息
            p_addr.sin_family = AF_INET;
            array_to_addr(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,&p_addr);
            //回復確認消息給服務器
            buffer[PROTO_BUFFER_STATUS_IDX] += 0x80;
            sendto(sockfd,buffer,PROTO_NOTIFY_MESSAGE_ADDR_IDX,0,(struct sockaddr*)&server_addr,sizeof(struct sockaddr_in));    

            status_machine = STATUS_NOTIFY;
            //開始打洞
            send_p2pconnect(sockfd,client_selfid,&p_addr);    //開始打洞!!!
            if(status_machine != STATUS_MESSAGE){              //注意:需要進行判斷,因為是異步操作,所以本機接到NOTIFY請求的時候,可能已經接到對端的P2P連接請求,狀態已經變為STATUS_MESSAGE,那么我們不能再變為未就緒狀態
                status_machine = STATUS_P2P_CONNECT;
            }
            break;
        case PROTO_CONNECT_ACK:                //處理CONNECT 確認
            //printf("recv connect ack!\n");
            //獲取對端的數據信息
            p_addr.sin_family = AF_INET;
            array_to_addr(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,&p_addr);

            send_p2pconnect(sockfd,client_selfid,&p_addr);    //開始打洞!!!
            if(status_machine != STATUS_MESSAGE){              //注意:需要進行判斷,因為是異步操作,所以本機接到NOTIFY請求的時候,可能已經接到對端的P2P連接請求,狀態已經變為STATUS_MESSAGE,那么我們不能再變為未就緒狀態
                status_machine = STATUS_P2P_CONNECT;
            }
            break;
        case PROTO_P2P_CONNECT_REQ:            //處理p2p連接請求---表示打洞成功,添加即可
            if(status_machine != STATUS_MESSAGE){
                //printf("recv p2p connect req!\n");
                int now_count = p2p_count++;

                p2p_clients[now_count].stamp = time_generator();
                p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX);
                addr_to_array(p2p_clients[now_count].addr,addr);

                send_p2pconnect_ack(sockfd,client_selfid,addr);
                status_machine = STATUS_MESSAGE;
                printf("Enter P2P Model!\n");
            }
            break;
        case PROTO_P2P_CONNECT_ACK:            //處理p2p連接確認---表示打洞成功,添加即可
            if(status_machine != STATUS_MESSAGE){
                //printf("recv p2p connect ack!\n");
                int now_count = p2p_count++;

                p2p_clients[now_count].stamp = time_generator();
                p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX);
                addr_to_array(p2p_clients[now_count].addr,addr);

                send_p2pconnect_ack(sockfd,client_selfid,addr);
                status_machine = STATUS_MESSAGE;
                printf("Enter P2P Model!\n");
            }
            break;
        case PROTO_MESSAGE_REQ:                //p2p數據到達
            //printf("recv p2p data....\n");

            msg = buffer + PROTO_MESSAGE_CONTENT_IDX;
            U32 other_id = *(U32*)(buffer+PROTO_MESSAGE_SELFID_IDX);
            printf("recv p2p data:%s from:%d\n",msg,other_id);

            send_message_ack(sockfd,client_selfid,other_id,addr);
            break;
        case PROTO_MESSAGE_ACK:
            //printf("peer recv message, and send ack to me!\n");
            break;
    }
}

void *recv_callback(void *arg){
    int sockfd = *(int*)arg;                //獲取sockfd

    struct sockaddr_in addr;
    int length = sizeof(struct sockaddr_in);
    U8 buffer[BUFFER_LENGTH] = {0};

    while(1){
        int n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&addr,&length);
        printf("recvfrom data...\n");
        if(n > 0){
            buffer[n] = 0;
            buffer_parser(sockfd,buffer,n,&addr);    //解析數據
        }else if(n == 0){
            printf("server closed\n");
            close(sockfd);
            break;
        }else{
            printf("Failed to call recvfrom\n");
            close(sockfd);
            break;
        }
    }
}

void *send_callback(void *arg){                //線程處理發送消息
    int sockfd = *(int*)arg;                //獲取sockfd

    char buffer[BUFFER_LENGTH] = {0};

    while(1){
        bzero(buffer,BUFFER_LENGTH);        //置為0

        //printf("===client status====%d===\n",status_machine);
        if(status_machine == STATUS_CONNECT){
            printf("-----> please enter message(eg. C/S otherID: ...):\n");
            gets(buffer);                //獲取要輸入的數據
            //如果是登錄狀態,可以進行p2p連接或者服務器轉發
            int other_id,idx;
            idx = get_other_id(buffer,&other_id);
            //printf("%d--->%d\n",client_selfid,other_id);

            if(buffer[0] == 'C'){            //開始進行P2P連接
                send_connect_req(sockfd,client_selfid,other_id,&server_addr);
            }else{
                int length = strlen(buffer);

                send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1);    //發送給服務器進行轉發
            }
            sleep(1);    //等待建立p2p連接
        }else if(status_machine == STATUS_MESSAGE){    //可以進行P2P通信
            printf("-----> please enter p2p message:\n");
            gets(buffer);                          //獲取要輸入的數據
            //與最新加入的進行p2p通信
            int now_count = p2p_count;            //這個是最新的序號
            struct sockaddr_in c_addr;            //對端的地址信息

            c_addr.sin_family = AF_INET;
            array_to_addr(p2p_clients[now_count-1].addr,&c_addr);

            int length = strlen(buffer);

            send_message(sockfd,client_selfid,0,&c_addr,buffer,length);    //直接發送給對端,P2P通信
        }else if(status_machine == STATUS_NOTIFY || status_machine == STATUS_P2P_CONNECT ){
            printf("-----> please enter message(S otherID:...):\n");
            printf("status:%d\n",status_machine);
            //scanf("%s",buffer);                    //獲取要輸入的數據
            gets(buffer);                          //獲取要輸入的數據

            int length = strlen(buffer);
            
            int other_id,idx;
            idx = get_other_id(buffer,&other_id);

            send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1);    //發送給服務器進行轉發
        }
    }

}

int main(int argc,char *argv[]){
    printf("UDP Client......\n");

    if(argc != 4){
        printf("Usage: %s serverIp serverPort clientID\n",argv[0]);
        exit(0);
    }

    int sockfd = socket(AF_INET,SOCK_DGRAM,0);
    if(sockfd < 0){
        printf("Failed to create socket!\n");
        exit(0);
    }

    //創建兩個線程,分別處理接收和發送信息
    pthread_t thread_id[2] = {0};
    CALLBACK cb[2] = {send_callback,recv_callback};
    
    int i;
    for(i=0;i<2;i++){
        int ret = pthread_create(&thread_id[i],NULL,cb[i],&sockfd);    //創建線程,獲取線程號,傳入回調方法和參數
        if(ret){
            printf("Failed to create thread!\n");
            exit(1);
        }
    }

    //主線程進行登錄操作
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(argv[1]);
    server_addr.sin_port = htons(atoi(argv[2]));

    client_selfid = atoi(argv[3]);

    status_machine = STATUS_LOGIN;                                    //修改客戶端當前狀態
    send_login_req(sockfd,client_selfid,&server_addr);                //發送登錄請求

    for(i = 0;i<2;i++){
        pthread_join(thread_id[i],NULL);                            //join子線程
    }

    return 0;
}

(四)程序編譯

1.編譯服務端

gcc p2p_server.c -o ps

2.編譯客戶端

gcc p2p_client.c -o pc -lpthread

(五)代碼測試

1.服務端查看:

2.客戶端1查看

3.客戶端2查看

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM