多個進程綁定(bind)同一個端口,當客戶斷發起連接(connect)時,內核會通過一個hash算法決定分配到那個進程上。
Linux 4.5之前的reuseport查找實現(4.3內核)
以下是未優化前的Linux 4.3內核的實現,可見是多么地不直觀。它采用了遍歷HASH沖突鏈表的方式進行reuseport套接字的精確定位:
result = NULL; badness = 0; udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) { score = compute_score2(sk, net, saddr, sport, daddr, hnum, dif); if (score > badness) { // 冒泡排序 // 找到了更加合適的socket,需要重新hash result = sk; badness = score; reuseport = sk->sk_reuseport; if (reuseport) { hash = udp_ehashfn(net, daddr, hnum, saddr, sport); matches = 1; } } else if (score == badness && reuseport) { // reuseport套接字散列定位 // 找到了同樣reuseport的socket,進行定位 matches++; if (reciprocal_scale(hash, matches) == 0) result = sk; hash = next_pseudo_random32(hash); } }
Linux 4.5(針對UDP)/4.6(針對TCP)的reuseport查找實現
我們來看看在4.5和4.6內核中對於reuseport的查找增加了一些什么神奇的新東西:
result = NULL; badness = 0; udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) { score = compute_score2(sk, net, saddr, sport, daddr, hnum, dif); if (score > badness) { // 在reuseport情形下,意味着找到了更加合適的socket組,需要重新hash result = sk; badness = score; reuseport = sk->sk_reuseport; if (reuseport) { hash = udp_ehashfn(net, daddr, hnum, saddr, sport); if (select_ok) { struct sock *sk2; // 找到了一個組,接着進行組內hash。 sk2 = reuseport_select_sock(sk, hash, skb, sizeof(struct udphdr)); if (sk2) { result = sk2; select_ok = false; goto found; } } matches = 1; } } else if (score == badness && reuseport) { // 這個else if分支的期待是,在分層查找不適用的時候,尋找更加匹配的reuseport組,注意4.5/4.6以后直接尋找的是一個reuseport組。 // 在某種意義上,這回退到了4.5之前的算法。 matches++; if (reciprocal_scale(hash, matches) == 0) result = sk; hash = next_pseudo_random32(hash); } }
struct sock *reuseport_select_sock(struct sock *sk, u32 hash, struct sk_buff *skb, int hdr_len) { ... prog = rcu_dereference(reuse->prog); socks = READ_ONCE(reuse->num_socks); if (likely(socks)) { /* paired with smp_wmb() in reuseport_add_sock() */ smp_rmb(); if (prog && skb) // 可以用BPF來從用戶態注入自己的定位邏輯,更好實現基於策略的負載均衡 sk2 = run_bpf(reuse, socks, prog, skb, hdr_len); else // reciprocal_scale簡單地將結果限制在了[0,socks)這個區間內 sk2 = reuse->socks[reciprocal_scale(hash, socks)]; } ... }
單機上的 連接服務器 則可以用端口復用的方式實現負載均衡;也完美解決了nginx之前的驚群現象,也不需要像nginx后來的做法去避免驚群。
下面給出測試用的demo
#include <stdio.h> #include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/types.h> #include <unistd.h> #include <pthread.h> #define MAXLINE 100 void* thread_(void* agr) { int listenfd,connfd; struct sockaddr_in servaddr; char buff[MAXLINE+1]; time_t ticks; unsigned short port; int flag=1,len=sizeof(int); port=10013; if( (listenfd=socket(AF_INET,SOCK_STREAM,0)) == -1) { perror("socket"); exit(1); } bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_addr.s_addr=htonl(INADDR_ANY); servaddr.sin_port=htons(port); //SO_REUSEPORT if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, len) == -1) { perror("SO_REUSEADDR"); exit(1); } if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &flag, len) == -1) { perror("SO_REUSEPORT"); exit(1); } if( bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) ==-1) { perror("bind"); exit(1); } else printf("bind call OK!\n"); if( listen(listenfd,5) == -1) { perror("listen"); exit(1); } char buf[] = "hello world."; for(;;) { if( (connfd=accept(listenfd,(struct sockaddr*)NULL,NULL)) == -1) { perror("accept"); exit(1); } send(connfd,buf,sizeof(buf),0); close(connfd); printf("pid %d : once\n",pthread_self()); } } int main(int argc, char** argv) { pthread_t pt; if( 0!=pthread_create(&pt,NULL,thread_,(void*)0) ) { perror("pthread_create"); } thread_((void*)1); return 0; }