轉自 https://github.com/kejinlu/objc-doc/blob/master/Socket%E7%BC%96%E7%A8%8B.md
大綱
- 一.Socket簡介
- 二.BSD Socket編程准備
- 1.地址
- 2.端口
- 3.網絡字節序
- 4.半相關與全相關
- 5.網絡編程模型
- 三.socket接口編程示例
- 四.使用select
- 五.使用kqueue
- 六.使用流
注:文檔中設計涉及的代碼也都在本人github目錄下,分別為socketServer和socketClient.對應着各個分支。
一.Socket簡介
在UNIX系統中,萬物皆文件(Everything is a file)。所有的IO操作都可以看作對文件的IO操作,都遵循着這樣的操作模式:打開 -> 讀/寫 -> 關閉,打開操作(如open函數)獲取“文件”使用權,返回文件描述符,后繼的操作都通過這個文件描述符來進行。很多系統調用都依賴於文件描述符,它是一個無符號整數,每一個用戶進程都對應着一個文件描述符表,通過文件描述符就可以找到對應文件的信息。 在類UNIX平台上,對於控制台的標准輸入輸出以及標准錯誤輸出都有對應的文件描述符,分別為0,1,2。它們定義在 unistd.h
中
#define STDIN_FILENO 0 /* standard input file descriptor */
#define STDOUT_FILENO 1 /* standard output file descriptor */
#define STDERR_FILENO 2 /* standard error file descriptor */
在Mac系統中,可以通過Activity Monitor來查看某個進程打開的文件和端口。
UNIX內核加入TCP/IP協議的時候,便在系統中引入了一種新的IO操作,只不過由於網絡連接的不可靠性,所以網絡IO比本地設備的IO復雜很多。這一系列的接口叫做BSD Socket API,當初由伯克利大學研發,最終成為網絡開發接口的標准。 網絡通信從本質上講也是進程間通信,只是這兩個進程一般在網絡中不同計算機上。當然Socket API其實也提供了專門用於本地IPC的使用方式:UNIX Domain Socket,這個這里就不細說了。本文所講的Socket如無例外,均是說的Internet Socket。
在本地的進程中,每一個進程都可以通過PID來標識,對於網絡上的一個計算機中的進程如何標識呢?網絡中的計算機可以通過一個IP地址進行標識,一個計算機中的某個進程則可以通過一個無符號整數(端口號)來標識,所以一個網絡中的進程可以通過IP地址+端口號
的方式進行標識。
二.BSD Socket編程准備
1.地址
在程序中,我們如何保存一個地址呢?在 <sys/socket.h>
中的sockaddr便是描述socket地址的結構體類型.
/*
* [XSI] Structure used by kernel to store most addresses.
*/
struct sockaddr {
__uint8_t sa_len; /* total length */
sa_family_t sa_family; /* [XSI] address family */
char sa_data[14]; /* [XSI] addr value (actually larger) */
};
為了方便設置用語網絡通信的socket地址,引入了sockaddr_in結構體(對於UNIX Domain Socket則對應sockaddr_un)
/*
* Socket address, internet style.
*/
struct sockaddr_in {
__uint8_t sin_len;
sa_family_t sin_family;
in_port_t sin_port;//得是網絡字節序
struct in_addr sin_addr;//in_addr存在的原因則是歷史原因,其實質是代表一個IP地址的32位整數
char sin_zero[8];//bzero之,純粹是為了兼容sockaddr
};
在實際編程的時候,經常需要將sockaddr_in強制轉換成sockaddr類型。
2.端口
說到端口我們經常會聯想到硬件,在網絡編程中的端口其實是一個標識而已,或者說是系統的資源而已。系統提供了端口分配和管理的機制。
3.網絡字節序
談網絡字節序(Endianness)之前我們先說說什么是字節序。字節序又叫端序,就是指計算機中存放 多字節數據的字節的順序。典型的就是數據存放在內存中或者網絡傳輸時的字節的順序。常用的字節序有大端序(big-endian),小端序(litle-endian,另還有不常見的混合序middle-endian)。不同的CPU可能會使用不同的字節序,如X86,PDP-11等處理器為小端序,Motorola 6800,PowerPC 970等使用的是大端序。小端序是指低字節位存放在內存地址的低端,高端序是指高位字節存放在內存的低端。 舉個例子來說明什么是大端序和小端序: 比如一個4字節的整數 16進制形式為 0x12345678,最左邊是高位。
大端序
低位 | > > | > > | 高位 |
---|---|---|---|
12 | 34 | 56 | 78 |
小端序
低位 | > > | > > | 高位 |
---|---|---|---|
78 | 56 | 34 | 12 |
TCP/IP 各層協議將字節序使用的是大端序,我們把TCP/IP協議中使用的字節序稱之為網絡字節序。 編程的時候可以使用定義在sys/_endian.h
中的相關的接口進行本地字節序和網絡字節序的互轉。
#define ntohs(x) __DARWIN_OSSwapInt16(x) // 16位整數 網絡字節序轉主機字節序
#define htons(x) __DARWIN_OSSwapInt16(x) // 16位整數 主機字節序轉網絡字節序
#define ntohl(x) __DARWIN_OSSwapInt32(x) //32位整數 網絡字節序轉主機字節序
#define htonl(x) __DARWIN_OSSwapInt32(x) //32位整數 主機字節序轉網絡字節序
以上聲明中 n代表netwrok, h代表host ,s代表short,l代表long
如果數據是單字節的話,則其沒有字節序的說法了。
4.半相關與全相關
半相關(half-association)是指一個三元組 (協議,本地IP地址,本地端口)
,通過這個三元組就可以唯一標識一個網絡中的進程,一般用於listening socket。但是實際進行通信的過程,至少需要兩個進程,且它們所使用的協議必須一致,所以一個完成的網絡通信至少需要一個五元組表示(協議,本地地址,本地端口,遠端地址,遠端端口)
,這樣的五元組叫做全相關。
5.網絡編程模型
網絡存在的本質其實就是網絡中個體之間的在某個領域的信息存在不對等性,所以一般情況下總有一些個體為另一些個體提供服務。提供服務器的我們把它叫做服務器,接受服務的叫做客戶端。所以在網絡編程中,也存在服務器端和客戶端之分。
服務器端 | 客戶端 |
---|---|
創建Socket | - |
將Socket和本地的地址端口綁定 | - |
開始進行偵聽 | 創建一個Socket和服務器的地址並通過它們向服務器發送連接請求 |
握手成功,接受請求,得到一個新的Socket,通過它可以和客戶端進行通信 | 連接成功,客戶端的Socket會綁定到系統分配的一個端口上,並可以通過它和服務器端進行通信 |
三.BSD Socket編程詳解
下面的例子是一個簡單的一對一聊天的程序,分服務器和客戶端,且發送消息和接受消息次序固定。
Server端代碼
#include <stdio.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
int main (int argc, const char * argv[])
{
struct sockaddr_in server_addr;
server_addr.sin_len = sizeof(struct sockaddr_in);
server_addr.sin_family = AF_INET;//Address families AF_INET互聯網地址簇
server_addr.sin_port = htons(11332);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero),8);
//創建socket
int server_socket = socket(AF_INET, SOCK_STREAM, 0);//SOCK_STREAM 有連接
if (server_socket == -1) {
perror("socket error");
return 1;
}
//綁定socket:將創建的socket綁定到本地的IP地址和端口,此socket是半相關的,只是負責偵聽客戶端的連接請求,並不能用於和客戶端通信
int bind_result = bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (bind_result == -1) {
perror("bind error");
return 1;
}
//listen偵聽 第一個參數是套接字,第二個參數為等待接受的連接的隊列的大小,在connect請求過來的時候,完成三次握手后先將連接放到這個隊列中,直到被accept處理。如果這個隊列滿了,且有新的連接的時候,對方可能會收到出錯信息。
if (listen(server_socket, 5) == -1) {
perror("listen error");
return 1;
}
struct sockaddr_in client_address;
socklen_t address_len;
int client_socket = accept(server_socket, (struct sockaddr *)&client_address, &address_len);
//返回的client_socket為一個全相關的socket,其中包含client的地址和端口信息,通過client_socket可以和客戶端進行通信。
if (client_socket == -1) {
perror("accept error");
return -1;
}
char recv_msg[1024];
char reply_msg[1024];
while (1) {
bzero(recv_msg, 1024);
bzero(reply_msg, 1024);
printf("reply:");
scanf("%s",reply_msg);
send(client_socket, reply_msg, 1024, 0);
long byte_num = recv(client_socket,recv_msg,1024,0);
recv_msg[byte_num] = '\0';
printf("client said:%s\n",recv_msg);
}
return 0;
}
Client端代碼
#include <stdio.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
int main (int argc, const char * argv[])
{
struct sockaddr_in server_addr;
server_addr.sin_len = sizeof(struct sockaddr_in);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(11332);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero),8);
int server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
perror("socket error");
return 1;
}
char recv_msg[1024];
char reply_msg[1024];
if (connect(server_socket, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))==0) {
//connect 成功之后,其實系統將你創建的socket綁定到一個系統分配的端口上,且其為全相關,包含服務器端的信息,可以用來和服務器端進行通信。
while (1) {
bzero(recv_msg, 1024);
bzero(reply_msg, 1024);
long byte_num = recv(server_socket,recv_msg,1024,0);
recv_msg[byte_num] = '\0';
printf("server said:%s\n",recv_msg);
printf("reply:");
scanf("%s",reply_msg);
if (send(server_socket, reply_msg, 1024, 0) == -1) {
perror("send error");
}
}
}
// insert code here...
printf("Hello, World!\n");
return 0;
}
上面的服務器端和客戶端連接成功之后打開的端口的情況是怎么樣的呢?
由於accept只運行了一次,所以服務器端一次只能和一個客戶端進行通信,且使用的send和recv方法都是阻塞的,所以上面這個例子存在一個問題就是服務器端客戶端連接成功之后,發送,接受,發送,接受的次序就被固定了。比如服務器端發送消息之后就等客戶端發送消息了,沒有接受到客戶端的消息之前服務器端是沒有辦法發送消息的。使用select這個這個系統調用可以解決上面的問題。
四.使用select
select這個系統調用,是一種多路復用IO方案,可以同時對多個文件描述符進行監控,從而知道哪些文件描述符可讀,可寫或者出錯,不過select方法是阻塞的,可以設定超時時間。 select使用的步驟如下:
- 1.創建一個fd_set變量(fd_set實為包含了一個整數數組的結構體),用來存放所有的待檢查的文件描述符
- 2.清空fd_set變量,並將需要檢查的所有文件描述符加入fd_set
- 3.調用select。若返回-1,則說明出錯;返回0,則說明超時,返回正數,則為發生狀態變化的文件描述符的個數
- 4.若select返回大於0,則依次查看哪些文件描述符變的可讀,並對它們進行處理
- 5.返回步驟2,開始新一輪的檢測
若上面的聊天程序使用select進行改寫,則是下面這樣的
服務器端
#include <stdio.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#define BACKLOG 5 //完成三次握手但沒有accept的隊列的長度
#define CONCURRENT_MAX 8 //應用層同時可以處理的連接
#define SERVER_PORT 11332
#define BUFFER_SIZE 1024
#define QUIT_CMD ".quit"
int client_fds[CONCURRENT_MAX];
int main (int argc, const char * argv[])
{
char input_msg[BUFFER_SIZE];
char recv_msg[BUFFER_SIZE];
//本地地址
struct sockaddr_in server_addr;
server_addr.sin_len = sizeof(struct sockaddr_in);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero),8);
//創建socket
int server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_sock_fd == -1) {
perror("socket error");
return 1;
}
//綁定socket
int bind_result = bind(server_sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (bind_result == -1) {
perror("bind error");
return 1;
}
//listen
if (listen(server_sock_fd, BACKLOG) == -1) {
perror("listen error");
return 1;
}
//fd_set
fd_set server_fd_set;
int max_fd = -1;
struct timeval tv;
tv.tv_sec = 20;
tv.tv_usec = 0;
while (1) {
FD_ZERO(&server_fd_set);
//標准輸入
FD_SET(STDIN_FILENO, &server_fd_set);
if (max_fd < STDIN_FILENO) {
max_fd = STDIN_FILENO;
}
//服務器端socket
FD_SET(server_sock_fd, &server_fd_set);
if (max_fd < server_sock_fd) {
max_fd = server_sock_fd;
}
//客戶端連接
for (int i = 0; i < CONCURRENT_MAX; i++) {
if (client_fds[i]!=0) {
FD_SET(client_fds[i], &server_fd_set);
if (max_fd < client_fds[i]) {
max_fd = client_fds[i];
}
}
}
int ret = select(max_fd+1, &server_fd_set, NULL, NULL, &tv);
if (ret < 0) {
perror("select 出錯\n");
continue;
}else if(ret == 0){
printf("select 超時\n");
continue;
}else{
//ret為未狀態發生變化的文件描述符的個數
if (FD_ISSET(STDIN_FILENO, &server_fd_set)) {
//標准輸入
bzero(input_msg, BUFFER_SIZE);
fgets(input_msg, BUFFER_SIZE, stdin);
//輸入 ".quit" 則退出服務器
if (strcmp(input_msg, QUIT_CMD) == 0) {
exit(0);
}
for (int i=0; i<CONCURRENT_MAX; i++) {
if (client_fds[i]!=0) {
send(client_fds[i], input_msg, BUFFER_SIZE, 0);
}
}
}
if (FD_ISSET(server_sock_fd, &server_fd_set)) {
//有新的連接請求
struct sockaddr_in client_address;
socklen_t address_len;
int client_socket_fd = accept(server_sock_fd, (struct sockaddr *)&client_address, &address_len);
if (client_socket_fd > 0) {
int index = -1;
for (int i = 0; i < CONCURRENT_MAX; i++) {
if (client_fds[i] == 0) {
index = i;
client_fds[i] = client_socket_fd;
break;
}
}
if (index >= 0) {
printf("新客戶端(%d)加入成功 %s:%d \n",index,inet_ntoa(client_address.sin_addr),ntohs(client_address.sin_port));
}else{
bzero(input_msg, BUFFER_SIZE);
strcpy(input_msg, "服務器加入的客戶端數達到最大值,無法加入!\n");
send(client_socket_fd, input_msg, BUFFER_SIZE, 0);
printf("客戶端連接數達到最大值,新客戶端加入失敗 %s:%d \n",inet_ntoa(client_address.sin_addr),ntohs(client_address.sin_port));
}
}
}
for (int i = 0; i <CONCURRENT_MAX; i++) {
if (client_fds[i]!=0) {
if (FD_ISSET(client_fds[i], &server_fd_set)) {
//處理某個客戶端過來的消息
bzero(recv_msg, BUFFER_SIZE);
long byte_num = recv(client_fds[i],recv_msg,BUFFER_SIZE,0);
if (byte_num > 0) {
if (byte_num > BUFFER_SIZE) {
byte_num = BUFFER_SIZE;
}
recv_msg[byte_num] = '\0';
printf("客戶端(%d):%s\n",i,recv_msg);
}else if(byte_num < 0){
printf("從客戶端(%d)接受消息出錯.\n",i);
}else{
FD_CLR(client_fds[i], &server_fd_set);
client_fds[i] = 0;
printf("客戶端(%d)退出了\n",i);
}
}
}
}
}
}
return 0;
}
客戶端
#include <stdio.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#define BUFFER_SIZE 1024
int main (int argc, const char * argv[])
{
struct sockaddr_in server_addr;
server_addr.sin_len = sizeof(struct sockaddr_in);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(11332);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
bzero(&(server_addr.sin_zero),8);
int server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_sock_fd == -1) {
perror("socket error");
return 1;
}
char recv_msg[BUFFER_SIZE];
char input_msg[BUFFER_SIZE];
if (connect(server_sock_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))==0) {
fd_set client_fd_set;
struct timeval tv;
tv.tv_sec = 20;
tv.tv_usec = 0;
while (1) {
FD_ZERO(&client_fd_set);
FD_SET(STDIN_FILENO, &client_fd_set);
FD_SET(server_sock_fd, &client_fd_set);
int ret = select(server_sock_fd + 1, &client_fd_set, NULL, NULL, &tv);
if (ret < 0 ) {
printf("select 出錯!\n");
continue;
}else if(ret ==0){
printf("select 超時!\n");
continue;
}else{
if (FD_ISSET(STDIN_FILENO, &client_fd_set)) {
bzero(input_msg, BUFFER_SIZE);
fgets(input_msg, BUFFER_SIZE, stdin);
if (send(server_sock_fd, input_msg, BUFFER_SIZE, 0) == -1) {
perror("發送消息出錯!\n");
}
}
if (FD_ISSET(server_sock_fd, &client_fd_set)) {
bzero(recv_msg, BUFFER_SIZE);
long byte_num = recv(server_sock_fd,recv_msg,BUFFER_SIZE,0);
if (byte_num > 0) {
if (byte_num > BUFFER_SIZE) {
byte_num = BUFFER_SIZE;
}
recv_msg[byte_num] = '\0';
printf("服務器:%s\n",recv_msg);
}else if(byte_num < 0){
printf("接受消息出錯!\n");
}else{
printf("服務器端退出!\n");
exit(0);
}
}
}
}
}
return 0;
}
當然select也有其局限性。當fd_set中的文件描述符較少,或者大都數文件描述符都比較活躍的時候,select的效率還是不錯的。Mac系統中已經定義了fd_set 最大可以容納的文件描述符的個數為1024
//sys/_structs.h
#define __DARWIN_FD_SETSIZE 1024
/////////////////////////////////////////////
//Kernel.framework sys/select.h
#define FD_SETSIZE __DARWIN_FD_SETSIZE
每一次select 調用的時候,都涉及到user space和kernel space的內存拷貝,且會對fd_set中的所有文件描述符進行遍歷,如果所有的文件描述符均不滿足,且沒有超時,則當前進程便開始睡眠,直到超時或者有文件描述符狀態發生變化。當文件描述符數量較大的時候,將耗費大量的CPU時間。所以后來有新的方案出現了,如windows2000引入的IOCP,Linux Kernel 2.6中成熟的epoll,FreeBSD4.x引入的kqueue。
五.使用kqueue
Mac是基於BSD的內核,所使用的是kqueue(kernel event notification mechanism,詳細內容可以Mac中 man 2 kqueue
),kqueue比select先進的地方就在於使用事件觸發的機制,且其調用無需每次對所有的文件描述符進行遍歷,返回的時候只返回需要處理的事件,而不像select中需要自己去一個個通過FD_ISSET檢查。
kqueue默認的觸發方式是level 水平觸發,可以通過設置event的flag為EV_CLEAR
使得這個事件變為邊沿觸發,可能epoll的觸發方式無法細化到單個event,需要查證。
kqueue中涉及兩個系統調用,kqueue()和kevent()
- kqueue() 創建kernel級別的事件隊列,並返回隊列的文件描述符
- kevent() 往事件隊列中加入訂閱事件,或者返回相關的事件數組
kqueue使用的流程一般如下:
- 創建kqueue
- 創建struct kevent變量(注意這里的kevent是結構體類型名),可以通過EV_SET這個宏提供的快捷方式進行創建
- 通過kevent系統調用將創建好的kevent結構體變量加入到kqueue隊列中,完成對指定文件描述符的事件的訂閱
-
通過kevent系統調用獲取滿足條件的事件隊列,並對每一個事件進行處理
#include <stdio.h> #include <stdlib.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/event.h> #include <sys/types.h> #include <sys/time.h> #include <arpa/inet.h> #include <string.h> #include <unistd.h> #define BACKLOG 5 //完成三次握手但沒有accept的隊列的長度 #define CONCURRENT_MAX 8 //應用層同時可以處理的連接 #define SERVER_PORT 11332 #define BUFFER_SIZE 1024 #define QUIT_CMD ".quit" int client_fds[CONCURRENT_MAX]; struct kevent events[10];//CONCURRENT_MAX + 2 int main (int argc, const char * argv[]) { char input_msg[BUFFER_SIZE]; char recv_msg[BUFFER_SIZE]; //本地地址 struct sockaddr_in server_addr; server_addr.sin_len = sizeof(struct sockaddr_in); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(SERVER_PORT); server_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); bzero(&(server_addr.sin_zero),8); //創建socket int server_sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_sock_fd == -1) { perror("socket error"); return 1; } //綁定socket int bind_result = bind(server_sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (bind_result == -1) { perror("bind error"); return 1; } //listen if (listen(server_sock_fd, BACKLOG) == -1) { perror("listen error"); return 1; } struct timespec timeout = {10,0}; //kqueue int kq = kqueue(); if (kq == -1) { perror("創建kqueue出錯!\n"); exit(1); } struct kevent event_change; EV_SET(&event_change, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &event_change, 1, NULL, 0, NULL); EV_SET(&event_change, server_sock_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &event_change, 1, NULL, 0, NULL); while (1) { int ret = kevent(kq, NULL, 0, events, 10, &timeout); if (ret < 0) { printf("kevent 出錯!\n"); continue; }else if(ret == 0){ printf("kenvent 超時!\n"); continue; }else{ //ret > 0 返回事件放在events中 for (int i = 0; i < ret; i++) { struct kevent current_event = events[i]; //kevent中的ident就是文件描述符 if (current_event.ident == STDIN_FILENO) { //標准輸入 bzero(input_msg, BUFFER_SIZE); fgets(input_msg, BUFFER_SIZE, stdin); //輸入 ".quit" 則退出服務器 if (strcmp(input_msg, QUIT_CMD) == 0) { exit(0); } for (int i=0; i<CONCURRENT_MAX; i++) { if (client_fds[i]!=0) { send(client_fds[i], input_msg, BUFFER_SIZE, 0); } } }else if(current_event.ident == server_sock_fd){ //有新的連接請求 struct sockaddr_in client_address; socklen_t address_len; int client_socket_fd = accept(server_sock_fd, (struct sockaddr *)&client_address, &address_len); if (client_socket_fd > 0) { int index = -1; for (int i = 0; i < CONCURRENT_MAX; i++) { if (client_fds[i] == 0) { index = i; client_fds[i] = client_socket_fd; break; } } if (index >= 0) { EV_SET(&event_change, client_socket_fd, EVFILT_READ, EV_ADD, 0, 0, NULL); kevent(kq, &event_change, 1, NULL, 0, NULL); printf("新客戶端(fd = %d)加入成功 %s:%d \n",client_socket_fd,inet_ntoa(client_address.sin_addr),ntohs(client_address.sin_port)); }else{ bzero(input_msg, BUFFER_SIZE); strcpy(input_msg, "服務器加入的客戶端數達到最大值,無法加入!\n"); send(client_socket_fd, input_msg, BUFFER_SIZE, 0); printf("客戶端連接數達到最大值,新客戶端加入失敗 %s:%d \n",inet_ntoa(client_address.sin_addr),ntohs(client_address.sin_port)); } } }else{ //處理某個客戶端過來的消息 bzero(recv_msg, BUFFER_SIZE); long byte_num = recv((int)current_event.ident,recv_msg,BUFFER_SIZE,0); if (byte_num > 0) { if (byte_num > BUFFER_SIZE) { byte_num = BUFFER_SIZE; } recv_msg[byte_num] = '\0'; printf("客戶端(fd = %d):%s\n",(int)current_event.ident,recv_msg); }else if(byte_num < 0){ printf("從客戶端(fd = %d)接受消息出錯.\n",(int)current_event.ident); }else{ EV_SET(&event_change, current_event.ident, EVFILT_READ, EV_DELETE, 0, 0, NULL); kevent(kq, &event_change, 1, NULL, 0, NULL); close((int)current_event.ident); for (int i = 0; i < CONCURRENT_MAX; i++) { if (client_fds[i] == (int)current_event.ident) { client_fds[i] = 0; break; } } printf("客戶端(fd = %d)退出了\n",(int)current_event.ident); } } } } } return 0; }
其實kqueue的應用場景非常的廣闊,可以監控文件系統中文件的變化(對文件變化的事件可以粒度非常的細,具體可以查看kqueue的手冊),監控系統進程的生命周期。GCD的事件處理便是建立在kqueue之上的。
六.使用Streams
使用Objective-C的一大優點便是面向對象編程,使得邏輯抽象得更加優美,更加符合人類思維。 一開始說過,無論是對於文件的操作或者對於網絡的操作,本質上都是IO操作,無非寫數據和讀數據,可以對這種輸入輸出進行抽象,抽象成輸入流和輸出流, 從輸入流中讀取數據,往輸出流中寫數據。 Cocoa中的NSInputStream和NSOutputStream便是輸入流和輸出流的抽象,它們的實現分別基於CoreFoundation中的CFReadStream和CFWriteStream。 輸入輸出流對runloop有很好的支持。 NSInputStream和CFReadStream以及NSOutputStream和CFWriteStream之間可以通過 "toll-free bridging"實現無縫的類型轉換。 CoreFoundation中的CFStream提供了輸入輸出流和CFSocket綁定的函數。 這樣便可以通過輸入輸出流和遠端進行通信了。
首先通過XCode創建一個Foundation(C的也行,但是你得將main.c
改成main.m
)的命令行項目. 創建一個ChatServer的類,包含一個run的方法。在Cocoa的程序中有一點是和C語言不同的,你無需自己去寫一個死循環充當runloop,框架本身就對runloop進行了支持,需要做的就是將事件源加入到當前線程的runloop中,然后啟動runloop。 所以在run方法中,創建好用於偵聽連接請求的socket,socket有對應的處理連接accept的回調函數,以及把它封裝成runloop的輸入源,加入到當前runloop。 我們還得從標准輸入獲取需要發送消息,所以使用了CFFileDescriptor,它是文件描述符的objc的封裝,加入了runloop的支持,通過它可以將標准輸入以輸入源的方法加入到當前runloop,當標准輸入緩沖區有數據可讀的時候,設置好的回調函數便會被調用。 最后啟動runloop。
ChatServer中的run方法
- (BOOL)run:(NSError **)error{
BOOL successful = YES;
CFSocketContext socketCtxt = {0, self, NULL, NULL, NULL};
_socket = CFSocketCreate(kCFAllocatorDefault, PF_INET, SOCK_STREAM,
IPPROTO_TCP,
kCFSocketAcceptCallBack,
(CFSocketCallBack)&SocketConnectionAcceptedCallBack,
&socketCtxt);
if (NULL == _socket) {
if (nil != error) {
*error = [[NSError alloc]
initWithDomain:ServerErrorDomain
code:kServerNoSocketsAvailable
userInfo:nil];
}
successful = NO;
}
if(YES == successful) {
// enable address reuse
int yes = 1;
setsockopt(CFSocketGetNative(_socket),
SOL_SOCKET, SO_REUSEADDR,
(void *)&yes, sizeof(yes));
uint8_t packetSize = 128;
setsockopt(CFSocketGetNative(_socket),
SOL_SOCKET, SO_SNDBUF,
(void *)&packetSize, sizeof(packetSize));
setsockopt(CFSocketGetNative(_socket),
SOL_SOCKET, SO_RCVBUF,
(void *)&packetSize, sizeof(packetSize));
struct sockaddr_in addr4;
memset(&addr4, 0, sizeof(addr4));
addr4.sin_len = sizeof(addr4);
addr4.sin_family = AF_INET;
addr4.sin_port = htons(CHAT_SERVER_PORT);
addr4.sin_addr.s_addr = htonl(INADDR_ANY);
NSData *address4 = [NSData dataWithBytes:&addr4 length:sizeof(addr4)];
if (kCFSocketSuccess != CFSocketSetAddress(_socket, (CFDataRef)address4)) {
if (error) *error = [[NSError alloc]
initWithDomain:ServerErrorDomain
code:kServerCouldNotBindToIPv4Address
userInfo:nil];
if (_socket) CFRelease(_socket);
_socket = NULL;
successful = NO;
} else {
// now that the binding was successful, we get the port number
NSData *addr = [(NSData *)CFSocketCopyAddress(_socket) autorelease];
memcpy(&addr4, [addr bytes], [addr length]);
self.port = ntohs(addr4.sin_port);
// 將socket 輸入源加入到當前的runloop
CFRunLoopRef cfrl = CFRunLoopGetCurrent();
CFRunLoopSourceRef source4 = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
CFRunLoopAddSource(cfrl, source4, kCFRunLoopDefaultMode);
CFRelease(source4);
//標准輸入,當在命令行中輸入時,回調函數便會被調用
CFFileDescriptorContext context = {0,self,NULL,NULL,NULL};
CFFileDescriptorRef stdinFDRef = CFFileDescriptorCreate(kCFAllocatorDefault, STDIN_FILENO, true, FileDescriptorCallBack, &context);
CFFileDescriptorEnableCallBacks(stdinFDRef,kCFFileDescriptorReadCallBack);
CFRunLoopSourceRef stdinSource = CFFileDescriptorCreateRunLoopSource(kCFAllocatorDefault, stdinFDRef, 0);
CFRunLoopAddSource(cfrl, stdinSource, kCFRunLoopDefaultMode);
CFRelease(stdinSource);
CFRelease(stdinFDRef);
CFRunLoopRun();
}
}
return successful;
}
當有客戶端連接請求過來時, SocketConnectionAcceptedCallBack這個回調函數會被調用,根據新的全相關的socket,生成輸入輸出流,並設置輸入輸出流的delegate方法,將其添加到當前的runloop,這樣流中有數據過來的時候,delegate方法會被調用。
SocketConnectionAcceptedCallBack函數
static void SocketConnectionAcceptedCallBack(CFSocketRef socket,
CFSocketCallBackType type,
CFDataRef address,
const void *data, void *info) {
ChatServer *theChatServer = (ChatServer *)info;
if (kCFSocketAcceptCallBack == type) {
// 摘自kCFSocketAcceptCallBack的文檔,New connections will be automatically accepted and the callback is called with the data argument being a pointer to a CFSocketNativeHandle of the child socket. This callback is usable only with listening sockets.
CFSocketNativeHandle nativeSocketHandle = *(CFSocketNativeHandle *)data;
// create the read and write streams for the connection to the other process
CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;
CFStreamCreatePairWithSocket(kCFAllocatorDefault, nativeSocketHandle,
&readStream, &writeStream);
if(NULL != readStream && NULL != writeStream) {
CFReadStreamSetProperty(readStream,
kCFStreamPropertyShouldCloseNativeSocket,
kCFBooleanTrue);
CFWriteStreamSetProperty(writeStream,
kCFStreamPropertyShouldCloseNativeSocket,
kCFBooleanTrue);
NSInputStream *inputStream = (NSInputStream *)readStream;//toll-free bridging
NSOutputStream *outputStream = (NSOutputStream *)writeStream;//toll-free bridging
inputStream.delegate = theChatServer;
[inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[inputStream open];
outputStream.delegate = theChatServer;
[outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[outputStream open];
Client *aClient = [[Client alloc] init];
aClient.inputStream = inputStream;
aClient.outputStream = outputStream;
aClient.sock_fd = nativeSocketHandle;
[theChatServer.clients setValue:aClient
forKey:[NSString stringWithFormat:@"%d",inputStream]];
NSLog(@"有新客戶端(sock_fd=%d)加入",nativeSocketHandle);
} else {
close(nativeSocketHandle);
}
if (readStream) CFRelease(readStream);
if (writeStream) CFRelease(writeStream);
}
}
當客戶端有數據傳過來時,相應的NSInputStream的delegate方法被調用
- (void) stream:(NSStream*)stream handleEvent:(NSStreamEvent)eventCode {
switch (eventCode) {
case NSStreamEventOpenCompleted: {
break;
}
case NSStreamEventHasBytesAvailable: {
Client *client = [self.clients objectForKey:[NSString stringWithFormat:@"%d",stream]];
NSMutableData *data = [NSMutableData data];
uint8_t *buf = calloc(128, sizeof(uint8_t));
NSUInteger len = 0;
while([(NSInputStream*)stream hasBytesAvailable]) {
len = [(NSInputStream*)stream read:buf maxLength:128];
if(len > 0) {
[data appendBytes:buf length:len];
}
}
free(buf);
if ([data length] == 0) {
//客戶端退出
NSLog(@"客戶端(sock_fd=%d)退出",client.sock_fd);
[self.clients removeObjectForKey:[NSString stringWithFormat:@"%d",stream]];
close(client.sock_fd);
}else{
NSLog(@"收到客戶端(sock_fd=%d)消息:%@",client.sock_fd,[[[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding] autorelease]);
}
break;
}
case NSStreamEventHasSpaceAvailable: {
break;
}
case NSStreamEventEndEncountered: {
break;
}
case NSStreamEventErrorOccurred: {
break;
}
default:
break;
}
}
當在debug窗口中輸入內容並回車時,標准輸入緩沖區中便有數據了,這個時候回調函數FileDescriptorCallBack將被調用,處理標准輸入。
static void FileDescriptorCallBack(CFFileDescriptorRef f,
CFOptionFlags callBackTypes,
void *info){
int fd = CFFileDescriptorGetNativeDescriptor(f);
ChatServer *theChatServer = (ChatServer *)info;
if (fd == STDIN_FILENO) {
NSData *inputData = [[NSFileHandle fileHandleWithStandardInput] availableData];
NSString *inputString = [[[NSString alloc] initWithData:inputData encoding:NSUTF8StringEncoding] autorelease];
NSLog(@"准備發送消息:%@",inputString);
for (Client *client in [theChatServer.clients allValues]) {
[client.outputStream write:[inputData bytes] maxLength:[inputData length]];
}
//處理完數據之后必須重新Enable 回調函數
CFFileDescriptorEnableCallBacks(f,kCFFileDescriptorReadCallBack);
}
}