一:協議解析
(一)協議格式設計
(二)字段說明
Version(1Byte):版本信息,這里默認0即可
Status(1Byte):協議的狀態信息
#define PROTO_LOGIN_REQ 0x01 //登錄服務器的請求與響應 #define PROTO_LOGIN_ACK 0x81 #define PROTO_HEARTBEAT_REQ 0x02 //心跳包的請求與響應,防止P2P連接被NAT網關關閉 #define PROTO_HEARTBEAT_ACK 0x82 #define PROTO_CONNECT_REQ 0x11 //連接請求與響應,向服務端發送P2P連接請求----(服務器與本端) #define PROTO_CONNECT_ACK 0x91 #define PROTO_NOTIFY_REQ 0x12 //服務端處理PROTO_CONNECT_REQ請求之后,發送PROTO_NOTIFY_REQ請求給對端----(服務器與對端) #define PROTO_NOTIFY_ACK 0x92 #define PROTO_P2P_CONNECT_REQ 0x13 //對端接收到PROTO_NOTIFY_REQ請求之后,開始與本端建立P2P連接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK給對端,雙方狀態機變為P2P建立完成,可以進行P2P傳輸 #define PROTO_P2P_CONNECT_ACK 0x93 #define RPORO_MESSAGE_REQ 0x21 //原始數據到達(是添加了自定義的首部之后的數據)---包含服務端轉發和P2P發送!!! #define RPORO_MESSAGE_ACK 0xA1
Length(2Bytes):數據的長度字段 = Message數據的長度 + 數據頭部長度
Self ID(4Bytes):本端的ID信息
Other ID(4Bytes):對端的ID信息
Message:存放原始數據
(三)P2P客戶端的狀態機和協議的狀態信息
typedef enum { STATUS_INIT, STATUS_LOGIN, STATUS_HEARTBEAT, STATUS_CONNECT, STATUS_NOTIFY, STATUS_P2P_CONNECT, STATUS_MESSAGE, } STATUS_SET;
(四)客戶端流程圖
1.本機A默認狀態STATUS_INIT,當本機A創建Socket之后,准備與服務器建立連接,狀態變為STATUS_LOGIN
2.本機A與服務端通過PROTO_LOGIN_REQ請求建立聯系,服務端記錄本機的id和地址ip和端口信息,返回PROTO_LOGIN_ACK確認消息給本機
3.本機A收到PROTO_LOGIN_ACK確認消息后,狀態變為STATUS_CONNECT,開始為建立p2p連接做准備,發送PROTO_CONNECT_REQ請求給服務器,服務端接收到本A端PROTO_CONNECT_REQ消息后,服務器回送PROTO_CONNECT_ACK確認消息和對端的地址信息給本機A,本機A狀態變為STATUS_P2P_CONNECT狀態。
4.服務端接收到本A端PROTO_CONNECT_REQ消息后,發送PROTO_NOTIFY_REQ請求(保護本端的地址信息)到對端B。對端B接收到PROTO_NOTIFY_REQ請求后,回送PROTO_NOTIFY_ACK確認消息給服務器,此時對端B狀態變為STATUS_P2P_CONNECT。
注意:如果無法建立P2P連接,則雙方的狀態停留在STATUS_P2P_CONNECT狀態,可以通過服務器進行轉發。而不需要進行p2p通信!
5.對端狀態為STATUS_P2P_CONNECT后,發生PROTO_P2P_CONNECT_REQ請求消息給本機端,打通對端-(NAT端口)-->本機。
6.對端狀態為STATUS_P2P_CONNECT后,發生PROTO_P2P_CONNECT_REQ請求消息給對端,打通本機端(NAT端口)--->對端。
注意:5、6是異步存在的!!
7.當客戶端接收到PROTO_P2P_CONNECT_REQ或者PROTO_P2P_CONNECT_ACK消息,本機狀態的狀態變為STATUS_MESSAGE。
之后可以正常的進行p2p通信!!!
二:代碼實現P2P程序
(一)頭文件p2p.h實現(含公共函數)
#ifndef __P2P_H__ #define __P2P_H__ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> //互聯網地址族 #include <time.h> //---------------------------定義數據占用空間大小--------------------------- #define CLIENT_MAX 1024 //定義客戶端中與對方連接的數量 #define CLIENT_ADDR_LENGTH 6 //定義空間存放客戶端地址信息,IP占4字節,端口占2字節 #define BUFFER_LENGTH 512 //定義發送和接收的緩沖區大小,512字節 #define NUMBER_ID_LENGTH 4 //定義客戶端ID的長度,占4字節 //---------------------------定義協議的狀態:注意響應比請求大於0x80,方便計算--------------------------- #define PROTO_LOGIN_REQ 0x01 //登錄服務器的請求與響應 #define PROTO_LOGIN_ACK 0x81 #define PROTO_HEARTBEAT_REQ 0x02 //心跳包的請求與響應,防止P2P連接被NAT網關關閉 #define PROTO_HEARTBEAT_ACK 0x82 #define PROTO_CONNECT_REQ 0x11 //連接請求與響應,向服務端發送P2P連接請求----(服務器與本端) #define PROTO_CONNECT_ACK 0x91 #define PROTO_NOTIFY_REQ 0x12 //服務端處理PROTO_CONNECT_REQ請求之后,發送PROTO_NOTIFY_REQ請求給對端----(服務器與對端) #define PROTO_NOTIFY_ACK 0x92 #define PROTO_P2P_CONNECT_REQ 0x13 //對端接收到PROTO_NOTIFY_REQ請求之后,開始與本端建立P2P連接;本端接收到PROTO_P2P_CONNECT_REQ之后,回送PROTO_P2P_CONNECT_ACK給對端,雙方狀態機變為P2P建立完成,可以進行P2P傳輸 #define PROTO_P2P_CONNECT_ACK 0x93 #define PROTO_MESSAGE_REQ 0x21 //原始數據到達(是添加了自定義的首部之后的數據)---包含服務端轉發和P2P發送!!! #define PROTO_MESSAGE_ACK 0xA1 //---------------------------定義協議的索引,和各個協議狀態對應的索引位置--------------------------- #define PROTO_BUFFER_VERSION_IDX 0 //版本字段位置索引,索引0,占1個字節 #define PROTO_BUFFER_STATUS_IDX 1 //協議的狀態信息,索引1,占1個字節 #define PROTO_BUFFER_LENGTH_IDX (PROTO_BUFFER_STATUS_IDX+1) //協議的長度字段,索引2,占2個字節 #define PROTO_BUFFER_SELFID_IDX (PROTO_BUFFER_LENGTH_IDX+2) //協議的本端的ID信息字段,索引4,占4個字節 //login #define PROTO_LOGIN_SELFID_IDX PROTO_BUFFER_SELFID_IDX //登錄時,需要添加本機的id到協議中去,在self id字段中,索引為4 //login ack #define PROTO_LOGIN_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4 //heartbeat #define PROTO_HEARTBEAT_SELFID_IDX PROTO_BUFFER_SELFID_IDX //心跳檢測,需要添加本機的id到協議中去,在self id字段中,索引為4 //heartbeat ack #define PROTO_HEARTBEAT_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4 //connect #define PROTO_CONNECT_SELFID_IDX PROTO_BUFFER_SELFID_IDX //連接相關,需要添加本端和對端的id信息,而本端的id放入self id字段,索引4 #define PROTO_CONNECT_OTHERID_IDX (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH) //對端的id放入other id字段,索引為8 //connect ack #define PROTO_CONNECT_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4 #define PROTO_CONNECT_ACK_OTHERID_IDX (PROTO_CONNECT_ACK_SELFID_IDX+NUMBER_ID_LENGTH) //對端的id放入other id字段,索引為8 #define PROTO_CONNECT_MESSAGE_ADDR_IDX (PROTO_CONNECT_ACK_OTHERID_IDX+NUMBER_ID_LENGTH) //這里開始存放地址數據,索引12。占6個字節,存放地址信息!!!---本機需要獲取到的地址信息,才能發送p2p請求,而之前並沒有獲取過這個數據,所以最好攜帶過去 //notify #define PROTO_NOTIFY_SELFID_IDX PROTO_BUFFER_SELFID_IDX //通知對端字段,需要添加本端Id信息放入self id字段,索引為4 #define PROTO_NOTIFY_OTHERID_IDX (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH) //對端的id放入other id字段,索引為8 #define PROTO_NOTIFY_MESSAGE_ADDR_IDX (PROTO_NOTIFY_OTHERID_IDX+NUMBER_ID_LENGTH) //這里開始存放地址數據,索引12。占6個字節,存放地址信息!!!---對端需要獲取到本機的地址信息,才能發送p2p請求,而之前並沒有獲取過這個數據,所以最好攜帶過去 //notify ack #define PROTO_NOTIFY_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //回送確認消息,需要添加本端Id信息,放入self id字段,索引為4 //p2p connect #define PROTO_P2P_CONNECT_SELFID_IDX PROTO_BUFFER_SELFID_IDX //P2P連接請求時,需要加入本端的Id信息放入self id這段,索引為4 //p2p connect ack #define PROTO_P2P_CONNECT_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //P2P連接響應時,需要加入本端的Id信息放入self id這段,索引為4 //message #define PROTO_MESSAGE_SELFID_IDX PROTO_BUFFER_SELFID_IDX //開始發送數據,需要添加本端Id信息,放入self id字段,索引為4 #define PROTO_MESSAGE_OTHERID_IDX (PROTO_MESSAGE_SELFID_IDX+NUMBER_ID_LENGTH) //需要加入對端ID信息到other id字段中,索引為8 #define PROTO_MESSAGE_CONTENT_IDX (PROTO_MESSAGE_OTHERID_IDX+NUMBER_ID_LENGTH) //從這里開始添加數據,索引為12 //message ack #define PROTO_MESSAGE_ACK_SELFID_IDX PROTO_BUFFER_SELFID_IDX //數據發送結束,需要進行響應,索引為4 #define PROTO_MESSAGE_ACK_OTHERID_IDX (PROTO_BUFFER_SELFID_IDX+NUMBER_ID_LENGTH) //數據發送結束,需要進行響應,索引為4 typedef unsigned int U32; typedef unsigned short U16; typedef unsigned char U8; //volatile的學習:https://www.runoob.com/w3cnote/c-volatile-keyword.html typedef volatile long UATOMIC; //當要求使用 volatile 聲明的變量的值的時候,系統總是重新從它所在的內存讀取數據,即使它前面的指令剛剛從該處讀取過數據。 //可以用於實現原語操作 //定義回調函數 typedef void* (*CALLBACK)(void* arg); //定義返回狀態 typedef enum{ RESULT_FAILED = -1, RESULT_SUCCESS = 0 }RESULT; //---------------------------定義客戶端狀態--------------------------- typedef enum { STATUS_INIT, STATUS_LOGIN, STATUS_HEARTBEAT, STATUS_CONNECT, STATUS_NOTIFY, STATUS_P2P_CONNECT, STATUS_MESSAGE } STATUS_SET; //---------------------------定義一個映射結構體,id==>地址和時間戳信息--------------------------- typedef struct __CLIENT_TABLE { U8 addr[CLIENT_ADDR_LENGTH]; //6字節存放地址信息 U32 client_id; //4字節存放客戶端id long stamp; //存放時間戳信息 }client_table; //---------------------------服務器端數據結構--------------------------- int client_count = 0; client_table table[CLIENT_MAX] = {0}; //---------------------------客戶端端數據結構--------------------------- //---------------------------服務器端函數--------------------------- /* cmpxchg(void* ptr, int old, int new) 如果ptr和old的值一樣,則把new寫到ptr內存, 否則寫入ptr的值到old中 整個操作是原子的。 res返回值為0(失敗)或1(成功)表明cas(對比和替換)操作是否成功. 下面__asm__學習:https://www.jianshu.com/p/fa6d9d9c63b4 -----------`x++`是否是原子的? 不是,是3個指令,`取x,x+1,存入x`。 >在單處理器上,如果執行x++時,禁止多線程調度,就可以實現原子。因為單處理的多線程並發是偽並發。 在多處理器上,需要借助cpu提供的Lock功能。 鎖總線。讀取內存值,修改,寫回內存三步期間禁止別的CPU訪問總線。 同時我估計使用Lock指令鎖總線的時候,OS也不會把當前線程調度走了。要是調走了,那就麻煩了。 */ static unsigned long cmpxchg(UATOMIC* addr,unsigned long _old,unsigned long _new){ U8 res; //"__asm__"表示后面的代碼為內嵌匯編 //"__volatile__"表示編譯器不要優化代碼,后面的指令保留原樣,"volatile"是它的別名 __asm__ volatile ( "lock; cmpxchg %3, %1;sete %0" //加鎖以及比較和替換原子操作,按后面順序ret 0 , addr 1 , old 2, new 3 : "=a" (res) //"=a"是說要把__asm__操作結果寫到__ret中 : "m" (*addr), "a" (_old), "r" (_new) //各個值存放的位置 : "cc", "memory"); return res; //返回結果,0(失敗)或1(成功) } //返回時間戳信息 static long time_generator(){ static long lTimeStamp = 0; //局部靜態變量 static long timeStampMutex = 0; //局部靜態變量 if(cmpxchg(&timeStampMutex,0,1)){ //注意:只有TimeStampMutex原子操作成功才行進入下面語句 lTimeStamp = time(NULL); //生成時間戳,精度為s timeStampMutex = 0; } return lTimeStamp; //返回時間戳信息 } //將sockaddr地址轉為array格式 static void addr_to_array(U8 *array, struct sockaddr_in *p_addr){ //存放IP和端口,需要6個字節 int i = 0; for(i = 0; i < 4; i++){ array[i] = *((unsigned char*)(&(p_addr->sin_addr.s_addr))+i); //獲取IP,順序存儲 } for(i = 0; i < 2; i++){ array[4+i] = *((unsigned char*)(&(p_addr->sin_port))+i); //獲取Port信息 } } //將array數組轉為sockaddr地址格式 static void array_to_addr(U8 *array,struct sockaddr_in *p_addr){ int i=0; for(i = 0;i < 4;i++){ *((unsigned char*)(&p_addr->sin_addr.s_addr)+i) = array[i]; //獲取IP,存放到sockaddr_in格式 } for(i = 0;i < 2;i++){ *((unsigned char*)(&p_addr->sin_port)+i) = array[4+i]; //獲取Port,存放到sockaddr_in格式 } } static int get_index_by_clientid(int client_id){ int i = 0; int now_count = client_count; for(i = 1;i<=now_count;i++){ if(table[i].client_id == client_id) return i; } return RESULT_FAILED; } static int deal_connect_req(int sockfd,int client_id,int other_id){ U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_NOTIFY_REQ; //發送PROTO_NOTIFY_REQ請求 buffer[PROTO_NOTIFY_SELFID_IDX] = client_id; buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id; int index = get_index_by_clientid(client_id); //獲取本端信息,一會發送給對端 //填充數據,6字節的IP和端口信息 memcpy(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH); index = get_index_by_clientid(other_id); //獲取對端信息,開始發送 //獲取sockaddr信息 struct sockaddr_in c_addr; c_addr.sin_family = AF_INET; array_to_addr(table[index].addr,&c_addr); int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH; //18字節,12的頭部,6字節的數據 len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr)); if(len < 0){ printf("Failed in deal_connect_req, send to other peer:%d\n",other_id); return RESULT_FAILED; } return RESULT_SUCCESS; } static int deal_connect_ack(int sockfd,int client_id,int other_id){ //可以和deal_connect_req合並 //printf("call deal_connect_ack!\n"); U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_ACK; //回送PROTO_CONNECT_ACK buffer[PROTO_NOTIFY_SELFID_IDX] = client_id; buffer[PROTO_NOTIFY_OTHERID_IDX] = other_id; int index = get_index_by_clientid(other_id); //獲取本端信息,一會發送給對端 //填充數據,6字節的IP和端口信息 memcpy(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,table[index].addr,CLIENT_ADDR_LENGTH); index = get_index_by_clientid(client_id); //獲取對端信息,開始發送 //獲取sockaddr信息 struct sockaddr_in c_addr; c_addr.sin_family = AF_INET; array_to_addr(table[index].addr,&c_addr); int len = PROTO_NOTIFY_MESSAGE_ADDR_IDX + BUFFER_LENGTH; //18字節,12的頭部,6字節的數據 len = sendto(sockfd,buffer,len,0,(struct sockaddr*)&c_addr,sizeof(c_addr)); if(len < 0){ printf("Failed in deal_connect_ack, send to client peer:%d\n",client_id); return RESULT_FAILED; } return RESULT_SUCCESS; } static int deal_message_req(int sockfd,int other_id,U8 *buffer,int length){ int index = get_index_by_clientid(other_id); //獲取對端信息,開始發送 //獲取sockaddr信息 struct sockaddr_in c_addr; c_addr.sin_family = AF_INET; array_to_addr(table[index].addr,&c_addr); //printf("send to peer: %d.%d.%d.%d:%d\n",table[index].addr[0],table[index].addr[1],table[index].addr[2],table[index].addr[3],c_addr.sin_port); int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)&c_addr,sizeof(c_addr)); if(n < 0){ printf("Failed in deal_message_req!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int deal_ack(int sockfd,struct sockaddr_in *c_addr,U8 *buffer,int length){ //處理通用ACK消息,原來協議+0x80即可 buffer[PROTO_BUFFER_STATUS_IDX] += 0x80; int n = sendto(sockfd,buffer,length,0,(struct sockaddr*)c_addr,sizeof(*c_addr)); if(n < 0){ printf("Failed in deal_ack!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } //---------------------------客戶端函數--------------------------- static int send_login_req(int sockfd,int client_id,struct sockaddr_in *ser_addr){ U8 buffer[BUFFER_LENGTH] = {0}; //buffer長度512 buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_LOGIN_REQ; *(int *)(buffer+PROTO_LOGIN_SELFID_IDX) = client_id; int n = PROTO_LOGIN_SELFID_IDX + NUMBER_ID_LENGTH; n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in)); if(n < 0){ printf("Failed to login server!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int get_other_id(U8 *buffer,int *other_id){ int id=0,i; for(i=2;buffer[i]!=':'&&buffer[i]!='\0';i++){ //還可以進行其他嚴格處理 id += id*10 + buffer[i]-'0'; } *other_id = id; return i; //返回索引 } static int send_connect_req(int sockfd,int client_id,int other_id,struct sockaddr_in *ser_addr){ U8 buffer[BUFFER_LENGTH] = {0}; //buffer長度512 buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_CONNECT_REQ; *(int *)(buffer+PROTO_CONNECT_SELFID_IDX) = client_id; *(int *)(buffer+PROTO_CONNECT_OTHERID_IDX) = other_id; int n = PROTO_CONNECT_OTHERID_IDX + NUMBER_ID_LENGTH; n = sendto(sockfd,buffer,n,0,(struct sockaddr*)ser_addr,sizeof(struct sockaddr_in)); if(n < 0){ printf("Failed to login server!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int send_message(int sockfd,int client_id,int other_id,struct sockaddr_in *addr,U8 *msg,int length){ U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_REQ; //處理消息 *(int*)(buffer+PROTO_MESSAGE_SELFID_IDX) = client_id; *(int*)(buffer+PROTO_MESSAGE_OTHERID_IDX) = other_id; memcpy(buffer + PROTO_MESSAGE_CONTENT_IDX,msg,length); //初始化數據部分 int n = PROTO_MESSAGE_CONTENT_IDX + length; *(U16*)(buffer+PROTO_BUFFER_LENGTH_IDX) = (U16)n; //存放數據長度 n = sendto(sockfd,buffer,n,0,(struct sockaddr*)addr,sizeof(struct sockaddr_in)); if(n < 0){ printf("Failed to send message to peer!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int send_p2pconnect(int sockfd,int client_id,struct sockaddr_in *p_addr){ U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_REQ; *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id; int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH; n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in)); if(n<0){ printf("Failed to send p2p connect req!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int send_p2pconnect_ack(int sockfd,int client_id,struct sockaddr_in *p_addr){ U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_P2P_CONNECT_ACK; *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX) = client_id; int n = PROTO_P2P_CONNECT_SELFID_IDX + NUMBER_ID_LENGTH; n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in)); if(n < 0){ printf("Failed to send p2p connect ack!\n"); return RESULT_FAILED; } return RESULT_SUCCESS; } static int send_message_ack(int sockfd,int client_id,int other_id,struct sockaddr_in *p_addr){ U8 buffer[BUFFER_LENGTH] = {0}; buffer[PROTO_BUFFER_STATUS_IDX] = PROTO_MESSAGE_ACK; *(int*)(buffer+PROTO_MESSAGE_ACK_SELFID_IDX) = client_id; *(int*)(buffer+PROTO_MESSAGE_ACK_OTHERID_IDX) = other_id; int n=PROTO_MESSAGE_ACK_OTHERID_IDX + NUMBER_ID_LENGTH; n = sendto(sockfd,buffer,n,0,(struct sockaddr*)p_addr,sizeof(struct sockaddr_in)); if(n < 0){ printf("Failed to send message ack"); return RESULT_FAILED; } return RESULT_SUCCESS; } #endif
(二)服務端p2p_server.c實現(簡單通信)
#include "p2p.h" int recv_buffer_parser(int sockfd,U8 *buffer,U32 length,struct sockaddr_in *c_addr){ //length是傳遞過來的數據長度 U8 status = buffer[PROTO_BUFFER_STATUS_IDX]; //解析狀態 //printf("recv_buffer_parser --->status: %d\n",status); int client_id,other_id,index; int old,now; U8 *msg; switch(status){ case PROTO_LOGIN_REQ: //處理登錄請求 printf("recv login req!\n"); old = client_count; now = old + 1; if(0 == cmpxchg((UATOMIC*)&client_count,old,now)){ //使用原子操作賦值 printf("client_count --> %d,old:%d,now:%d\n", client_count,old,now); return RESULT_FAILED; } //開始登錄新用戶的信息 U8 array[CLIENT_ADDR_LENGTH] = {0}; //6字節存放地址IP:Port信息 addr_to_array(array,c_addr); client_id = *(U32*)(buffer+PROTO_BUFFER_SELFID_IDX); printf("now:%d client:[%d],login ---> %d.%d.%d.%d:%d\n",now,client_id, *(unsigned char*)(&c_addr->sin_addr.s_addr), *((unsigned char*)(&c_addr->sin_addr.s_addr)+1), *((unsigned char*)(&c_addr->sin_addr.s_addr)+2), *((unsigned char*)(&c_addr->sin_addr.s_addr)+3), c_addr->sin_port); table[now].client_id = client_id; //獲取4字節長度的用戶id信息 memcpy(table[now].addr,array,CLIENT_ADDR_LENGTH); //獲取用戶的Addr地址信息 //需要回送確認消息----------- deal_ack(sockfd,c_addr,buffer,length); break; case PROTO_HEARTBEAT_REQ: //處理心跳包請求 printf("recv heartbeat req!\n"); client_id = *(unsigned int*)(buffer+PROTO_HEARTBEAT_SELFID_IDX); index = get_index_by_clientid(client_id); table[index].stamp = time_generator(); //需要回送確認消息----------- deal_ack(sockfd,c_addr,buffer,length); break; case PROTO_CONNECT_REQ: //處理連接請求 client_id = *(unsigned int*)(buffer+PROTO_CONNECT_SELFID_IDX); //獲取本機id other_id = *(unsigned int*)(buffer+PROTO_CONNECT_OTHERID_IDX); //獲取對端id printf("recv connect req from %d to %d!\n",client_id,other_id); deal_connect_req(sockfd,client_id,other_id); //處理連接請求,1.向對端發送信息 deal_connect_ack(sockfd,client_id,other_id); //2.回送確認消息 break; case PROTO_NOTIFY_ACK: //處理對端發送回來的確認消息,無用 printf("recv other notify ack message\n"); break; case PROTO_MESSAGE_REQ: //處理要經過服務器轉發的數據和p2p無法建立的時候使用 printf("recv message req!\n"); msg = buffer + PROTO_MESSAGE_CONTENT_IDX; //獲取要發送的數據 client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX); other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX); printf("Client[%d] send to Other[%d]:%s\n",client_id,other_id,msg); deal_message_req(sockfd,other_id,buffer,length); //進行轉發 break; case PROTO_MESSAGE_ACK: //轉發確認消息 printf("recv message ack!\n"); client_id = *(unsigned int*)(buffer+PROTO_MESSAGE_SELFID_IDX); other_id = *(unsigned int*)(buffer+PROTO_MESSAGE_OTHERID_IDX); printf("Client[%d] send ack to Other[%d]\n",client_id,other_id); deal_message_req(sockfd,other_id,buffer,length); break; } return RESULT_SUCCESS; } int main(int argc,char *argv[]){ int sockfd; int n,length; char buffer[BUFFER_LENGTH] = {0}; struct sockaddr_in addr,c_addr; printf("UDP Server......\n"); if(argc != 2){ printf("Usage: %s port\n",argv[0]); exit(0); } sockfd = socket(AF_INET,SOCK_DGRAM,0); //獲取通信socket if(sockfd < 0){ printf("Failed to open udp socket!\n"); exit(0); } addr.sin_family = AF_INET; addr.sin_port = htons(atoi(argv[1])); //獲取端口信息 addr.sin_addr.s_addr = htonl(INADDR_ANY); //允許接收所有網卡的到達數據 length = sizeof(addr); if(bind(sockfd,(struct sockaddr*)&addr,length) < 0){ printf("Failed to bind udp socket with ip port"); exit(0); } while(1){ n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&c_addr,&length); if(n > 0){ buffer[n] = 0x0; //設置結束符號 /* printf("%d.%d.%d.%d:%d say:%s\n", *(unsigned char*)(&c_addr.sin_addr.s_addr),*((unsigned char*)(&c_addr.sin_addr.s_addr)+1), *((unsigned char*)(&c_addr.sin_addr.s_addr)+2),*((unsigned char*)(&c_addr.sin_addr.s_addr)+3), c_addr.sin_port, buffer); //打印接收到的數據信息 */ int ret = recv_buffer_parser(sockfd,buffer,n,&c_addr); //解析接收的數據,存儲相關信息 if(ret == RESULT_FAILED) continue; }else if(n == 0){ printf("client closed!\n"); }else{ printf("recv error\n"); break; } } return 0; }
(三)客戶端代碼實現(狀態機轉換,p2p通信)
#include "p2p.h" #include <pthread.h> static int status_machine = STATUS_INIT; //狀態機 static int client_selfid = 0x0; //默認本端的id,需要在main方法中輸入 struct sockaddr_in server_addr; //服務端的信息 client_table p2p_clients[CLIENT_MAX]; //可以連接的P2P對端最大數量 static int p2p_count = 0; static int buffer_parser(int sockfd,U8 *buffer,int length,struct sockaddr_in *addr){ U8 status = buffer[PROTO_BUFFER_STATUS_IDX]; //獲取狀態 U8 *msg; struct sockaddr_in p_addr; //獲取對端的地址信息 //printf("buffer_parser...%d\n",status); switch(status){ case PROTO_LOGIN_ACK: //處理登錄確認 printf(" Connect Server Success\n"); status_machine = STATUS_CONNECT; //狀態轉移 break; case PROTO_HEARTBEAT_ACK: //printf("recv heartbeat ack!\n"); break; case PROTO_NOTIFY_REQ: //處理服務端發送的NOTIFY請求 //printf("recv notify req!\n"); //獲取對端的數據信息 p_addr.sin_family = AF_INET; array_to_addr(buffer+PROTO_NOTIFY_MESSAGE_ADDR_IDX,&p_addr); //回復確認消息給服務器 buffer[PROTO_BUFFER_STATUS_IDX] += 0x80; sendto(sockfd,buffer,PROTO_NOTIFY_MESSAGE_ADDR_IDX,0,(struct sockaddr*)&server_addr,sizeof(struct sockaddr_in)); status_machine = STATUS_NOTIFY; //開始打洞 send_p2pconnect(sockfd,client_selfid,&p_addr); //開始打洞!!! if(status_machine != STATUS_MESSAGE){ //注意:需要進行判斷,因為是異步操作,所以本機接到NOTIFY請求的時候,可能已經接到對端的P2P連接請求,狀態已經變為STATUS_MESSAGE,那么我們不能再變為未就緒狀態 status_machine = STATUS_P2P_CONNECT; } break; case PROTO_CONNECT_ACK: //處理CONNECT 確認 //printf("recv connect ack!\n"); //獲取對端的數據信息 p_addr.sin_family = AF_INET; array_to_addr(buffer+PROTO_CONNECT_MESSAGE_ADDR_IDX,&p_addr); send_p2pconnect(sockfd,client_selfid,&p_addr); //開始打洞!!! if(status_machine != STATUS_MESSAGE){ //注意:需要進行判斷,因為是異步操作,所以本機接到NOTIFY請求的時候,可能已經接到對端的P2P連接請求,狀態已經變為STATUS_MESSAGE,那么我們不能再變為未就緒狀態 status_machine = STATUS_P2P_CONNECT; } break; case PROTO_P2P_CONNECT_REQ: //處理p2p連接請求---表示打洞成功,添加即可 if(status_machine != STATUS_MESSAGE){ //printf("recv p2p connect req!\n"); int now_count = p2p_count++; p2p_clients[now_count].stamp = time_generator(); p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX); addr_to_array(p2p_clients[now_count].addr,addr); send_p2pconnect_ack(sockfd,client_selfid,addr); status_machine = STATUS_MESSAGE; printf("Enter P2P Model!\n"); } break; case PROTO_P2P_CONNECT_ACK: //處理p2p連接確認---表示打洞成功,添加即可 if(status_machine != STATUS_MESSAGE){ //printf("recv p2p connect ack!\n"); int now_count = p2p_count++; p2p_clients[now_count].stamp = time_generator(); p2p_clients[now_count].client_id = *(int*)(buffer+PROTO_P2P_CONNECT_SELFID_IDX); addr_to_array(p2p_clients[now_count].addr,addr); send_p2pconnect_ack(sockfd,client_selfid,addr); status_machine = STATUS_MESSAGE; printf("Enter P2P Model!\n"); } break; case PROTO_MESSAGE_REQ: //p2p數據到達 //printf("recv p2p data....\n"); msg = buffer + PROTO_MESSAGE_CONTENT_IDX; U32 other_id = *(U32*)(buffer+PROTO_MESSAGE_SELFID_IDX); printf("recv p2p data:%s from:%d\n",msg,other_id); send_message_ack(sockfd,client_selfid,other_id,addr); break; case PROTO_MESSAGE_ACK: //printf("peer recv message, and send ack to me!\n"); break; } } void *recv_callback(void *arg){ int sockfd = *(int*)arg; //獲取sockfd struct sockaddr_in addr; int length = sizeof(struct sockaddr_in); U8 buffer[BUFFER_LENGTH] = {0}; while(1){ int n = recvfrom(sockfd,buffer,BUFFER_LENGTH,0,(struct sockaddr*)&addr,&length); printf("recvfrom data...\n"); if(n > 0){ buffer[n] = 0; buffer_parser(sockfd,buffer,n,&addr); //解析數據 }else if(n == 0){ printf("server closed\n"); close(sockfd); break; }else{ printf("Failed to call recvfrom\n"); close(sockfd); break; } } } void *send_callback(void *arg){ //線程處理發送消息 int sockfd = *(int*)arg; //獲取sockfd char buffer[BUFFER_LENGTH] = {0}; while(1){ bzero(buffer,BUFFER_LENGTH); //置為0 //printf("===client status====%d===\n",status_machine); if(status_machine == STATUS_CONNECT){ printf("-----> please enter message(eg. C/S otherID: ...):\n"); gets(buffer); //獲取要輸入的數據 //如果是登錄狀態,可以進行p2p連接或者服務器轉發 int other_id,idx; idx = get_other_id(buffer,&other_id); //printf("%d--->%d\n",client_selfid,other_id); if(buffer[0] == 'C'){ //開始進行P2P連接 send_connect_req(sockfd,client_selfid,other_id,&server_addr); }else{ int length = strlen(buffer); send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1); //發送給服務器進行轉發 } sleep(1); //等待建立p2p連接 }else if(status_machine == STATUS_MESSAGE){ //可以進行P2P通信 printf("-----> please enter p2p message:\n"); gets(buffer); //獲取要輸入的數據 //與最新加入的進行p2p通信 int now_count = p2p_count; //這個是最新的序號 struct sockaddr_in c_addr; //對端的地址信息 c_addr.sin_family = AF_INET; array_to_addr(p2p_clients[now_count-1].addr,&c_addr); int length = strlen(buffer); send_message(sockfd,client_selfid,0,&c_addr,buffer,length); //直接發送給對端,P2P通信 }else if(status_machine == STATUS_NOTIFY || status_machine == STATUS_P2P_CONNECT ){ printf("-----> please enter message(S otherID:...):\n"); printf("status:%d\n",status_machine); //scanf("%s",buffer); //獲取要輸入的數據 gets(buffer); //獲取要輸入的數據 int length = strlen(buffer); int other_id,idx; idx = get_other_id(buffer,&other_id); send_message(sockfd,client_selfid,other_id,&server_addr,buffer+idx+1,length-idx-1); //發送給服務器進行轉發 } } } int main(int argc,char *argv[]){ printf("UDP Client......\n"); if(argc != 4){ printf("Usage: %s serverIp serverPort clientID\n",argv[0]); exit(0); } int sockfd = socket(AF_INET,SOCK_DGRAM,0); if(sockfd < 0){ printf("Failed to create socket!\n"); exit(0); } //創建兩個線程,分別處理接收和發送信息 pthread_t thread_id[2] = {0}; CALLBACK cb[2] = {send_callback,recv_callback}; int i; for(i=0;i<2;i++){ int ret = pthread_create(&thread_id[i],NULL,cb[i],&sockfd); //創建線程,獲取線程號,傳入回調方法和參數 if(ret){ printf("Failed to create thread!\n"); exit(1); } } //主線程進行登錄操作 server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr(argv[1]); server_addr.sin_port = htons(atoi(argv[2])); client_selfid = atoi(argv[3]); status_machine = STATUS_LOGIN; //修改客戶端當前狀態 send_login_req(sockfd,client_selfid,&server_addr); //發送登錄請求 for(i = 0;i<2;i++){ pthread_join(thread_id[i],NULL); //join子線程 } return 0; }
(四)程序編譯
1.編譯服務端
gcc p2p_server.c -o ps
2.編譯客戶端
gcc p2p_client.c -o pc -lpthread
(五)代碼測試
1.服務端查看:
2.客戶端1查看
3.客戶端2查看