TCP層recvmsg系統調用的實現分析


概述

recvmsg系統調用在tcp層的實現是tcp_recvmsg函數,該函數完成從接收隊列中讀取數據復制到用戶空間的任務;函數在執行過程中會鎖定控制塊,避免軟中斷在tcp層的影響;函數會涉及從接收隊列receive_queue,預處理隊列prequeue和后備隊列backlog中讀取數據;其中從prequeue和backlog中讀取的數據,還需要經過sk_backlog_rcv回調,該回調的實現為tcp_v4_do_rcv,實際上是先緩存到隊列中,然后需要讀取的時候,才進入協議棧處理,此時,是在進程上下文執行的,因為會設置tp->ucopy.task=current,在協議棧處理過程中,會直接將數據復制到用戶空間;

代碼分析
  1 int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
  2         int flags, int *addr_len)
  3 {
  4     struct tcp_sock *tp = tcp_sk(sk);
  5     int copied = 0;
  6     u32 peek_seq;
  7     u32 *seq;
  8     unsigned long used;
  9     int err;
 10     int target;        /* Read at least this many bytes */
 11     long timeo;
 12     struct task_struct *user_recv = NULL;
 13     struct sk_buff *skb, *last;
 14     u32 urg_hole = 0;
 15 
 16     if (unlikely(flags & MSG_ERRQUEUE))
 17         return inet_recv_error(sk, msg, len, addr_len);
 18 
 19     if (sk_can_busy_loop(sk) && skb_queue_empty(&sk->sk_receive_queue) &&
 20         (sk->sk_state == TCP_ESTABLISHED))
 21         sk_busy_loop(sk, nonblock);
 22 
 23     /* 傳輸層上鎖,避免軟中斷影響 */
 24     lock_sock(sk);
 25 
 26     err = -ENOTCONN;
 27     /* LISTEN狀態,不允許讀取數據 */
 28     if (sk->sk_state == TCP_LISTEN)
 29         goto out;
 30 
 31     /* 獲取阻塞讀取的超時時間,非阻塞為0 */
 32     timeo = sock_rcvtimeo(sk, nonblock);
 33 
 34     /* Urgent data needs to be handled specially. */
 35     /* 帶外數據讀取 */
 36     if (flags & MSG_OOB)
 37         goto recv_urg;
 38 
 39     /* 修復模式 */
 40     if (unlikely(tp->repair)) {
 41         err = -EPERM;
 42         if (!(flags & MSG_PEEK))
 43             goto out;
 44 
 45         if (tp->repair_queue == TCP_SEND_QUEUE)
 46             goto recv_sndq;
 47 
 48         err = -EINVAL;
 49         if (tp->repair_queue == TCP_NO_QUEUE)
 50             goto out;
 51 
 52         /* 'common' recv queue MSG_PEEK-ing */
 53     }
 54 
 55     /* 待讀取的序號 */
 56     seq = &tp->copied_seq;
 57 
 58     /* 只查看數據 */
 59     if (flags & MSG_PEEK) {
 60         /* 復制一個序號用於記錄 */
 61         peek_seq = tp->copied_seq;
 62         seq = &peek_seq;
 63     }
 64 
 65     /* 
 66         確定讀取長度,設置了MSG_WAITALL則
 67         使用用戶輸入的len,否則使用低潮限度 
 68     */
 69     target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 70 
 71     do {
 72         u32 offset;
 73 
 74         /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
 75         /* 讀到了帶外數據 */
 76         if (tp->urg_data && tp->urg_seq == *seq) {
 77             /* 之前已經讀取了部分數據,跳出 */
 78             if (copied)
 79                 break;
 80             /* 用戶進程有信號待處理,跳出 */
 81             if (signal_pending(current)) {
 82                 copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
 83                 break;
 84             }
 85         }
 86 
 87         /* Next get a buffer. */
 88 
 89         /* 獲取隊尾 */
 90         last = skb_peek_tail(&sk->sk_receive_queue);
 91 
 92         /* 遍歷接收隊列,找到滿足讀取的skb */
 93         skb_queue_walk(&sk->sk_receive_queue, skb) {
 94             last = skb;
 95             /* Now that we have two receive queues this
 96              * shouldn't happen.
 97              */
 98             /* 隊列中序號比待讀取的大 */
 99             if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
100                  "recvmsg bug: copied %X seq %X rcvnxt %X fl %X\n",
101                  *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
102                  flags))
103                 break;
104 
105             /* 獲取序號偏移*/
106             offset = *seq - TCP_SKB_CB(skb)->seq;
107 
108             /* 有syn標記,再減1 */
109             if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
110                 pr_err_once("%s: found a SYN, please report !\n", __func__);
111                 offset--;
112             }
113             /* 偏移小於skb數據長度,找到 */
114             if (offset < skb->len)
115                 goto found_ok_skb;
116 
117             /* 有fin標記,跳轉到fin處理 */
118             if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
119                 goto found_fin_ok;
120             WARN(!(flags & MSG_PEEK),
121                  "recvmsg bug 2: copied %X seq %X rcvnxt %X fl %X\n",
122                  *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
123         }
124 
125         /* Well, if we have backlog, try to process it now yet. */
126 
127         /* 讀完目標數據&& backlog隊列為空 */
128         if (copied >= target && !sk->sk_backlog.tail)
129             break;
130 
131         /* 未讀完目標數據,或者讀完目標數據,隊列不為空 */
132 
133         /* 已經讀取了數據 */
134         if (copied) {
135             /* 有錯誤或者關閉或者有信號,跳出 */
136             if (sk->sk_err ||
137                 sk->sk_state == TCP_CLOSE ||
138                 (sk->sk_shutdown & RCV_SHUTDOWN) ||
139                 !timeo ||
140                 signal_pending(current))
141                 break;
142         } else {
143             /* 會話終結*/
144             if (sock_flag(sk, SOCK_DONE))
145                 break;
146 
147             /* 有錯誤 */
148             if (sk->sk_err) {
149                 copied = sock_error(sk);
150                 break;
151             }
152 
153             /* 關閉接收端 */
154             if (sk->sk_shutdown & RCV_SHUTDOWN)
155                 break;
156 
157             /* 連接關閉 */
158             if (sk->sk_state == TCP_CLOSE) {
159                 /* 不在done狀態,可能再讀一個連接未建立起來的連接 */
160                 if (!sock_flag(sk, SOCK_DONE)) {
161                     /* This occurs when user tries to read
162                      * from never connected socket.
163                      */
164                     copied = -ENOTCONN;
165                     break;
166                 }
167                 break;
168             }
169 
170             /* 不阻塞等待 */
171             if (!timeo) {
172                 copied = -EAGAIN;
173                 break;
174             }
175 
176             /* 有信號待處理 */
177             if (signal_pending(current)) {
178                 copied = sock_intr_errno(timeo);
179                 break;
180             }
181         }
182 
183         /* 檢查是否需要發送ack */
184         tcp_cleanup_rbuf(sk, copied);
185 
186         /* 未開啟低延遲&& tp的任務為空或者是當前進程 */
187         if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
188             /* Install new reader */
189             /* 注冊當前進程任務 */
190             if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
191                 user_recv = current;
192                 tp->ucopy.task = user_recv;
193                 tp->ucopy.msg = msg;
194             }
195 
196             /* 當前可以使用的用戶緩存大小 */
197             tp->ucopy.len = len;
198 
199             WARN_ON(tp->copied_seq != tp->rcv_nxt &&
200                 !(flags & (MSG_PEEK | MSG_TRUNC)));
201 
202             /* Ugly... If prequeue is not empty, we have to
203              * process it before releasing socket, otherwise
204              * order will be broken at second iteration.
205              * More elegant solution is required!!!
206              *
207              * Look: we have the following (pseudo)queues:
208              *
209              * 1. packets in flight
210              * 2. backlog
211              * 3. prequeue
212              * 4. receive_queue
213              *
214              * Each queue can be processed only if the next ones
215              * are empty. At this point we have empty receive_queue.
216              * But prequeue _can_ be not empty after 2nd iteration,
217              * when we jumped to start of loop because backlog
218              * processing added something to receive_queue.
219              * We cannot release_sock(), because backlog contains
220              * packets arrived _after_ prequeued ones.
221              *
222              * Shortly, algorithm is clear --- to process all
223              * the queues in order. We could make it more directly,
224              * requeueing packets from backlog to prequeue, if
225              * is not empty. It is more elegant, but eats cycles,
226              * unfortunately.
227              */
228             /* prequeue不為空,處理prequeue */
229             if (!skb_queue_empty(&tp->ucopy.prequeue))
230                 goto do_prequeue;
231 
232             /* __ Set realtime policy in scheduler __ */
233         }
234 
235         /* 目標數據讀取完,處理后備隊列 */
236         if (copied >= target) {
237             /* Do not sleep, just process backlog. */
238             release_sock(sk);
239             lock_sock(sk);
240         } 
241         /* 未讀取完,進入等待 */
242         else {
243             sk_wait_data(sk, &timeo, last);
244         }
245 
246         /* 用戶空間接收數據 */
247         if (user_recv) {
248             int chunk;
249 
250             /* __ Restore normal policy in scheduler __ */
251 
252             /* 獲取讀取長度 */
253             chunk = len - tp->ucopy.len;
254 
255             /* 記錄剩余讀取長度和已經讀取長度 */
256             if (chunk != 0) {
257                 NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
258                 len -= chunk;
259                 copied += chunk;
260             }
261 
262             /* 
263                 接收到的數據已經全部復制到用戶空間
264                 && prequeue不為空
265             */
266             if (tp->rcv_nxt == tp->copied_seq &&
267                 !skb_queue_empty(&tp->ucopy.prequeue)) {
268 do_prequeue:
269                 /* 處理prequeue */
270                 tcp_prequeue_process(sk);
271 
272                 /* 獲取讀取長度和剩余長度 */
273                 chunk = len - tp->ucopy.len;
274                 if (chunk != 0) {
275                     NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
276                     len -= chunk;
277                     copied += chunk;
278                 }
279             }
280         }
281 
282         /* 只是查看數據,則更新peek_seq */
283         if ((flags & MSG_PEEK) &&
284             (peek_seq - copied - urg_hole != tp->copied_seq)) {
285             net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
286                         current->comm,
287                         task_pid_nr(current));
288             peek_seq = tp->copied_seq;
289         }
290         continue;
291 
292 /* 讀取一個找到的合適的段 */
293     found_ok_skb:
294         /* Ok so how much can we use? */
295 
296         /* 獲取該skb中可讀的數據長度 */
297         used = skb->len - offset;
298 
299         /* 不需要讀取那么多,則調整為需要的長度 */
300         if (len < used)
301             used = len;
302 
303         /* Do we have urgent data here? */
304         /* 有帶外數據*/
305         if (tp->urg_data) {
306             /* 帶外數據偏移 */
307             u32 urg_offset = tp->urg_seq - *seq;
308 
309             /* 偏移在我們要讀取的數據范圍內 */
310             if (urg_offset < used) {
311                 /* 當前正在讀取的數據為帶外數據 */
312                 if (!urg_offset) {
313                     /* 不允許放入正常數據流 */
314                     if (!sock_flag(sk, SOCK_URGINLINE)) {
315                         /* 調整序號和偏移 */
316                         ++*seq;
317                         urg_hole++;
318                         offset++;
319                         used--;
320                         /* 無可讀數據 */
321                         if (!used)
322                             goto skip_copy;
323                     }
324                 } 
325                 /* 本次只能讀到帶外數據為止 */
326                 else
327                     used = urg_offset;
328             }
329         }
330 
331         /* 讀取數據 */
332         if (!(flags & MSG_TRUNC)) {
333             err = skb_copy_datagram_msg(skb, offset, msg, used);
334             if (err) {
335                 /* Exception. Bailout! */
336                 if (!copied)
337                     copied = -EFAULT;
338                 break;
339             }
340         }
341 
342         /* 計算讀取和待讀取數據長度 */
343         *seq += used;
344         copied += used;
345         len -= used;
346 
347         tcp_rcv_space_adjust(sk);
348 
349 skip_copy:
350         /* 完成對帶外數據的處理 */
351         if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
352             /* 標志清零 */
353             tp->urg_data = 0;
354             /* 快路檢查 */
355             tcp_fast_path_check(sk);
356         }
357 
358         /* 滿足繼續讀取 */
359         if (used + offset < skb->len)
360             continue;
361         /* fin處理 */
362         if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
363             goto found_fin_ok;
364 
365         /* 數據讀取完,不是查看,則釋放該skb */
366         if (!(flags & MSG_PEEK))
367             sk_eat_skb(sk, skb);
368         continue;
369 
370     found_fin_ok:
371         /* Process the FIN. */
372         /* 序號增加 */
373         ++*seq;
374         /* 不是查看,則釋放skb */
375         if (!(flags & MSG_PEEK))
376             sk_eat_skb(sk, skb);
377         break;
378     } while (len > 0);
379 
380     /* 用戶空間進程接收數據 */
381     if (user_recv) {
382         /* prequeue不為空 */
383         if (!skb_queue_empty(&tp->ucopy.prequeue)) {
384             int chunk;
385 
386             /* 調整剩余可用空間 */
387             tp->ucopy.len = copied > 0 ? len : 0;
388 
389             /* 處理prequeue */
390             tcp_prequeue_process(sk);
391 
392             /* 讀取了數據,則重新計算下長度 */
393             if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
394                 NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
395                 len -= chunk;
396                 copied += chunk;
397             }
398         }
399 
400         /* 用戶空間結束讀取 */
401         tp->ucopy.task = NULL;
402         tp->ucopy.len = 0;
403     }
404 
405     /* According to UNIX98, msg_name/msg_namelen are ignored
406      * on connected socket. I was just happy when found this 8) --ANK
407      */
408 
409     /* Clean up data we have read: This will do ACK frames. */
410     /* 檢查是否有ack發送 */
411     tcp_cleanup_rbuf(sk, copied);
412 
413     release_sock(sk);
414     return copied;
415 
416 out:
417     release_sock(sk);
418     return err;
419 
420 recv_urg:
421     /* 帶外數據 */
422     err = tcp_recv_urg(sk, msg, len, flags);
423     goto out;
424 
425 recv_sndq:
426     err = tcp_peek_sndq(sk, msg, len);
427     goto out;
428 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM