概述
recvmsg系統調用在tcp層的實現是tcp_recvmsg函數,該函數完成從接收隊列中讀取數據並複製到用戶空間的任務;函數在執行過程中會鎖定控制塊,避免軟中斷在tcp層的影響;函數會涉及從接收隊列receive_queue、預處理隊列prequeue和後備隊列backlog中讀取數據;其中從prequeue和backlog中讀取的數據,還需要經過sk_backlog_rcv回調,該回調的實現為tcp_v4_do_rcv;也就是說,這些數據先緩存到隊列中,等到需要讀取的時候才進入協議棧處理,此時是在進程上下文中執行的;因為會設置tp->ucopy.task=current,所以在協議棧處理過程中會直接將數據複製到用戶空間;
代碼分析
1 int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, 2 int flags, int *addr_len) 3 { 4 struct tcp_sock *tp = tcp_sk(sk); 5 int copied = 0; 6 u32 peek_seq; 7 u32 *seq; 8 unsigned long used; 9 int err; 10 int target; /* Read at least this many bytes */ 11 long timeo; 12 struct task_struct *user_recv = NULL; 13 struct sk_buff *skb, *last; 14 u32 urg_hole = 0; 15 16 if (unlikely(flags & MSG_ERRQUEUE)) 17 return inet_recv_error(sk, msg, len, addr_len); 18 19 if (sk_can_busy_loop(sk) && skb_queue_empty(&sk->sk_receive_queue) && 20 (sk->sk_state == TCP_ESTABLISHED)) 21 sk_busy_loop(sk, nonblock); 22 23 /* 傳輸層上鎖,避免軟中斷影響 */ 24 lock_sock(sk); 25 26 err = -ENOTCONN; 27 /* LISTEN狀態,不允許讀取數據 */ 28 if (sk->sk_state == TCP_LISTEN) 29 goto out; 30 31 /* 獲取阻塞讀取的超時時間,非阻塞為0 */ 32 timeo = sock_rcvtimeo(sk, nonblock); 33 34 /* Urgent data needs to be handled specially. */ 35 /* 帶外數據讀取 */ 36 if (flags & MSG_OOB) 37 goto recv_urg; 38 39 /* 修復模式 */ 40 if (unlikely(tp->repair)) { 41 err = -EPERM; 42 if (!(flags & MSG_PEEK)) 43 goto out; 44 45 if (tp->repair_queue == TCP_SEND_QUEUE) 46 goto recv_sndq; 47 48 err = -EINVAL; 49 if (tp->repair_queue == TCP_NO_QUEUE) 50 goto out; 51 52 /* 'common' recv queue MSG_PEEK-ing */ 53 } 54 55 /* 待讀取的序號 */ 56 seq = &tp->copied_seq; 57 58 /* 只查看數據 */ 59 if (flags & MSG_PEEK) { 60 /* 復制一個序號用於記錄 */ 61 peek_seq = tp->copied_seq; 62 seq = &peek_seq; 63 } 64 65 /* 66 確定讀取長度,設置了MSG_WAITALL則 67 使用用戶輸入的len,否則使用低潮限度 68 */ 69 target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); 70 71 do { 72 u32 offset; 73 74 /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */ 75 /* 讀到了帶外數據 */ 76 if (tp->urg_data && tp->urg_seq == *seq) { 77 /* 之前已經讀取了部分數據,跳出 */ 78 if (copied) 79 break; 80 /* 用戶進程有信號待處理,跳出 */ 81 if (signal_pending(current)) { 82 copied = timeo ? sock_intr_errno(timeo) : -EAGAIN; 83 break; 84 } 85 } 86 87 /* Next get a buffer. 
*/ 88 89 /* 獲取隊尾 */ 90 last = skb_peek_tail(&sk->sk_receive_queue); 91 92 /* 遍歷接收隊列,找到滿足讀取的skb */ 93 skb_queue_walk(&sk->sk_receive_queue, skb) { 94 last = skb; 95 /* Now that we have two receive queues this 96 * shouldn't happen. 97 */ 98 /* 隊列中序號比待讀取的大 */ 99 if (WARN(before(*seq, TCP_SKB_CB(skb)->seq), 100 "recvmsg bug: copied %X seq %X rcvnxt %X fl %X\n", 101 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, 102 flags)) 103 break; 104 105 /* 獲取序號偏移*/ 106 offset = *seq - TCP_SKB_CB(skb)->seq; 107 108 /* 有syn標記,再減1 */ 109 if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) { 110 pr_err_once("%s: found a SYN, please report !\n", __func__); 111 offset--; 112 } 113 /* 偏移小於skb數據長度,找到 */ 114 if (offset < skb->len) 115 goto found_ok_skb; 116 117 /* 有fin標記,跳轉到fin處理 */ 118 if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN) 119 goto found_fin_ok; 120 WARN(!(flags & MSG_PEEK), 121 "recvmsg bug 2: copied %X seq %X rcvnxt %X fl %X\n", 122 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags); 123 } 124 125 /* Well, if we have backlog, try to process it now yet. */ 126 127 /* 讀完目標數據&& backlog隊列為空 */ 128 if (copied >= target && !sk->sk_backlog.tail) 129 break; 130 131 /* 未讀完目標數據,或者讀完目標數據,隊列不為空 */ 132 133 /* 已經讀取了數據 */ 134 if (copied) { 135 /* 有錯誤或者關閉或者有信號,跳出 */ 136 if (sk->sk_err || 137 sk->sk_state == TCP_CLOSE || 138 (sk->sk_shutdown & RCV_SHUTDOWN) || 139 !timeo || 140 signal_pending(current)) 141 break; 142 } else { 143 /* 會話終結*/ 144 if (sock_flag(sk, SOCK_DONE)) 145 break; 146 147 /* 有錯誤 */ 148 if (sk->sk_err) { 149 copied = sock_error(sk); 150 break; 151 } 152 153 /* 關閉接收端 */ 154 if (sk->sk_shutdown & RCV_SHUTDOWN) 155 break; 156 157 /* 連接關閉 */ 158 if (sk->sk_state == TCP_CLOSE) { 159 /* 不在done狀態,可能再讀一個連接未建立起來的連接 */ 160 if (!sock_flag(sk, SOCK_DONE)) { 161 /* This occurs when user tries to read 162 * from never connected socket. 
163 */ 164 copied = -ENOTCONN; 165 break; 166 } 167 break; 168 } 169 170 /* 不阻塞等待 */ 171 if (!timeo) { 172 copied = -EAGAIN; 173 break; 174 } 175 176 /* 有信號待處理 */ 177 if (signal_pending(current)) { 178 copied = sock_intr_errno(timeo); 179 break; 180 } 181 } 182 183 /* 檢查是否需要發送ack */ 184 tcp_cleanup_rbuf(sk, copied); 185 186 /* 未開啟低延遲&& tp的任務為空或者是當前進程 */ 187 if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) { 188 /* Install new reader */ 189 /* 注冊當前進程任務 */ 190 if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) { 191 user_recv = current; 192 tp->ucopy.task = user_recv; 193 tp->ucopy.msg = msg; 194 } 195 196 /* 當前可以使用的用戶緩存大小 */ 197 tp->ucopy.len = len; 198 199 WARN_ON(tp->copied_seq != tp->rcv_nxt && 200 !(flags & (MSG_PEEK | MSG_TRUNC))); 201 202 /* Ugly... If prequeue is not empty, we have to 203 * process it before releasing socket, otherwise 204 * order will be broken at second iteration. 205 * More elegant solution is required!!! 206 * 207 * Look: we have the following (pseudo)queues: 208 * 209 * 1. packets in flight 210 * 2. backlog 211 * 3. prequeue 212 * 4. receive_queue 213 * 214 * Each queue can be processed only if the next ones 215 * are empty. At this point we have empty receive_queue. 216 * But prequeue _can_ be not empty after 2nd iteration, 217 * when we jumped to start of loop because backlog 218 * processing added something to receive_queue. 219 * We cannot release_sock(), because backlog contains 220 * packets arrived _after_ prequeued ones. 221 * 222 * Shortly, algorithm is clear --- to process all 223 * the queues in order. We could make it more directly, 224 * requeueing packets from backlog to prequeue, if 225 * is not empty. It is more elegant, but eats cycles, 226 * unfortunately. 
227 */ 228 /* prequeue不為空,處理prequeue */ 229 if (!skb_queue_empty(&tp->ucopy.prequeue)) 230 goto do_prequeue; 231 232 /* __ Set realtime policy in scheduler __ */ 233 } 234 235 /* 目標數據讀取完,處理后備隊列 */ 236 if (copied >= target) { 237 /* Do not sleep, just process backlog. */ 238 release_sock(sk); 239 lock_sock(sk); 240 } 241 /* 未讀取完,進入等待 */ 242 else { 243 sk_wait_data(sk, &timeo, last); 244 } 245 246 /* 用戶空間接收數據 */ 247 if (user_recv) { 248 int chunk; 249 250 /* __ Restore normal policy in scheduler __ */ 251 252 /* 獲取讀取長度 */ 253 chunk = len - tp->ucopy.len; 254 255 /* 記錄剩余讀取長度和已經讀取長度 */ 256 if (chunk != 0) { 257 NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk); 258 len -= chunk; 259 copied += chunk; 260 } 261 262 /* 263 接收到的數據已經全部復制到用戶空間 264 && prequeue不為空 265 */ 266 if (tp->rcv_nxt == tp->copied_seq && 267 !skb_queue_empty(&tp->ucopy.prequeue)) { 268 do_prequeue: 269 /* 處理prequeue */ 270 tcp_prequeue_process(sk); 271 272 /* 獲取讀取長度和剩余長度 */ 273 chunk = len - tp->ucopy.len; 274 if (chunk != 0) { 275 NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk); 276 len -= chunk; 277 copied += chunk; 278 } 279 } 280 } 281 282 /* 只是查看數據,則更新peek_seq */ 283 if ((flags & MSG_PEEK) && 284 (peek_seq - copied - urg_hole != tp->copied_seq)) { 285 net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n", 286 current->comm, 287 task_pid_nr(current)); 288 peek_seq = tp->copied_seq; 289 } 290 continue; 291 292 /* 讀取一個找到的合適的段 */ 293 found_ok_skb: 294 /* Ok so how much can we use? */ 295 296 /* 獲取該skb中可讀的數據長度 */ 297 used = skb->len - offset; 298 299 /* 不需要讀取那么多,則調整為需要的長度 */ 300 if (len < used) 301 used = len; 302 303 /* Do we have urgent data here? 
*/ 304 /* 有帶外數據*/ 305 if (tp->urg_data) { 306 /* 帶外數據偏移 */ 307 u32 urg_offset = tp->urg_seq - *seq; 308 309 /* 偏移在我們要讀取的數據范圍內 */ 310 if (urg_offset < used) { 311 /* 當前正在讀取的數據為帶外數據 */ 312 if (!urg_offset) { 313 /* 不允許放入正常數據流 */ 314 if (!sock_flag(sk, SOCK_URGINLINE)) { 315 /* 調整序號和偏移 */ 316 ++*seq; 317 urg_hole++; 318 offset++; 319 used--; 320 /* 無可讀數據 */ 321 if (!used) 322 goto skip_copy; 323 } 324 } 325 /* 本次只能讀到帶外數據為止 */ 326 else 327 used = urg_offset; 328 } 329 } 330 331 /* 讀取數據 */ 332 if (!(flags & MSG_TRUNC)) { 333 err = skb_copy_datagram_msg(skb, offset, msg, used); 334 if (err) { 335 /* Exception. Bailout! */ 336 if (!copied) 337 copied = -EFAULT; 338 break; 339 } 340 } 341 342 /* 計算讀取和待讀取數據長度 */ 343 *seq += used; 344 copied += used; 345 len -= used; 346 347 tcp_rcv_space_adjust(sk); 348 349 skip_copy: 350 /* 完成對帶外數據的處理 */ 351 if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) { 352 /* 標志清零 */ 353 tp->urg_data = 0; 354 /* 快路檢查 */ 355 tcp_fast_path_check(sk); 356 } 357 358 /* 滿足繼續讀取 */ 359 if (used + offset < skb->len) 360 continue; 361 /* fin處理 */ 362 if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN) 363 goto found_fin_ok; 364 365 /* 數據讀取完,不是查看,則釋放該skb */ 366 if (!(flags & MSG_PEEK)) 367 sk_eat_skb(sk, skb); 368 continue; 369 370 found_fin_ok: 371 /* Process the FIN. */ 372 /* 序號增加 */ 373 ++*seq; 374 /* 不是查看,則釋放skb */ 375 if (!(flags & MSG_PEEK)) 376 sk_eat_skb(sk, skb); 377 break; 378 } while (len > 0); 379 380 /* 用戶空間進程接收數據 */ 381 if (user_recv) { 382 /* prequeue不為空 */ 383 if (!skb_queue_empty(&tp->ucopy.prequeue)) { 384 int chunk; 385 386 /* 調整剩余可用空間 */ 387 tp->ucopy.len = copied > 0 ? 
len : 0; 388 389 /* 處理prequeue */ 390 tcp_prequeue_process(sk); 391 392 /* 讀取了數據,則重新計算下長度 */ 393 if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) { 394 NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk); 395 len -= chunk; 396 copied += chunk; 397 } 398 } 399 400 /* 用戶空間結束讀取 */ 401 tp->ucopy.task = NULL; 402 tp->ucopy.len = 0; 403 } 404 405 /* According to UNIX98, msg_name/msg_namelen are ignored 406 * on connected socket. I was just happy when found this 8) --ANK 407 */ 408 409 /* Clean up data we have read: This will do ACK frames. */ 410 /* 檢查是否有ack發送 */ 411 tcp_cleanup_rbuf(sk, copied); 412 413 release_sock(sk); 414 return copied; 415 416 out: 417 release_sock(sk); 418 return err; 419 420 recv_urg: 421 /* 帶外數據 */ 422 err = tcp_recv_urg(sk, msg, len, flags); 423 goto out; 424 425 recv_sndq: 426 err = tcp_peek_sndq(sk, msg, len); 427 goto out; 428 }