多個進程綁定(bind)同一個端口,當客戶斷發起連接(connect)時,內核會通過一個hash算法決定分配到那個進程上。
Linux 4.5之前的reuseport查找實現(4.3內核)
以下是未優化前的Linux 4.3內核的實現,可見是多么地不直觀。它采用了遍歷HASH沖突鏈表的方式進行reuseport套接字的精確定位:
result = NULL;
badness = 0;
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {
score = compute_score2(sk, net, saddr, sport,
daddr, hnum, dif);
if (score > badness) { // 冒泡排序
// 找到了更加合適的socket,需要重新hash
result = sk;
badness = score;
reuseport = sk->sk_reuseport;
if (reuseport) {
hash = udp_ehashfn(net, daddr, hnum,
saddr, sport);
matches = 1;
}
} else if (score == badness && reuseport) { // reuseport套接字散列定位
// 找到了同樣reuseport的socket,進行定位
matches++;
if (reciprocal_scale(hash, matches) == 0)
result = sk;
hash = next_pseudo_random32(hash);
}
}
Linux 4.5(針對UDP)/4.6(針對TCP)的reuseport查找實現
我們來看看在4.5和4.6內核中對於reuseport的查找增加了一些什么神奇的新東西:
result = NULL;
badness = 0;
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {
score = compute_score2(sk, net, saddr, sport,
daddr, hnum, dif);
if (score > badness) {
// 在reuseport情形下,意味着找到了更加合適的socket組,需要重新hash
result = sk;
badness = score;
reuseport = sk->sk_reuseport;
if (reuseport) {
hash = udp_ehashfn(net, daddr, hnum,
saddr, sport);
if (select_ok) {
struct sock *sk2;
// 找到了一個組,接着進行組內hash。
sk2 = reuseport_select_sock(sk, hash, skb,
sizeof(struct udphdr));
if (sk2) {
result = sk2;
select_ok = false;
goto found;
}
}
matches = 1;
}
} else if (score == badness && reuseport) {
// 這個else if分支的期待是,在分層查找不適用的時候,尋找更加匹配的reuseport組,注意4.5/4.6以后直接尋找的是一個reuseport組。
// 在某種意義上,這回退到了4.5之前的算法。
matches++;
if (reciprocal_scale(hash, matches) == 0)
result = sk;
hash = next_pseudo_random32(hash);
}
}
struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len)
{
...
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();
if (prog && skb) // 可以用BPF來從用戶態注入自己的定位邏輯,更好實現基於策略的負載均衡
sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);
else
// reciprocal_scale簡單地將結果限制在了[0,socks)這個區間內
sk2 = reuse->socks[reciprocal_scale(hash, socks)];
}
...
}
單機上的 連接服務器 則可以用端口復用的方式實現負載均衡;也完美解決了nginx之前的驚群現象,也不需要像nginx后來的做法去避免驚群。
下面給出測試用的demo
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#define MAXLINE 100
void* thread_(void* agr)
{
int listenfd,connfd;
struct sockaddr_in servaddr;
char buff[MAXLINE+1];
time_t ticks;
unsigned short port;
int flag=1,len=sizeof(int);
port=10013;
if( (listenfd=socket(AF_INET,SOCK_STREAM,0)) == -1)
{
perror("socket");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(port);
//SO_REUSEPORT
if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, len) == -1)
{
perror("SO_REUSEADDR");
exit(1);
}
if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &flag, len) == -1)
{
perror("SO_REUSEPORT");
exit(1);
}
if( bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) ==-1)
{
perror("bind");
exit(1);
}
else
printf("bind call OK!\n");
if( listen(listenfd,5) == -1)
{
perror("listen");
exit(1);
}
char buf[] = "hello world.";
for(;;) {
if( (connfd=accept(listenfd,(struct sockaddr*)NULL,NULL)) == -1)
{
perror("accept");
exit(1);
}
send(connfd,buf,sizeof(buf),0);
close(connfd);
printf("pid %d : once\n",pthread_self());
}
}
int main(int argc, char** argv)
{
pthread_t pt;
if( 0!=pthread_create(&pt,NULL,thread_,(void*)0) )
{
perror("pthread_create");
}
thread_((void*)1);
return 0;
}
