聊天程序的底層socket實現我們用開源的GCDAsyncUdpSocket,本文依據GCDAsyncUdpSocket源碼來解析UDP socket通信。
socket通信的整體流程是:
創建並初始化一個socket進行相應的配置 -> 本地地址和端口的綁定 -> 連接socket到被給的地址和端口 -> 發送數據 -> 接收數據 -> 關閉socket
1.創建並初始化socket並進行相應的配置
初始化GCDAsyncUdpSocket時,我們需要設置一個接收數據的delegate和delegateQueue,還需要設置一個發送隊列(socket queue),也可以不指定發送隊列,這時GCDAsyncUdpSocket內不會創建一個默認的發送隊列,發送隊列應為串行隊列,這樣此socket發送數據的操作都會在此串行隊列中操作。
GCDAsyncUdpSocket是支持IPV4和IPV6的,如果DNS只返回IPV4地址,則GCDAsyncUdpSocket自動使用IPV4,如果DNS只返回IPV6地址,則GCDAsyncUdpSocket自動使用IPV6,如果DNS返回IPV4和IPV6地址,則GCDAsyncUdpSocket使用你設置的優先使用的選項。
我們可以設置接收數據包的大小,每次讀取接收數據時,只從接收緩存中讀取之前設定大小的數據。
2.綁定目的地址和端口
應該在服務器socket發送數據前,對socket進行綁定本地的地址和端口號。一般情況下對客戶端而言可以不用綁定地址和端口,在socket發送數據時,操作系統會自動分配一個可用的端口給socket,但這種情況只適用於客戶端先給服務器發送消息,如果是服務器創建socket后先給客戶端發送消息,客戶端也需要給socket綁定端口號,否則客戶端收到消息后也不知道分配給哪個應用程序。
綁定只能進行一次,綁定只能在socket建立連接之前,建立連接后不能在對socket進行綁定
3.socket建立連接
UDP是無連接的協議,建立連接並不是必須的。
建立連接到一個指定的host/port,有如下作用:
-你只能發送數據到連接到的host/port(即發送消息使用sendto函數時不能指定目標地址,可以使用send函數)
-你只能從連接到的host/port接收數據(接收數據時不必使用recvfrom函數來指定對端地址(IP和端口號),可以使用read, recv或recvmsg函數,除了連接的對端地址外的地址到達的數據包都不被傳遞到連接的socket上)
-你只能從連接到的host/port接收ICMP報文消息,像“連接拒絕”(未連接的UDP socket不會收到異步錯誤)
udp socket的connect函數並不會像TCP socket的connect函數一樣與對端進行通信,進行三次握手。相反內核只檢查任何能立即發現的錯誤(如,明顯無法到達的目的地),從傳遞給connect函數的socket地址結構中記錄對等體的ip地址和端口號,並立即返回到調用進程。
多次調用connect函數主要有兩個目的:
-重現指定peername
-斷開連接,即刪除peername(也可能會刪除socketname)
對於第一個目的來說,很簡單,只要設置好正確的套接字地址,傳參給connect即可。
對於第二個目的來說,需要將socket地址結構struct sockaddr中的sin_family成員設置成AF_UNSPEC,如下:
struct sockaddr_in disconnaddr; memset(&disconnaddr, 0, sizeof(disconnaddr)); disconnaddr.sin_family = AF_UNSPEC; // 斷開連接 connect(sockfd, &disconnaddr, sizeof(disconnaddr));
從性能上來說,當應用程序在未連接的UDP socket上調用sendto函數時,Berkeley派生的內核會臨時連接socket,發送數據報,然后取消socket連接。
// 連接兩次調用 sendto sendto(sockfd, buf, 100, 0, &servaddr, sizeof(servaddr)); sendto(sockfd, buf, 200, 0, &servaddr, sizeof(servaddr));
在未連接的UDP socket上調用sendto函數以獲取兩個數據報,內核涉及以下六個步驟:
-連接socket
-輸出第一個數據報
-斷開socket連接
-連接socket
-輸出第二個socket
-斷開socket連接
另一個考慮的因素是路由表的搜索次數。
當應用程序直到它將向同一個地址發送多個數據報時,顯式連接socket更有效。
connect(sockfd, &servaddr, sizeof(servaddr)); write(sockfd, buf, 100); write(sockfd, buf, 200);
調用connect然后調用write兩次,涉及內核以下步驟:
-連接socket
-輸出第一個數據報
-輸出第二個socket
在這種情況下內核只復制一次含有目的IP和port的socket地址,而使用sendto函數時,需要復制兩次,臨時連接未連接的UDP socket大約會消耗每個UDP傳輸三分之一的開銷。
4.發送數據
在發送數據時尤其需要注意的一點是:在發送完數據的回調方法調用之前都不應改變被發送的數據。
在業務上可以在發送時設置過濾器,(外部設置,發送時判斷是否有過濾器,有就先執行過濾器,根據結果執行后續操作)。
5.接收數據
在接收數據時也可設置過濾器,(同樣是外部設置,在接收到數據后,只有通過過濾器的數據才交給上層應用進程)。
在接收數據時,應合理設置接收緩存的大小,設置的過大會浪費存儲空間;設置的過小不足以容納接收回來的數據時,則會丟棄容不下的數據,而且此時recvfrom函數並不會返回一個錯誤的值。
在業務上還可以設置一次接收全部數據,還可以設置分多次接收數據,例如源碼中的receiveOnce和beginReceiving函數可切換是否多次接收數據。
另外我們還可以暫停接收數據,這里需要注意的問題是,因為接收數據是異步進行的,所以調用pauseReceiving方法時,接收數據的代理方法可能已經觸發,此時這些方法仍會繼續調用。
6.關閉socket
主要是關閉發送和接收的stream及注銷其在runloop中注冊的監聽,以及釋放相關資源。
在這里可以選擇立即關閉socket或將未發送數據發送完后再關閉socket。
基於UDP socket的通信還可以發多播(組播)和廣播消息。
IGMP協議是IP組播的基礎。
加入和離開多播組只需要調用以下代碼:
//加入多播組 int status = setsockopt(socket4FD, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(imreq)); //離開多播組 int status = setsockopt(socket4FD, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&imreq, sizeof(imreq));
上面介紹了UDP socket通信的大致流程下面來看GCDAsyncUdpSocket的源碼。
它定義了一個發送數據包的結構GCDAsyncUdpSendPacket,在發送數據包時,用相應的數據去填充該數據結構,然后將其壓縮發送出去。
/** * The GCDAsyncUdpSendPacket encompasses the instructions for a single send/write. **/ @interface GCDAsyncUdpSendPacket : NSObject { @public NSData *buffer; NSTimeInterval timeout; long tag; BOOL resolveInProgress; BOOL filterInProgress; NSArray *resolvedAddresses; NSError *resolveError; NSData *address; int addressFamily; } - (id)initWithData:(NSData *)d timeout:(NSTimeInterval)t tag:(long)i; @end @implementation GCDAsyncUdpSendPacket - (id)initWithData:(NSData *)d timeout:(NSTimeInterval)t tag:(long)i { if ((self = [super init])) { buffer = d; timeout = t; tag = i; resolveInProgress = NO; } return self; } @end
- (void)sendData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag { LogTrace(); if ([data length] == 0) { LogWarn(@"Ignoring attempt to send nil/empty data."); return; } GCDAsyncUdpSendPacket *packet = [[GCDAsyncUdpSendPacket alloc] initWithData:data timeout:timeout tag:tag]; dispatch_async(socketQueue, ^{ @autoreleasepool { [sendQueue addObject:packet]; [self maybeDequeueSend]; }}); }
同時還定義了一個用於連接的數據包的結構GCDAsyncUdpSpecialPacket
@interface GCDAsyncUdpSpecialPacket : NSObject { @public // uint8_t type; BOOL resolveInProgress; NSArray *addresses; NSError *error; } - (id)init; @end @implementation GCDAsyncUdpSpecialPacket - (id)init { self = [super init]; return self; } @end
GCDAsyncUdpSocket底層是基於stream來實現的,在使用socket時我們需要創建4個stream(readStream4, writeStream4, readStream6 , writeStream6),分別用於IPV4和IPV6收發數據:
首先需要獲取readSrteam和writeStream:
- (CFReadStreamRef)readStream { if (! dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)) { LogWarn(@"%@: %@ - Method only available from within the context of a performBlock: invocation", THIS_FILE, THIS_METHOD); return NULL; } NSError *err = nil; if (![self createReadAndWriteStreams:&err]) { LogError(@"Error creating CFStream(s): %@", err); return NULL; } // Todo... if (readStream4) return readStream4; else return readStream6; } - (CFWriteStreamRef)writeStream { if (! dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)) { LogWarn(@"%@: %@ - Method only available from within the context of a performBlock: invocation", THIS_FILE, THIS_METHOD); return NULL; } NSError *err = nil; if (![self createReadAndWriteStreams:&err]) { LogError(@"Error creating CFStream(s): %@", err); return NULL; } if (writeStream4) return writeStream4; else return writeStream6; }
這里默認使用IPV4。
dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey)
上面這個方法是判斷當前是否是在socketQueue,這里涉及到以下兩個方法:
dispatch_queue_set_specific() dispatch_get_specific() // The dispatch_queue_set_specific() and dispatch_get_specific() functions take a "void *key" parameter. // From the documentation: // // > Keys are only compared as pointers and are never dereferenced. // > Thus, you can use a pointer to a static variable for a specific subsystem or // > any other value that allows you to identify the value uniquely. // // We're just going to use the memory address of an ivar. // Specifically an ivar that is explicitly named for our purpose to make the code more readable. // // However, it feels tedious (and less readable) to include the "&" all the time: // dispatch_get_specific(&IsOnSocketQueueOrTargetQueueKey) // // So we're going to make it so it doesn't matter if we use the '&' or not, // by assigning the value of the ivar to the address of the ivar. // Thus: IsOnSocketQueueOrTargetQueueKey == &IsOnSocketQueueOrTargetQueueKey;
以下是創建read/write stream函數:
- (BOOL)createReadAndWriteStreams:(NSError **)errPtr { LogTrace(); NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue"); NSError *err = nil; if (readStream4 || writeStream4 || readStream6 || writeStream6) { // Streams already created return YES; } if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL) { err = [self otherError:@"Cannot create streams without a file descriptor"]; goto Failed; } // Create streams LogVerbose(@"Creating read and write stream(s)..."); if (socket4FD != SOCKET_NULL) { CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socket4FD, &readStream4, &writeStream4); if (!readStream4 || !writeStream4) { err = [self otherError:@"Error in CFStreamCreatePairWithSocket() [IPv4]"]; //使用goto語句跳轉到Failed. goto Failed; } } if (socket6FD != SOCKET_NULL) { CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socket6FD, &readStream6, &writeStream6); if (!readStream6 || !writeStream6) { err = [self otherError:@"Error in CFStreamCreatePairWithSocket() [IPv6]"]; goto Failed; } } // Ensure the CFStream's don't close our underlying socket CFReadStreamSetProperty(readStream4, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFWriteStreamSetProperty(writeStream4, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFReadStreamSetProperty(readStream6, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); CFWriteStreamSetProperty(writeStream6, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse); return YES; Failed: if (readStream4) { CFReadStreamClose(readStream4); CFRelease(readStream4); readStream4 = NULL; } if (writeStream4) { CFWriteStreamClose(writeStream4); CFRelease(writeStream4); writeStream4 = NULL; } if (readStream6) { CFReadStreamClose(readStream6); CFRelease(readStream6); readStream6 = NULL; } if (writeStream6) { CFWriteStreamClose(writeStream6); CFRelease(writeStream6); writeStream6 = NULL; } if (errPtr) *errPtr = err; return NO; }
如果相應的stream已經創建了,直接返回YES,之后判斷socket狀態不都為SOCKET_NULL(為SOCKET_NULL則用goto語句跳轉到Failed),之后調用CFStreamCreatePairWithSocket函數創建read/write streams,並於socket綁定。
