Netlink 內核實現分析(二):通信


在前一篇博文《Netlink 內核實現分析(一):創建》中已經較為具體的分析了Linux內核netlink子系統的初始化流程、內核netlink套接字的創建、應用層netlink套接字的創建和綁定流程,本文來具體的分析一下內核是怎樣實現netlink消息在內核和應用進程之間全雙工異步通信的。

一、netlink通信數據結構

1、netlink消息報頭:struct nlmsghdr

struct nlmsghdr {
	__u32		nlmsg_len;	/* Length of message including header */
	__u16		nlmsg_type;	/* Message content */
	__u16		nlmsg_flags;	/* Additional flags */
	__u32		nlmsg_seq;	/* Sequence number */
	__u32		nlmsg_pid;	/* Sending process port ID */
};
netlink消息同TCP/UDP消息一樣,也須要遵循協議要求的格式,每一個netlink消息的開頭是固定長度的netlink報頭。報頭后才是實際的載荷。netlink報頭一共占16個字節,詳細內容即同struct nlmsghdr中定義的一樣。
(1)nlmsg_len:整個netlink消息的長度(包括消息頭);

(2)nlmsg_type:消息狀態。內核在include/uapi/linux/netlink.h中定義了下面4種通用的消息類型,它們各自是:

NLMSG_NOOP:不運行不論什么動作,必須將該消息丟棄;

NLMSG_ERROR:消息錯誤發生;

NLMSG_DONE:標識分組消息的末尾;

NLMSG_OVERRUN:緩沖區溢出。表示某些消息已經丟失。

除了這4種類型的消息以外,不同的netlink協議也能夠自行加入自己所特有的消息類型。可是內核定義了類型保留宏(#define NLMSG_MIN_TYPE 0x10)。即小於該值的消息類型值由內核保留,不可用。

(3)nlmsg_flags:消息標記,它們用以表示消息的類型,相同定義在include/uapi/linux/netlink.h中;

#define NLM_F_REQUEST		1	/* It is request message. 	*/
#define NLM_F_MULTI		2	/* Multipart message, terminated by NLMSG_DONE */
#define NLM_F_ACK		4	/* Reply with ack, with zero or error code */
#define NLM_F_ECHO		8	/* Echo this request 		*/
#define NLM_F_DUMP_INTR		16	/* Dump was inconsistent due to sequence change */

/* Modifiers to GET request */
#define NLM_F_ROOT	0x100	/* specify tree	root	*/
#define NLM_F_MATCH	0x200	/* return all matching	*/
#define NLM_F_ATOMIC	0x400	/* atomic GET		*/
#define NLM_F_DUMP	(NLM_F_ROOT|NLM_F_MATCH)

/* Modifiers to NEW request */
#define NLM_F_REPLACE	0x100	/* Override existing		*/
#define NLM_F_EXCL	0x200	/* Do not touch, if it exists	*/
#define NLM_F_CREATE	0x400	/* Create, if it does not exist	*/
#define NLM_F_APPEND	0x800	/* Add to end of list		*/
(4)nlmsg_seq:消息序列號。用以將消息排隊。有些類似TCP協議中的序號(不全然一樣)。可是netlink的這個字段是可選的,不強制使用;
(5)nlmsg_pid:發送port的ID號,對於內核來說該值就是0,對於用戶進程來說就是其socket所綁定的ID號。

2、socket消息數據包結構:struct msghdr

struct user_msghdr {
	void		__user *msg_name;	/* ptr to socket address structure */
	int		msg_namelen;		/* size of socket address structure */
	struct iovec	__user *msg_iov;	/* scatter/gather array */
	__kernel_size_t	msg_iovlen;		/* # elements in msg_iov */
	void		__user *msg_control;	/* ancillary data */
	__kernel_size_t	msg_controllen;		/* ancillary data buffer length */
	unsigned int	msg_flags;		/* flags on received message */
};
應用層向內核傳遞消息能夠使用sendto()或sendmsg()函數。當中sendmsg函數須要應用程序手動封裝msghdr消息結構,而sendto()函數則會由內核代為分配。當中
(1)msg_name:指向數據包的目的地址;
(2)msg_namelen:目的地址數據結構的長度;
(3)msg_iov:消息包的實際數據塊,定義例如以下:
struct iovec
{
	void *iov_base;	/* BSD uses caddr_t (1003.1g requires void *) */
	__kernel_size_t iov_len; /* Must be size_t (1003.1g) */
};
iov_base:消息包實際載荷的首地址;
iov_len:消息實際載荷的長度。
(4)msg_control:消息的輔助數據;
(5)msg_controllen:消息輔助數據的大小;
(6)msg_flags:接收消息的標識。


對於該結構,我們更須要關注的是前三個變量參數。對於netlink數據包來說當中msg_name指向的就是目的sockaddr_nl地址結構實例的首地址,iov_base指向的就是消息實體中的nlmsghdr消息頭的地址,而iov_len賦值為nlmsghdr中的nlmsg_len就可以(消息頭+實際數據)。

3、netlink消息處理宏

#define NLMSG_ALIGNTO	4U
#define NLMSG_ALIGN(len) ( ((len)+NLMSG_ALIGNTO-1) & ~(NLMSG_ALIGNTO-1) )			/* 對len運行4字節對齊 */
#define NLMSG_HDRLEN	 ((int) NLMSG_ALIGN(sizeof(struct nlmsghdr)))				/* netlink消息頭長度 */
#define NLMSG_LENGTH(len) ((len) + NLMSG_HDRLEN)									/* netlink消息載荷len加上消息頭 */
#define NLMSG_SPACE(len) NLMSG_ALIGN(NLMSG_LENGTH(len))								/* 對netlink消息全長運行字節對齊 */
#define NLMSG_DATA(nlh)  ((void*)(((char*)nlh) + NLMSG_LENGTH(0)))					/* 獲取netlink消息實際載荷位置 */
#define NLMSG_NEXT(nlh,len)	 ((len) -= NLMSG_ALIGN((nlh)->nlmsg_len), \
				  (struct nlmsghdr*)(((char*)(nlh)) + NLMSG_ALIGN((nlh)->nlmsg_len)))/* 取得下一個消息的首地址。同一時候len也降低為剩余消息的總長度 */
#define NLMSG_OK(nlh,len) ((len) >= (int)sizeof(struct nlmsghdr) && \
			   (nlh)->nlmsg_len >= sizeof(struct nlmsghdr) && \
			   (nlh)->nlmsg_len <= (len))											/* 驗證消息的長度 */
#define NLMSG_PAYLOAD(nlh,len) ((nlh)->nlmsg_len - NLMSG_SPACE((len)))				/* 返回PAYLOAD的長度 */
Linux為了處理netlink消息方便,在 include/uapi/linux/netlink.h中定義了以上消息處理宏,用於各種場合。

對於Netlink消息來說,處理例如以下格式(見netlink.h):

/* ========================================================================
 *         Netlink Messages and Attributes Interface (As Seen On TV)
 * ------------------------------------------------------------------------
 *                          Messages Interface
 * ------------------------------------------------------------------------
 *
 * Message Format:
 *    <--- nlmsg_total_size(payload)  --->
 *    <-- nlmsg_msg_size(payload) ->
 *   +----------+- - -+-------------+- - -+-------- - -
 *   | nlmsghdr | Pad |   Payload   | Pad | nlmsghdr
 *   +----------+- - -+-------------+- - -+-------- - -
 *   nlmsg_data(nlh)---^                   ^
 *   nlmsg_next(nlh)-----------------------+
 *
 * Payload Format:
 *    <---------------------- nlmsg_len(nlh) --------------------->
 *    <------ hdrlen ------>       <- nlmsg_attrlen(nlh, hdrlen) ->
 *   +----------------------+- - -+--------------------------------+
 *   |     Family Header    | Pad |           Attributes           |
 *   +----------------------+- - -+--------------------------------+
 *   nlmsg_attrdata(nlh, hdrlen)---^
 *
 * ------------------------------------------------------------------------
 *                          Attributes Interface
 * ------------------------------------------------------------------------
 *
 * Attribute Format:
 *    <------- nla_total_size(payload) ------->
 *    <---- nla_attr_size(payload) ----->
 *   +----------+- - -+- - - - - - - - - +- - -+-------- - -
 *   |  Header  | Pad |     Payload      | Pad |  Header
 *   +----------+- - -+- - - - - - - - - +- - -+-------- - -
 *                     <- nla_len(nla) ->      ^
 *   nla_data(nla)----^                        |
 *   nla_next(nla)-----------------------------'
 *
 *=========================================================================
 */

二、應用層向內核發送netlink消息

使用例如以下演示樣例程序可向內核netlink套接字發送消息:
#define TEST_DATA_LEN	16
#DEFINE TEST_DATA		"netlink send test"		/* 僅作為演示樣例,內核NETLINK_ROUTE套接字無法解析 */

struct sockaddr_nl nladdr;
struct msghdr msg;
struct nlmsghdr *nlhdr;
struct iovec iov;

/* 填充目的地址結構 */
memset(&nladdr, 0, sizeof(nladdr));
nladdr.nl_family = AF_NETLINK;
nladdr.nl_pid = 0;						/* 地址為內核 */
nladdr.nl_groups = 0;						/* 單播 */

/* 填充netlink消息頭 */
nlhdr = (struct nlmsghdr *)malloc(NLMSG_SPACE(TEST_DATA_LEN));

nlhdr->nlmsg_len = NLMSG_LENGTH(TEST_DATA_LEN);
nlhdr->nlmsg_flags = NLM_F_REQUEST;
nlhdr->nlmsg_pid = get_pid();					/* 當前套接字所綁定的ID號(此處為本進程的PID) */
nlhdr->nlmsg_seq = 0;

/* 填充netlink消息實際載荷 */
strcpy(NLMSG_DATA(nlhdr), TEST_DATA);
iov.iov_base = (void *)nlhdr;
iov.iov_len = nlhdr->nlmsg_len;
	
/* 填充數據消息結構 */
memset(&msg, 0, sizeof(msg));
msg.msg_name = (void *)&(nladdr);
msg.msg_namelen = sizeof(nladdr);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

/* 發送netlink消息 */
sendmsg (sock, &msg, 0); /* sock描寫敘述符見《Netlink 內核實現分析(一):創建》。為NETLINK_ROUTE類型套接字 */
這里列出了一個調用sendmsg向內核發送消息的演示樣例代碼片段(僅作為演示樣例,發送的消息內核netlink套接字可能無法解析)。首先初始化目的地址數據結構,設置nl_pid和nl_groups為0指定消息的目的地址為內核;然后初始化netlink消息頭指明消息的長度為TEST_DATA_LEN + NLMSG_ALIGN(sizeof(struct nlmsghdr))(包括消息頭),發送端的ID號為發送socket消息所綁定的ID號(這樣內核才知道消息是誰發送的);然后設置消息的實際載荷,將數據復制到緊接消息頭后的實際載荷部分;最后組裝成msg消息就能夠調用sendmsg向內核發送了。


以下尾隨內核的sendmsg系統調用的整個流程來分析消息是怎樣被送到內核的(須要注意的是,在不使用NETLINK_MMAP技術的情況下。整個發送的過程中存在1~2次數據的內存拷貝動作。后面會逐一點出。):


圖1 用戶態netlink數據發送流程
SYSCALL_DEFINE3(sendmsg, int, fd, struct user_msghdr __user *, msg, unsigned int, flags)
{
	if (flags & MSG_CMSG_COMPAT)
		return -EINVAL;
	return __sys_sendmsg(fd, msg, flags);
}
long __sys_sendmsg(int fd, struct user_msghdr __user *msg, unsigned flags)
{
	int fput_needed, err;
	struct msghdr msg_sys;
	struct socket *sock;

	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;

	err = ___sys_sendmsg(sock, msg, &msg_sys, flags, NULL);

	fput_light(sock->file, fput_needed);
out:
	return err;
}
sendmsg系統調用調用__sys_sendmsg來進行實際的操作。這里首先通過fd描寫敘述符找到相應的socket套接字結構實例,然后調用___sys_sendmsg()函數,傳入的參數中第三個和最后一個須要關注一下,當中第三個它是一個內核版的socket消息數據包結構,同應用層的略有不同,定義例如以下:
struct msghdr {
	void		*msg_name;	/* ptr to socket address structure */
	int		msg_namelen;	/* size of socket address structure */
	struct iov_iter	msg_iter;	/* data */
	void		*msg_control;	/* ancillary data */
	__kernel_size_t	msg_controllen;	/* ancillary data buffer length */
	unsigned int	msg_flags;	/* flags on received message */
	struct kiocb	*msg_iocb;	/* ptr to iocb for async requests */
};
當中msg_name、msg_namelen、msg_control、msg_controllen和msg_flags字段同應用層的含義是一樣的。msg_iter為msg_iov和msg_iovlen的合體,最后msg_iocb用於異步請求。最后一個參數是一個struct used_address結構體指針,這個結構體定義例如以下:
struct used_address {
	struct sockaddr_storage name;
	unsigned int name_len;
};
這里的name字段用來存儲消息的地址,name_len字段是消息地址的長度,它們同struct msghdr結構體的前兩個字段一致。該結構體主要用與sendmmsg系統調用(用於同事時向一個socket地址發送多個數據包,能夠避免反復的網絡security檢查。從而提高發送效率)保存多個數據包的目的地址。

如今這里設置為NULL,表示不使用。繼續往下分析,進入___sys_sendmsg()函數內部,這個函數比較長,分段來分析:

static int ___sys_sendmsg(struct socket *sock, struct user_msghdr __user *msg,
			 struct msghdr *msg_sys, unsigned int flags,
			 struct used_address *used_address)
{
	struct compat_msghdr __user *msg_compat =
	    (struct compat_msghdr __user *)msg;
	struct sockaddr_storage address;
	struct iovec iovstack[UIO_FASTIOV], *iov = iovstack;
	unsigned char ctl[sizeof(struct cmsghdr) + 20]
	    __attribute__ ((aligned(sizeof(__kernel_size_t))));
	/* 20 is size of ipv6_pktinfo */
	unsigned char *ctl_buf = ctl;
	int ctl_len;
	ssize_t err;

	msg_sys->msg_name = &address;

	if (MSG_CMSG_COMPAT & flags)
		err = get_compat_msghdr(msg_sys, msg_compat, NULL, &iov);
	else
		err = copy_msghdr_from_user(msg_sys, msg, NULL, &iov);
	if (err < 0)
		return err;
這里的iovstack數組是用來加速用戶數據拷貝的(這里假定用戶數據的iovec個數通常不會超過UIO_FASTIOV個。假設超過會通過kmalloc分配內存)。首先這里推斷flag中是否設置了32bit修正標識。從前文中系統調用的入口處已經能夠看出了。這里顯然不會設置該標識位,所以這里調用copy_msghdr_from_user函數將用戶空間傳入的消息(struct user_msghdr __user *msg)安全的復制到內核空間中(struct msghdr *msg_sys),來簡單的看一下這個函數:
static int copy_msghdr_from_user(struct msghdr *kmsg,
				 struct user_msghdr __user *umsg,
				 struct sockaddr __user **save_addr,
				 struct iovec **iov)
{
	struct sockaddr __user *uaddr;
	struct iovec __user *uiov;
	size_t nr_segs;
	ssize_t err;

	if (!access_ok(VERIFY_READ, umsg, sizeof(*umsg)) ||
	    __get_user(uaddr, &umsg->msg_name) ||
	    __get_user(kmsg->msg_namelen, &umsg->msg_namelen) ||
	    __get_user(uiov, &umsg->msg_iov) ||
	    __get_user(nr_segs, &umsg->msg_iovlen) ||
	    __get_user(kmsg->msg_control, &umsg->msg_control) ||
	    __get_user(kmsg->msg_controllen, &umsg->msg_controllen) ||
	    __get_user(kmsg->msg_flags, &umsg->msg_flags))
		return -EFAULT;

	if (!uaddr)
		kmsg->msg_namelen = 0;

	if (kmsg->msg_namelen < 0)
		return -EINVAL;

	if (kmsg->msg_namelen > sizeof(struct sockaddr_storage))
		kmsg->msg_namelen = sizeof(struct sockaddr_storage);

	if (save_addr)
		*save_addr = uaddr;

	if (uaddr && kmsg->msg_namelen) {
		if (!save_addr) {
			err = move_addr_to_kernel(uaddr, kmsg->msg_namelen,
						  kmsg->msg_name);
			if (err < 0)
				return err;
		}
	} else {
		kmsg->msg_name = NULL;
		kmsg->msg_namelen = 0;
	}

	if (nr_segs > UIO_MAXIOV)
		return -EMSGSIZE;

	kmsg->msg_iocb = NULL;

	return import_iovec(save_addr ?

READ : WRITE, uiov, nr_segs, UIO_FASTIOV, iov, &kmsg->msg_iter); }

函數首先調用access_ok檢查用戶數據的有效性,然后調用__get_user函數運行單數據的復制操作(並沒有復制數據包內容),接着做一些簡單的入參推斷。

然后假設用戶消息中存在目的地址且入參save_addr為空(當前情景中正好就是這類情況)。就調用move_addr_to_kernel()函數將消息地址復制到內核kmsg的結構中。否則將kmsg中的目的地址和長度字段置位空。接下來推斷消息實際載荷iovec結構的個數,這里UIO_MAXIOV值定義為1024,也就是說消息數據iovec結構的最大個數不能超過這個值,這點很重要。
最后調用import_iovec()函數開始運行實際數據從用戶態向內核態的拷貝動作(注意這里並沒有拷貝用戶空間實際消息載荷數據,只檢查了用戶地址有效性並拷貝了長度等字段)。在拷貝完畢后,&kmsg->msg_iter中的數據初始化情況例如以下:

int type:WRITE;
size_t iov_offset:初始化為0;
size_t count:全部iovec結構數據的總長度(即iov->iov_len的總和);
const struct iovec *iov:首個iov結構指針;
unsigned long nr_segs:iovec結構的個數。

數據拷貝完畢后,再回到___sys_sendmsg函數中繼續分析:
	err = -ENOBUFS;

	if (msg_sys->msg_controllen > INT_MAX)
		goto out_freeiov;
	ctl_len = msg_sys->msg_controllen;
	if ((MSG_CMSG_COMPAT & flags) && ctl_len) {
		err =
		    cmsghdr_from_user_compat_to_kern(msg_sys, sock->sk, ctl,
						     sizeof(ctl));
		if (err)
			goto out_freeiov;
		ctl_buf = msg_sys->msg_control;
		ctl_len = msg_sys->msg_controllen;
	} else if (ctl_len) {
		if (ctl_len > sizeof(ctl)) {
			ctl_buf = sock_kmalloc(sock->sk, ctl_len, GFP_KERNEL);
			if (ctl_buf == NULL)
				goto out_freeiov;
		}
		err = -EFAULT;
		/*
		 * Careful! Before this, msg_sys->msg_control contains a user pointer.
		 * Afterwards, it will be a kernel pointer. Thus the compiler-assisted
		 * checking falls down on this.
		 */
		if (copy_from_user(ctl_buf,
				   (void __user __force *)msg_sys->msg_control,
				   ctl_len))
			goto out_freectl;
		msg_sys->msg_control = ctl_buf;
	}
這一段程序是用來拷貝消息輔助數據的,比較直觀,我們前文中的演示樣例程序並沒有傳遞輔助數據。所以這里不具體分析。繼續往下看:
	msg_sys->msg_flags = flags;

	if (sock->file->f_flags & O_NONBLOCK)
		msg_sys->msg_flags |= MSG_DONTWAIT;
	/*
	 * If this is sendmmsg() and current destination address is same as
	 * previously succeeded address, omit asking LSM's decision.
	 * used_address->name_len is initialized to UINT_MAX so that the first
	 * destination address never matches.
	 */
	if (used_address && msg_sys->msg_name &&
	    used_address->name_len == msg_sys->msg_namelen &&
	    !memcmp(&used_address->name, msg_sys->msg_name,
		    used_address->name_len)) {
		err = sock_sendmsg_nosec(sock, msg_sys);
		goto out_freectl;
	}
	err = sock_sendmsg(sock, msg_sys);
	/*
	 * If this is sendmmsg() and sending to current destination address was
	 * successful, remember it.
	 */
	if (used_address && err >= 0) {
		used_address->name_len = msg_sys->msg_namelen;
		if (msg_sys->msg_name)
			memcpy(&used_address->name, msg_sys->msg_name,
			       used_address->name_len);
	}
這里首先保存用戶傳遞的flag標識。然后推斷假設當前的socket已經被配置為非堵塞模式則置位MSG_DONTWAIT標識(定義在include/linux/socket.h中)。接下來通過傳入的used_address指針推斷當前發送消息的目的地址是否同它記錄的一致,假設一致則調用sock_sendmsg_nosec()函數發送數據,否則調用sock_sendmsg()函數發送數據。sock_sendmsg()事實上終於也是通過調用sock_sendmsg_nosec()來發送數據的,它們的差別就在於是否調用安全檢查函數,例如以下:
int sock_sendmsg(struct socket *sock, struct msghdr *msg)
{
	int err = security_socket_sendmsg(sock, msg,
					  msg_data_left(msg));

	return err ?

: sock_sendmsg_nosec(sock, msg); } EXPORT_SYMBOL(sock_sendmsg);

在sendmmsg系統調用每一次發送多個消息時,因為發送的目的地一般都是一致的,所以僅僅須要在發送第一個消息爆時運行檢查就能夠了,通過這樣的策略就能夠加速數據的發送。最后,在發送完數據后,假設傳入的used_address指針非空,就會將本次成功發送數據的目的地址記錄下來。供下次發送數據比較。

接下來進入sock_sendmsg_nosec(sock, msg_sys)內部繼續分析消息的發送流程:
static inline int sock_sendmsg_nosec(struct socket *sock, struct msghdr *msg)
{
	int ret = sock->ops->sendmsg(sock, msg, msg_data_left(msg));
	BUG_ON(ret == -EIOCBQUEUED);
	return ret;
}
這里調用了socket所綁定協議特有的數據發送鈎子函數,當中最后一個參數為msg->msg_iter->count。即消息實際載荷的總長度。在前一篇文章中已經看到了對於netlink類型的套接字來說該函數被注冊為netlink_sendmsg(),以下來分析這個函數,這個函數較長,分段分析:
static int netlink_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	struct sock *sk = sock->sk;
	struct netlink_sock *nlk = nlk_sk(sk);
	DECLARE_SOCKADDR(struct sockaddr_nl *, addr, msg->msg_name);
	u32 dst_portid;
	u32 dst_group;
	struct sk_buff *skb;
	int err;
	struct scm_cookie scm;
	u32 netlink_skb_flags = 0;

	if (msg->msg_flags&MSG_OOB)
		return -EOPNOTSUPP;

	err = scm_send(sock, msg, &scm, true);
	if (err < 0)
		return err;
首先,這里定義了一個struct sockaddr_nl *addr指針,它指向了msg->msg_name表示消息的目的地址(會做地址長度檢查)。然后調用scm_send()發送消息輔助數據(不分析)。
	if (msg->msg_namelen) {
		err = -EINVAL;
		if (addr->nl_family != AF_NETLINK)
			goto out;
		dst_portid = addr->nl_pid;
		dst_group = ffs(addr->nl_groups);
		err =  -EPERM;
		if ((dst_group || dst_portid) &&
		    !netlink_allowed(sock, NL_CFG_F_NONROOT_SEND))
			goto out;
		netlink_skb_flags |= NETLINK_SKB_DST;
	} else {
		dst_portid = nlk->dst_portid;
		dst_group = nlk->dst_group;
	}
這里假設用戶指定了netlink消息的目的地址。則對其進行校驗,然后推斷當前netlink協議的NL_CFG_F_NONROOT_SEND標識是否設置。假設設置了改標識則同意非root用戶發送組播,對於NETLINK_ROUTE類型的netlink套接字,並沒有設置該標識。表明非root用戶不能發送組播消息;然后設置NETLINK_SKB_DST標識。假設用戶沒有指定netlink消息的目的地址,則使用netlink套接字默認的(該值默覺得0,會在調用connect系統調用時在netlink_connect()中被賦值為用戶設置的值)。注意這里dst_group經過ffs的處理后轉化為組播地址位數(找到最低有效位)。
	if (!nlk->bound) {
		err = netlink_autobind(sock);
		if (err)
			goto out;
	} else {
		/* Ensure nlk is hashed and visible. */
		smp_rmb();
	}
接下來推斷當前的netlink套接字是否被綁定過,假設沒有綁定過這里調用netlink_autobind()進行動態綁定。該函數在前一篇文章中已經分析,繼續往下分析
	/* It's a really convoluted way for userland to ask for mmaped
	 * sendmsg(), but that's what we've got...
	 */
	if (netlink_tx_is_mmaped(sk) &&
	    msg->msg_iter.type == ITER_IOVEC &&
	    msg->msg_iter.nr_segs == 1 &&
	    msg->msg_iter.iov->iov_base == NULL) {
		err = netlink_mmap_sendmsg(sk, msg, dst_portid, dst_group,
					   &scm);
		goto out;
	}
假設內核配置了CONFIG_NETLINK_MMAP內核選項。則表示內核空間和應用層的消息發送隊列支持內存映射,然后通過調用netlink_mmap_sendmsg來發送netlink消息,該種方式將降低數據的內存數據的拷貝動作,降低發送時間和資源占用。

現我的環境中並不支持,繼續往下分析:

	err = -EMSGSIZE;
	if (len > sk->sk_sndbuf - 32)
		goto out;
	err = -ENOBUFS;
	skb = netlink_alloc_large_skb(len, dst_group);
	if (skb == NULL)
		goto out;

接下來推斷須要發送的數據是否過長(長於發送緩存大小),然后通過netlink_alloc_large_skb分配skb結構(傳入的參數為消息載荷的長度以及組播地址)。
	NETLINK_CB(skb).portid	= nlk->portid;
	NETLINK_CB(skb).dst_group = dst_group;
	NETLINK_CB(skb).creds	= scm.creds;
	NETLINK_CB(skb).flags	= netlink_skb_flags;

	err = -EFAULT;
	if (memcpy_from_msg(skb_put(skb, len), msg, len)) {
		kfree_skb(skb);
		goto out;
	}

	err = security_netlink_send(sk, skb);
	if (err) {
		kfree_skb(skb);
		goto out;
	}

	if (dst_group) {
		atomic_inc(&skb->users);
		netlink_broadcast(sk, skb, dst_portid, dst_group, GFP_KERNEL);
	}
	err = netlink_unicast(sk, skb, dst_portid, msg->msg_flags&MSG_DONTWAIT);
在成功創建skb結構之后,這里就開始初始化它。這里使用到了skb中的擴展cb字段(char cb[48] __aligned(8),一共48個字節用於存放netlink的地址和標識相關的附加信息足夠了),同一時候使用宏NETLINK_CB來操作這些字段。netlink將skb的cb字段強制定義為struct netlink_skb_parms結構:
struct netlink_skb_parms {
	struct scm_creds	creds;		/* Skb credentials	*/
	__u32			portid;
	__u32			dst_group;
	__u32			flags;
	struct sock		*sk;
};
當中portid表示原端套接字所綁定的id,dst_group表示消息目的組播地址。flag為標識,sk指向原端套接字的sock結構。


這里首先將套接字綁定的portid賦值到skb得cb字段中、同一時候設置組播地址的數量以及netlink_skb標識(這里是已經置位NETLINK_SKB_DST)。接下來調用最關鍵的調用memcpy_from_msg拷貝數據。它首先調用skb_put調整skb->tail指針。然后運行copy_from_iter(data, len, &msg->msg_iter)將數據從msg->msg_iter中傳輸到skb->data中(這是第一次內存拷貝動作。將用戶空間數據直接復制到內核skb中)。

接下來調用security_netlink_send()運行security檢查,最后假設是組播發送則調用netlink_broadcast()發送消息,否則調用netlink_unicast()發送單播消息。我們先尾隨當前的情景來分析單播發送函數netlink_unicast():
int netlink_unicast(struct sock *ssk, struct sk_buff *skb,
		    u32 portid, int nonblock)
{
	struct sock *sk;
	int err;
	long timeo;

	skb = netlink_trim(skb, gfp_any());

	timeo = sock_sndtimeo(ssk, nonblock);
retry:
	sk = netlink_getsockbyportid(ssk, portid);
	if (IS_ERR(sk)) {
		kfree_skb(skb);
		return PTR_ERR(sk);
	}
	if (netlink_is_kernel(sk))
		return netlink_unicast_kernel(sk, skb, ssk);

	if (sk_filter(sk, skb)) {
		err = skb->len;
		kfree_skb(skb);
		sock_put(sk);
		return err;
	}

	err = netlink_attachskb(sk, skb, &timeo, ssk);
	if (err == 1)
		goto retry;
	if (err)
		return err;

	return netlink_sendskb(sk, skb);
}
這里首先調用netlink_trim()又一次裁剪skb的數據區的大小,這可能會clone出一個新的skb結構同一時候又一次分配skb->data的內存空間(這就出現了第三次的內存拷貝動作!)。當然假設原本skb中多余的內存數據區很小或者該內存空間是在vmalloc空間中的就不會運行上述操作。我們如今尾隨的情景上下文中就是后一種情況。並不會又一次分配空間。
接下來記下發送超時等待時間。假設已經設置了MSG_DONTWAIT標識,則等待時間為0。否則返回sk->sk_sndtimeo(該值在sock初始化時由sock_init_data()函數賦值為MAX_SCHEDULE_TIMEOUT)。
接下來依據目的portid號和原端sock結構查找目的端的sock結構:
static struct sock *netlink_getsockbyportid(struct sock *ssk, u32 portid)
{
	struct sock *sock;
	struct netlink_sock *nlk;

	sock = netlink_lookup(sock_net(ssk), ssk->sk_protocol, portid);
	if (!sock)
		return ERR_PTR(-ECONNREFUSED);

	/* Don't bother queuing skb if kernel socket has no input function */
	nlk = nlk_sk(sock);
	if (sock->sk_state == NETLINK_CONNECTED &&
	    nlk->dst_portid != nlk_sk(ssk)->portid) {
		sock_put(sock);
		return ERR_PTR(-ECONNREFUSED);
	}
	return sock;
}
這里首先調用netlink_lookup運行查找工作,查找的命名空間和協議號同原端sock,它會從nl_table[protocol]的哈希表中找到已經注冊的目的端sock套接字。找到以后運行校驗。如若找到的socket已經connect了,則它的目的portid必須是原端的portid。
接下來推斷目的的netlink socket是否是內核的netlink socket:
static inline int netlink_is_kernel(struct sock *sk)
{
	return nlk_sk(sk)->flags & NETLINK_KERNEL_SOCKET;
}
假設目的地址是內核空間,則調用netlink_unicast_kernel向內核進行單播,入參是目的sock、原端sock和數據skb。

否則繼續向下運行。如今的情景中,我們尾隨用戶空間中發送的數據。進入netlink_unicast_kernel()中:

static int netlink_unicast_kernel(struct sock *sk, struct sk_buff *skb,
				  struct sock *ssk)
{
	int ret;
	struct netlink_sock *nlk = nlk_sk(sk);

	ret = -ECONNREFUSED;
	if (nlk->netlink_rcv != NULL) {
		ret = skb->len;
		netlink_skb_set_owner_r(skb, sk);
		NETLINK_CB(skb).sk = ssk;
		netlink_deliver_tap_kernel(sk, ssk, skb);
		nlk->netlink_rcv(skb);
		consume_skb(skb);
	} else {
		kfree_skb(skb);
	}
	sock_put(sk);
	return ret;
}
檢查目標netlink套接字是否注冊了netlink_rcv()接收函數。假設沒有則直接丟棄該數據包,否則繼續發送流程,這里首先設置一些標識:
skb->sk = sk;     /* 將目的sock賦值給skb->sk指針 */
skb->destructor = netlink_skb_destructor;   /* 注冊destructor鈎子函數 */
NETLINK_CB(skb).sk = ssk;   /* 將原端的sock保存早skb的cb擴展字段中 */
最后就調用了nlk->netlink_rcv(skb)函數將消息送到內核中的目的netlink套接字中了。

在前一篇文章中已經看到在內核注冊netlink套接字的時候已經將其接收函數注冊到了netlink_rcv中:

struct sock *
__netlink_kernel_create(struct net *net, int unit, struct module *module,
			struct netlink_kernel_cfg *cfg)
{
	......
	if (cfg && cfg->input)
		nlk_sk(sk)->netlink_rcv = cfg->input;
對於NETLINK_ROUTE類型的套接字來說就是rtnetlink_rcv了,netlink_rcv()鈎子函數會接收並解析用戶傳下來的數據。不同類型的netlink協議各不同樣,這里就不進行分析了。

至此應用層下發單播的netlink數據就下發完畢了。

如今我們回到netlink_sendmsg()函數中簡單分析一下發送組播數據的流程是如何的:
static int netlink_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	......

	if (dst_group) {
		atomic_inc(&skb->users);
		netlink_broadcast(sk, skb, dst_portid, dst_group, GFP_KERNEL);
	}
int netlink_broadcast(struct sock *ssk, struct sk_buff *skb, u32 portid,
		      u32 group, gfp_t allocation)
{
	return netlink_broadcast_filtered(ssk, skb, portid, group, allocation,
		NULL, NULL);
}
EXPORT_SYMBOL(netlink_broadcast);
這里間接調用netlink_broadcast_filtered()函數:
int netlink_broadcast_filtered(struct sock *ssk, struct sk_buff *skb, u32 portid,
	u32 group, gfp_t allocation,
	int (*filter)(struct sock *dsk, struct sk_buff *skb, void *data),
	void *filter_data)
{
	struct net *net = sock_net(ssk);
	struct netlink_broadcast_data info;
	struct sock *sk;

	skb = netlink_trim(skb, allocation);

	info.exclude_sk = ssk;
	info.net = net;
	info.portid = portid;
	info.group = group;
	info.failure = 0;
	info.delivery_failure = 0;
	info.congested = 0;
	info.delivered = 0;
	info.allocation = allocation;
	info.skb = skb;
	info.skb2 = NULL;
	info.tx_filter = filter;
	info.tx_data = filter_data;

	/* While we sleep in clone, do not allow to change socket list */

	netlink_lock_table();

	sk_for_each_bound(sk, &nl_table[ssk->sk_protocol].mc_list)
		do_one_broadcast(sk, &info);
		
	......
}
這里首先初始化netlink組播數據結構netlink_broadcast_data,當中info.group中保存了目的組播地址。然后從nl_table[ssk->sk_protocol].mc_list里邊查找增加組播組的socket,並調用do_one_broadcast()函數依次發送組播數據:
static void do_one_broadcast(struct sock *sk,
				    struct netlink_broadcast_data *p)
{
	struct netlink_sock *nlk = nlk_sk(sk);
	int val;

	if (p->exclude_sk == sk)
		return;

	if (nlk->portid == p->portid || p->group - 1 >= nlk->ngroups ||
	    !test_bit(p->group - 1, nlk->groups))
		return;

	if (!net_eq(sock_net(sk), p->net))
		return;

	if (p->failure) {
		netlink_overrun(sk);
		return;
	}

	......
	} else if ((val = netlink_broadcast_deliver(sk, p->skb2)) < 0) {
		netlink_overrun(sk);
		if (nlk->flags & NETLINK_BROADCAST_SEND_ERROR)
			p->delivery_failure = 1;
	......
}
當然,在發送之前會做一些必要的檢查,比如這里會確保原端sock和目的端sock不是同一個。它們屬於同一個網絡命名空間,目的的組播地址為發送的目的組播地址等等,然后會對skb和組播數據結構netlink_broadcast_data進行一些處理,最后調用
netlink_broadcast_deliver()函數對目的sock發送數據skb:
static int netlink_broadcast_deliver(struct sock *sk, struct sk_buff *skb)
{
	......
		__netlink_sendskb(sk, skb);
	......
}

static int __netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = skb->len;

	......
	skb_queue_tail(&sk->sk_receive_queue, skb);
	sk->sk_data_ready(sk);
	return len;
}
能夠看到,這里將要發送的skb加入到目的sock的接收隊列末尾,然后調用sk_data_ready()通知鈎子函數,告知目的sock有數據到達,運行處理流程。

對於內核的netlink來說內核netlink的創建函數中已經將其注冊為

struct sock *
__netlink_kernel_create(struct net *net, int unit, struct module *module,
			struct netlink_kernel_cfg *cfg)
{
	......
	sk->sk_data_ready = netlink_data_ready;
	......
}
static void netlink_data_ready(struct sock *sk)
{
	BUG();
}
很明顯了。內核netlink套接字是不管怎樣也不應該接收到組播消息的。可是對於應用層netlink套接字,該sk_data_ready()鈎子函數在初始化netlink函數sock_init_data()中被注冊為sock_def_readable(),這個函數后面再分析。
至此應用層下發的netlink數據流程就分析結束了。

三、內核向應用層發送netlink消息


圖2 內核發送netlink單播消息

內核能夠通過nlmsg_unicast()函數向應用層發送單播消息,由各個netlink協議負責調用,也有的協議是直接調用netlink_unicast()函數,事實上nlmsg_unicast()也僅是netlink_unicast()的一個封裝而已:
/**
 * nlmsg_unicast - unicast a netlink message
 * @sk: netlink socket to spread message to
 * @skb: netlink message as socket buffer
 * @portid: netlink portid of the destination socket
 */
static inline int nlmsg_unicast(struct sock *sk, struct sk_buff *skb, u32 portid)
{
	int err;

	err = netlink_unicast(sk, skb, portid, MSG_DONTWAIT);
	if (err > 0)
		err = 0;

	return err;
}
這里以非堵塞(MSG_DONTWAIT)的形式向應用層發送消息,這時的portid為應用層套接字所綁定的id號。我們再次進入到netlink_unicast()內部,這次因為目的sock不再是內核,所以要走不同的的分支了
int netlink_unicast(struct sock *ssk, struct sk_buff *skb,
		    u32 portid, int nonblock)
{
	struct sock *sk;
	int err;
	long timeo;

	skb = netlink_trim(skb, gfp_any());

	timeo = sock_sndtimeo(ssk, nonblock);
retry:
	sk = netlink_getsockbyportid(ssk, portid);
	if (IS_ERR(sk)) {
		kfree_skb(skb);
		return PTR_ERR(sk);
	}
	if (netlink_is_kernel(sk))
		return netlink_unicast_kernel(sk, skb, ssk);

	if (sk_filter(sk, skb)) {
		err = skb->len;
		kfree_skb(skb);
		sock_put(sk);
		return err;
	}

	err = netlink_attachskb(sk, skb, &timeo, ssk);
	if (err == 1)
		goto retry;
	if (err)
		return err;

	return netlink_sendskb(sk, skb);
}
EXPORT_SYMBOL(netlink_unicast);
這里首先sk_filter運行防火牆的過濾,確保能夠發送以后調用netlink_attachskb將要發送的skb綁定到netlink sock上。
int netlink_attachskb(struct sock *sk, struct sk_buff *skb,
		      long *timeo, struct sock *ssk)
{
	struct netlink_sock *nlk;

	nlk = nlk_sk(sk);

	if ((atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||
	     test_bit(NETLINK_CONGESTED, &nlk->state)) &&
	    !netlink_skb_is_mmaped(skb)) {
		DECLARE_WAITQUEUE(wait, current);
		if (!*timeo) {
			if (!ssk || netlink_is_kernel(ssk))
				netlink_overrun(sk);
			sock_put(sk);
			kfree_skb(skb);
			return -EAGAIN;
		}

		__set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&nlk->wait, &wait);

		if ((atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||
		     test_bit(NETLINK_CONGESTED, &nlk->state)) &&
		    !sock_flag(sk, SOCK_DEAD))
			*timeo = schedule_timeout(*timeo);

		__set_current_state(TASK_RUNNING);
		remove_wait_queue(&nlk->wait, &wait);
		sock_put(sk);

		if (signal_pending(current)) {
			kfree_skb(skb);
			return sock_intr_errno(*timeo);
		}
		return 1;
	}
	netlink_skb_set_owner_r(skb, sk);
	return 0;
}
假設目的sock的接收緩沖區剩余的的緩存大小小於已經提交的數據量。或者標志位已經置位了堵塞標識NETLINK_CONGESTED,這表明數據不能夠馬上的送到目的端的接收緩存中。因此,在原端不是內核socket且沒有設置非堵塞標識的情況下會定義一個等待隊列並等待指定的時間並返回1,否則直接丟棄該skb數據包並返回失敗。
若目的端的接收緩存區空間足夠。就會調用netlink_skb_set_owner_r進行綁定。

回到netlink_unicast()函數中,能夠看到若運行netlink_attachskb()的返回值為1。就會再次嘗試發送操作。

最后調用netlink_sendskb()運行發送操作:

int netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = __netlink_sendskb(sk, skb);

	sock_put(sk);
	return len;
}

這里重新回到了__netlink_sendskb函數運行發送流程:
static int __netlink_sendskb(struct sock *sk, struct sk_buff *skb)
{
	int len = skb->len;

	netlink_deliver_tap(skb);

#ifdef CONFIG_NETLINK_MMAP
	if (netlink_skb_is_mmaped(skb))
		netlink_queue_mmaped_skb(sk, skb);
	else if (netlink_rx_is_mmaped(sk))
		netlink_ring_set_copied(sk, skb);
	else
#endif /* CONFIG_NETLINK_MMAP */
		skb_queue_tail(&sk->sk_receive_queue, skb);
	sk->sk_data_ready(sk);
	return len;
}
這里的sk_data_ready()鈎子函數在初始化netlink函數sock_init_data()中被注冊為sock_def_readable()。進入分析一下:
static void sock_def_readable(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
						POLLRDNORM | POLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	rcu_read_unlock();
}

這里喚醒目的接收端socket的等待隊列。這樣應用層套接字就能夠接收並處理消息了。
以上分析了內核發送單播消息,以下來看一下內核發送多播消息流程。該流程很easy,它的入口是nlmsg_multicast():
static inline int nlmsg_multicast(struct sock *sk, struct sk_buff *skb,
				  u32 portid, unsigned int group, gfp_t flags)
{
	int err;

	NETLINK_CB(skb).dst_group = group;

	err = netlink_broadcast(sk, skb, portid, group, flags);
	if (err > 0)
		err = 0;

	return err;
}
nlmsg_multicast及興許的流程前文中都已分析過了。此處不再贅述。至此內核發送netlink消息已經完畢。下滿來看一下應用層是怎樣接收該消息的。

四、應用層接收內核netlink消息

使用例如以下演示樣例程序能夠以堵塞的方式接收內核發送的netlink消息:
#define TEST_DATA_LEN	16

struct sockaddr_nl nladdr;
struct msghdr msg;
struct nlmsghdr *nlhdr;
struct iovec iov;

/* 清空源地址結構 */
memset(&nladdr, 0, sizeof(nladdr));

/* 清空netlink消息頭 */
nlhdr = (struct nlmsghdr *)malloc(NLMSG_SPACE(TEST_DATA_LEN));
memset(nlhdr, 0, NLMSG_SPACE(TEST_DATA_LEN));

/* 封裝netlink消息 */
iov.iov_base = (void *)nlhdr;					/* 接收緩存地址 */
iov.iov_len = NLMSG_LENGTH(TEST_DATA_LEN);;		/* 接收緩存大小 */
	
/* 填充數據消息結構 */
memset(&msg, 0, sizeof(msg));
msg.msg_name = (void *)&(nladdr);
msg.msg_namelen = sizeof(nladdr);				/* 地址長度由內核賦值 */
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

/* 接收netlink消息 */
recvmsg(sock_fd, &msg, 0);
本演示樣例程序同前文中的發送程序類似,須要有接收端組裝接收msg消息。

同發送流程的不同之處在於:

(1)msg.msg_name地址結構中存放的是消息源的地址信息,由內核負責填充。
(2)iov.iov_base為接收緩存的地址空間。其須要在接收前清空。
(3)iov.iov_len為單個iov接收緩存的長度,須要指明。
(4)msg.msg_namelen:為地址占用長度。有內核負責填充。
(5)msg.msg_iovlen:為接收iov空間的個數,須要指明。
這里用到了recvmsg系統調用,現進入該系統調用分析消息的整個接收的過程(須要注意的是,在不使用NETLINK_MMAP技術的情況下,整個接收的過程中存在1次數據的內存拷貝動作!

):


圖3 用戶態netlink接收流程
SYSCALL_DEFINE3(recvmsg, int, fd, struct user_msghdr __user *, msg,
		unsigned int, flags)
{
	if (flags & MSG_CMSG_COMPAT)
		return -EINVAL;
	return __sys_recvmsg(fd, msg, flags);
}
long __sys_recvmsg(int fd, struct user_msghdr __user *msg, unsigned flags)
{
	int fput_needed, err;
	struct msghdr msg_sys;
	struct socket *sock;

	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;

	err = ___sys_recvmsg(sock, msg, &msg_sys, flags, 0);

	fput_light(sock->file, fput_needed);
out:
	return err;
}
同sendmsg系統調用類似,這里也相同首先通過fd描寫敘述符查找相應的套接字socket結構,然后調用___sys_recvmsg()運行實際的工作,這個函數比較長,分段分析:
static int ___sys_recvmsg(struct socket *sock, struct user_msghdr __user *msg,
			 struct msghdr *msg_sys, unsigned int flags, int nosec)
{
	struct compat_msghdr __user *msg_compat =
	    (struct compat_msghdr __user *)msg;
	struct iovec iovstack[UIO_FASTIOV];
	struct iovec *iov = iovstack;
	unsigned long cmsg_ptr;
	int total_len, len;
	ssize_t err;

	/* kernel mode address */
	struct sockaddr_storage addr;

	/* user mode address pointers */
	struct sockaddr __user *uaddr;
	int __user *uaddr_len = COMPAT_NAMELEN(msg);

	msg_sys->msg_name = &addr;
同sendmsg類似,這里相同定義了一個大小為8的iovstack數組緩存。用來加速消息處理;隨后獲取用戶空間的地址長度字段的地址。
	if (MSG_CMSG_COMPAT & flags)
		err = get_compat_msghdr(msg_sys, msg_compat, &uaddr, &iov);
	else
		err = copy_msghdr_from_user(msg_sys, msg, &uaddr, &iov);
	if (err < 0)
		return err;
	total_len = iov_iter_count(&msg_sys->msg_iter);

	cmsg_ptr = (unsigned long)msg_sys->msg_control;
	msg_sys->msg_flags = flags & (MSG_CMSG_CLOEXEC|MSG_CMSG_COMPAT);
這里接着調用copy_msghdr_from_user拷貝用戶態msg中的數據到內核態msg_sys中。

當然這里主要是為了接收內核的消息,用戶空間並沒有什么實際的數據,這里最基本的作用就是確定用戶須要接收多少數據量。

注意第三個參數已經不再是NULL了。而是指向了uaddr指針的地址。再次進入該函數具體分析一下:

static int copy_msghdr_from_user(struct msghdr *kmsg,
				 struct user_msghdr __user *umsg,
				 struct sockaddr __user **save_addr,
				 struct iovec **iov)
{
	struct sockaddr __user *uaddr;
	struct iovec __user *uiov;
	size_t nr_segs;
	ssize_t err;

	if (!access_ok(VERIFY_READ, umsg, sizeof(*umsg)) ||
<span style="color:#ff0000;">	    __get_user(uaddr, &umsg->msg_name) ||</span>
	    __get_user(kmsg->msg_namelen, &umsg->msg_namelen) ||
	    __get_user(uiov, &umsg->msg_iov) ||
	    __get_user(nr_segs, &umsg->msg_iovlen) ||
	    __get_user(kmsg->msg_control, &umsg->msg_control) ||
	    __get_user(kmsg->msg_controllen, &umsg->msg_controllen) ||
	    __get_user(kmsg->msg_flags, &umsg->msg_flags))
		return -EFAULT;

	if (!uaddr)
		kmsg->msg_namelen = 0;

	if (kmsg->msg_namelen < 0)
		return -EINVAL;

	if (kmsg->msg_namelen > sizeof(struct sockaddr_storage))
		kmsg->msg_namelen = sizeof(struct sockaddr_storage);

<span style="color:#ff0000;">	if (save_addr)
		*save_addr = uaddr;</span>

	if (uaddr && kmsg->msg_namelen) {
<span style="color:#ff0000;">		if (!save_addr) {
			err = move_addr_to_kernel(uaddr, kmsg->msg_namelen,
						  kmsg->msg_name);</span>
			if (err < 0)
				return err;
		}
	} else {
		kmsg->msg_name = NULL;
		kmsg->msg_namelen = 0;
	}

	if (nr_segs > UIO_MAXIOV)
		return -EMSGSIZE;

	kmsg->msg_iocb = NULL;

	return import_iovec(<span style="color:#ff0000;">save_addr ? READ : WRITE</span>, uiov, nr_segs,
			    UIO_FASTIOV, iov, &kmsg->msg_iter);
}
注意到當中加紅的這幾行。當中傳入的uaddr指針被指向了用戶空間msg->msg_name地址處,然后內核也不再會調用move_addr_to_kernel將用戶空間的消息地址字段復制到內核空間了(由於根本不是必需了),然后以READ的方式調用import_iovec()函數。它會檢查用戶空間的消息數據地址能否夠寫入,然后依據用戶須要接收的msg_iovlen長度封裝kmsg->msg_iter結構。再回到___sys_recvmsg()函數中保存接收緩存的總長度到total_len中,然后設置flag標識。
	/* We assume all kernel code knows the size of sockaddr_storage */
	msg_sys->msg_namelen = 0;

	if (sock->file->f_flags & O_NONBLOCK)
		flags |= MSG_DONTWAIT;
	err = (nosec ? sock_recvmsg_nosec : sock_recvmsg)(sock, msg_sys,
							  total_len, flags);
這里將地址的長度字段清零,不使用用戶空間傳入的(這里假定內核知道地址的長度),然后調用依據nosec的值是否為0而調用sock_recvmsg_nosec()或sock_recvmsg()函數接收數據。nosec在recvmsg系統調用傳入的為0,在recvmmsg系統可以調用接收多個消息時傳入已經接受的消息個數。
同發送的sendmsg()和sendmmsg()兩個系統調用一樣,這樣設計也是為了加速消息接收。recvmmsg()就是sock_recvmsg_nosec()的一個封裝而已,僅僅只是會添加security檢查:
int sock_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		 int flags)
{
	int err = security_socket_recvmsg(sock, msg, size, flags);

	return err ?: sock_recvmsg_nosec(sock, msg, size, flags);
}
EXPORT_SYMBOL(sock_recvmsg);
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
				     size_t size, int flags)
{
	return sock->ops->recvmsg(sock, msg, size, flags);
}
這里調用了接收套接字所在協議的recvmsg接收鈎子函數,對於netlink就是netlink_recvmsg()函數:
static int netlink_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
			   int flags)
{
	struct scm_cookie scm;
	struct sock *sk = sock->sk;
	struct netlink_sock *nlk = nlk_sk(sk);
	int noblock = flags&MSG_DONTWAIT;
	size_t copied;
	struct sk_buff *skb, *data_skb;
	int err, ret;

	if (flags&MSG_OOB)
		return -EOPNOTSUPP;

	copied = 0;

	skb = skb_recv_datagram(sk, flags, noblock, &err);
	if (skb == NULL)
		goto out;

	data_skb = skb;
這里首先調用skb_recv_datagram()從接收socket的緩存中接收消息並通過skb返回。假設設置了MSG_DONTWAIT則在接收隊列中沒有消息時馬上返回。否則會堵塞等待。進入該函數具體分析:
struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned int flags,
				  int noblock, int *err)
{
	int peeked, off = 0;

	return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
				   &peeked, &off, err);
}
EXPORT_SYMBOL(skb_recv_datagram);
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags,
				    int *peeked, int *off, int *err)
{
	struct sk_buff_head *queue = &sk->sk_receive_queue;
	struct sk_buff *skb, *last;
	unsigned long cpu_flags;
	long timeo;
	/*
	 * Caller is allowed not to check sk->sk_err before skb_recv_datagram()
	 */
	int error = sock_error(sk);

	if (error)
		goto no_packet;

	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	do {
		/* Again only user level code calls this function, so nothing
		 * interrupt level will suddenly eat the receive_queue.
		 *
		 * Look at current nfs client by the way...
		 * However, this function was correct in any case. 8)
		 */
		int _off = *off;

		last = (struct sk_buff *)queue;
		spin_lock_irqsave(&queue->lock, cpu_flags);
		skb_queue_walk(queue, skb) {
			last = skb;
			*peeked = skb->peeked;
			if (flags & MSG_PEEK) {
				if (_off >= skb->len && (skb->len || _off ||
							 skb->peeked)) {
					_off -= skb->len;
					continue;
				}

				skb = skb_set_peeked(skb);
				error = PTR_ERR(skb);
				if (IS_ERR(skb))
					goto unlock_err;

				atomic_inc(&skb->users);
			} else
				__skb_unlink(skb, queue);

			spin_unlock_irqrestore(&queue->lock, cpu_flags);
			*off = _off;
			return skb;
		}
		spin_unlock_irqrestore(&queue->lock, cpu_flags);

		if (sk_can_busy_loop(sk) &&
		    sk_busy_loop(sk, flags & MSG_DONTWAIT))
			continue;

		/* User doesn't want to wait */
		error = -EAGAIN;
		if (!timeo)
			goto no_packet;

	} while (!wait_for_more_packets(sk, err, &timeo, last));

	return NULL;

unlock_err:
	spin_unlock_irqrestore(&queue->lock, cpu_flags);
no_packet:
	*err = error;
	return NULL;
}
首先獲取socket的接收隊列指針到保存到queue變量中,然后獲取等待時長sk->sk_rcvtimeo(它在socket初始化時被設置為MAX_SCHEDULE_TIMEOUT,也可通過set_socketopt改動)。接下來進入一個do while()循環等待從接收緩存中獲取數據。
首先假定當前接收隊列中已經有數據了。這時將隊列上鎖后從隊列中取出一個skb包。然后推斷是否設置了MSG_PEEK標識符(假設已經設置了表明僅獲取該skb包可是不從接收隊列中刪除)。若設置了則調用skb_set_peeked()函數skb_clone出一個skb消息包返回,否則直接調用__skb_unlink將本次取出的skb包從列表中刪除然后返回。
然后再假定當前接收隊列中沒有消息存在,這里會依據是否設置了CONFIG_NET_RX_BUSY_POLL內核選項運行兩種等待方案。若設置了改選項。則會嘗試使用一種忙等待的方案(類似於設備驅動的napi),不用來讓當前運行流睡眠而切出,能夠加速數據的接收速度,它會調用sk_can_busy_loop()和sk_busy_loop()推斷能否夠使用該方案。對於沒有設置該選項。則使用傳統的方案,調用wait_for_more_packets()運行等待操作,它增加等待隊列會讓進程睡眠:
static int wait_for_more_packets(struct sock *sk, int *err, long *timeo_p,
				 const struct sk_buff *skb)
{
	int error;
	DEFINE_WAIT_FUNC(wait, receiver_wake_function);

	prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);

	......
	
	/* handle signals */
	if (signal_pending(current))
		goto interrupted;
		
	error = 0;
	*timeo_p = schedule_timeout(*timeo_p);
out:
	finish_wait(sk_sleep(sk), &wait);
	return error;
interrupted:
	error = sock_intr_errno(*timeo_p);	
	......
}
該函數會做一些必要的推斷。可是最核心的就是以上這幾行代碼。首先將進程設置為INTERRUPTIBLE狀態,若沒有消息到達或者接收到信號它會一直睡眠(超時時間timeo可設,一般為默認值MAX_SCHEDULE_TIMEOU的情況下會轉而調用schedule())在schedule_timeout()函數上。在前文中已經看到,在內核netlink發送流程的最后將數據送到接收隊列后會喚醒該等待隊列,進程喚醒后返回到__skb_recv_datagram的大循環中去接收消息,若是被信號打斷了,那大循環中無法獲取消息。又會回到這里運行信號退出動作。
在從接收隊列中獲取了一個skb以后,我們回到netlink_recvmsg()函數中繼續往下分析:
	/* Record the max length of recvmsg() calls for future allocations */
	nlk->max_recvmsg_len = max(nlk->max_recvmsg_len, len);
	nlk->max_recvmsg_len = min_t(size_t, nlk->max_recvmsg_len,
				     16384);

	copied = data_skb->len;
	if (len < copied) {
		msg->msg_flags |= MSG_TRUNC;
		copied = len;
	}

	skb_reset_transport_header(data_skb);
	err = skb_copy_datagram_msg(data_skb, 0, msg, copied);
這里更新了最長的的接收數據長度。然后推斷假設獲取到的skb數據長度大於大於本次接收緩存的最大長度。則設置MSG_TRUNC標識,並將本次須要接收數據量設置為接收緩存的長度。


接着調用skb_copy_datagram_msg()函數將skb中的實際數據復制到msg消息中(這里進行了一次數據拷貝動作,將skb中的數據直接復制到msg指向的用戶空間地址處)。

	if (msg->msg_name) {
		DECLARE_SOCKADDR(struct sockaddr_nl *, addr, msg->msg_name);
		addr->nl_family = AF_NETLINK;
		addr->nl_pad    = 0;
		addr->nl_pid	= NETLINK_CB(skb).portid;
		addr->nl_groups	= netlink_group_mask(NETLINK_CB(skb).dst_group);
		msg->msg_namelen = sizeof(*addr);
	}
在拷貝完畢后這里開始初始化地址結構,這里將family這是為AF_NETLINK地址族,然后設置portid號為保存在原端skb擴展cb字段中的portid,對於這里接收內核發送的skb消息來說本字段為0,然后設置組播地址,該值在前文中內核調用nlmsg_multicast()發送組播消息時設置(對於單播來說就為0),netlink_group_mask()函數將組播地址的位號轉換為實際的組播地址(mask),然后這是msg的地址長度為nl_addr的長度。
	if (nlk->flags & NETLINK_RECV_PKTINFO)
		netlink_cmsg_recv_pktinfo(msg, skb);

	memset(&scm, 0, sizeof(scm));
	scm.creds = *NETLINK_CREDS(skb);
	if (flags & MSG_TRUNC)
		copied = data_skb->len;

	skb_free_datagram(sk, skb);

	if (nlk->cb_running &&
	    atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf / 2) {
		ret = netlink_dump(sk);
		if (ret) {
			sk->sk_err = -ret;
			sk->sk_error_report(sk);
		}
	}

	scm_recv(sock, msg, &scm, flags);
out:
	netlink_rcv_wake(sk);
	return err ? : copied;
}
這里假設設置了NETLINK_RECV_PKTINFO標識則將輔助消息頭復制到用戶空間。接着推斷是否設置了MSG_TRUNC標識。假設設置了就又一次設置copied為本次取出的skb中獲取數據的長度(特別注意!

)。然后調用skb_free_datagram()釋放skb消息包。最后在返回接收數據長度。

再回到___sys_recvmsg()函數中繼續往下分析:

	err = (nosec ? sock_recvmsg_nosec : sock_recvmsg)(sock, msg_sys,
							  total_len, flags);
	if (err < 0)
		goto out_freeiov;
	len = err;

	if (uaddr != NULL) {
		err = move_addr_to_user(&addr,
					msg_sys->msg_namelen, uaddr,
					uaddr_len);
		if (err < 0)
			goto out_freeiov;
	}
這里len保存了接收到數據的長度。然后將消息地址信息從內核空間復制到用戶空間。
	err = __put_user((msg_sys->msg_flags & ~MSG_CMSG_COMPAT),
			 COMPAT_FLAGS(msg));
	if (err)
		goto out_freeiov;
	if (MSG_CMSG_COMPAT & flags)
		err = __put_user((unsigned long)msg_sys->msg_control - cmsg_ptr,
				 &msg_compat->msg_controllen);
	else
		err = __put_user((unsigned long)msg_sys->msg_control - cmsg_ptr,
				 &msg->msg_controllen);
	if (err)
		goto out_freeiov;
	err = len;

out_freeiov:
	kfree(iov);
	return err;
}
最后將flag、消息輔助數據等拷貝到用戶空間,至此recvmsg系統調用就向上返回了,應用層也能夠使用獲取到的數據了。

應用層接收netlink消息流程結束。


五、總結

本文和前一篇《Netlink 內核實現分析(一):創建》分析了內核與用戶態netlink的創建、綁定及通信流程。netlink機制可以有效的利用了socket通信機制,實現了內核態與用戶態之間的全雙工通信,此處盡管分析的是netlink的實現方式,可是當中都是使用的Linux標准套接字系統調用接口,其運行的流程對於其它協議類型的套接字也是一樣的。最后。本文僅對netlink的通用收發流程進行了分析,對於不同協議類型的netlink擁有自己特別的消息格式與處理方式。本文沒有詳細分析。

后面會對netlink協議中通用的Genetlink通信協議及通信流程做詳細分析並給出通信演示樣例程序。




















免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM