【原創】xenomai內核解析--xenomai與普通linux進程之間通訊XDDP(三)--實時與非實時數據交互


前面兩篇文章我們看了xddp在xenomai內核里涉及的數據結構、RTDM對於協議類實時設備的管理方式,以及實時端創建一個XDDP通道后(xddp必須由實時端來創建),實時端與非實時端是如何聯系起來的,本文從linux端打開創建好的xddp通道開始,來詳細看整個通訊過程。

1.概述

【原創】實時IPC概述
【原創】xenomai與普通linux進程之間通訊XDDP(一)--實時端socket創建流程
【原創】xenomai與普通linux進程之間通訊XDDP(二)--實時與非實時關聯(bind流程)

前面兩篇文章我們看了xddp在xenomai內核里涉及的數據結構、RTDM對於協議類實時設備的管理方式,以及實時端創建一個XDDP通道后(xddp必須由實時端來創建),實時端與非實時端是如何聯系起來的。

rtipc-arch

筆者在分析源碼完后發現,XDDP過程就是這么個過程,下面的總結已經足夠概括,所以先放總結,個人分析源碼過程的粗糙記錄可看后面部分。

以上工作做好后,下面可以進行數據交互了,本文從linux端打開創建好的xddp通道開始,來詳細看整個通訊過程。

  1. 實時端創建xddp socket,通過bind指定socket使用的端口號,或者給socket設置一個label,端口號自動分配。實時與非實時通過socke使用的端口號來關聯,在linux端,端口號即xnpip設備的次設備號。

  2. 通過指定端口通訊時,linux通過直接讀寫xnpipe設備(/dev/rtpN,N為端口號)來通訊。使用label時,由於實時端端口號為自動分配,所以只能linux端只能通過讀寫文件/proc/xenomai/registry/rtipc/xddp/%s來通訊,%s為通訊使用的label。

  3. 非實時向實時端發送數據:通訊過程中,由於xnpipe可看做一個全雙工設備,有兩個數據鏈表,命名以實時端為主,inq表示接收數據報鏈表(NRT->RT),outq為發送數據報鏈表(RT->NRT)。對於linux端,每次發送的數據都作為一個數據報節點插入到鏈表inq尾,實時端讀取時從鏈表頭取數據,符合FIFO。

  4. 實時向非實時發送數據,分三種數據:

    • 不帶標識的數據包會作為一個單獨的數據報節點插入鏈表outq尾。
    • 使用MSG_OOB標識時,表示這是一個緊急的數據,需要優先被linux端讀取,這時會作為一個單獨的數據報節點插入鏈表outq。liunx端讀取時從鏈表頭取數據,所以除MSG_OOB標識的數據外,符合FIFO。
    • 使用MSG_MORE標識時,表示還有數據要與該數據一起發送,暫時不作為單獨數據包發送(不放到outq),先積累到數據緩沖區,待緩沖區滿或者發送的數據沒有MSG_MORE時,將整個緩沖區作為一個大的數據包插入鏈表outq尾。

    整個XDDP使用過程中:

    1. 建立xddp通道時,所有數據結構需要的內存均已申請。數據收發過程中,數據交互使用的內存從xnheap申請釋放,同步、互斥、喚醒使用的是xenomai內核機制,所以整個通訊由xenomai內核管理,保證了xenomai的實時性;
    2. 對於linux向xenomai發送的數據,xenomai任務在xenomai的調度下能很快讀取,看任務具體優先級等。
    3. 對於xenomai發送給linux的數據,如果非實時任務阻塞讀,會使用ipip虛擬中斷機制APC來通知linux喚醒該任務,待linux得到cpu時,自會處理虛擬中斷APC,喚醒接收的非實時任務處理數據,整體框圖如下。

xddp_global

粗糙記錄見下文,按需查看。

2.linux端設備節點創建

前面說到,通過指定端口通訊時,linux通過直接讀寫xnpipe設備(/dev/rtpN,N為端口號)來通訊。使用label時,由於實時端端口號為自動分配,所以只能linux端只能通過讀寫文件/proc/xenomai/registry/rtipc/xddp/%s來通訊,%s為通訊使用的label。

xnpipe設備(/dev/rtpN,N為端口號),創建流程如下:

前面說到注冊XDDP字符設備的時候,會調用 register_chrdev_region, 然后,cdev_add 會將這個字符設備添加到內核中一個叫作 struct kobj_map *cdev_map 的結構,來統一管理所有字符設備。用戶空間的udev守護進程會完成設備文件的創建。udev守護進程會完成設備文件的創建通過mknod系統調用來完成,mknod系統調用定義如下:

/*fs\namei.c*/
SYSCALL_DEFINE3(mknod, const char __user *, filename, umode_t, mode, unsigned, dev)
{
  return sys_mknodat(AT_FDCWD, filename, mode, dev);
}


SYSCALL_DEFINE4(mknodat, int, dfd, const char __user *, filename, umode_t, mode,
    unsigned, dev)
{
  struct dentry *dentry;
  struct path path;
......
  dentry = user_path_create(dfd, filename, &path, lookup_flags);
......
  switch (mode & S_IFMT) {
......
    case S_IFCHR: case S_IFBLK:
      error = vfs_mknod(path.dentry->d_inode,dentry,mode,
          new_decode_dev(dev));
      break;
......
  }
}

可以在這個系統調用里看到,在文件系統上,順着路徑找到 /dev/xxx 所在的文件夾,然后為這個新創建的設備文件創建一個 dentry。這是維護文件和 inode 之間的關聯關系的結構。接下來,如果是字符文件 S_IFCHR 或者設備文件 S_IFBLK,我們就調用 vfs_mknod。


int vfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t dev)
{
......
  error = dir->i_op->mknod(dir, dentry, mode, dev);
......
}

這里需要調用對應的文件系統的 inode_operations。應該調用哪個文件系統呢?如果我們在 linux 下面執行 mount 命令,能看到下面這一行:

devtmpfs on /dev type devtmpfs (rw,nosuid,size=3989584k,nr_inodes=997396,mode=755)

也就是說,/dev 下面的文件系統的名稱為 devtmpfs,可以在內核中找到它。


static struct dentry *dev_mount(struct file_system_type *fs_type, int flags,
          const char *dev_name, void *data)
{
#ifdef CONFIG_TMPFS
  return mount_single(fs_type, flags, data, shmem_fill_super);
#else
  return mount_single(fs_type, flags, data, ramfs_fill_super);
#endif
}


static struct file_system_type dev_fs_type = {
  .name = "devtmpfs",
  .mount = dev_mount,
  .kill_sb = kill_litter_super,
};

從這里可以看出,devtmpfs 在掛載的時候,有兩種模式,一種是 ramfs,一種是 shmem 都是基於內存的文件系統。

static const struct inode_operations ramfs_dir_inode_operations = {
......
  .mknod    = ramfs_mknod,
};


static const struct inode_operations shmem_dir_inode_operations = {
#ifdef CONFIG_TMPFS
......
  .mknod    = shmem_mknod,
};

這兩個 mknod 雖然實現不同,但是都會調用到同一個函數 init_special_inode。

void init_special_inode(struct inode *inode, umode_t mode, dev_t rdev)
{
  inode->i_mode = mode;
  if (S_ISCHR(mode)) {
    inode->i_fop = &def_chr_fops;
    inode->i_rdev = rdev;
  } else if (S_ISBLK(mode)) {
    inode->i_fop = &def_blk_fops;
    inode->i_rdev = rdev;
  } else if (S_ISFIFO(mode))
    inode->i_fop = &pipefifo_fops;
  else if (S_ISSOCK(mode))
    ;  /* leave it no_open_fops */
}

顯然這個文件是個特殊文件,inode 也是特殊的。這里這個 inode 可以關聯字符設備、塊設備、FIFO 文件、Socket 等。這里只看字符設備。

這里的 inode 的 file_operations 指向一個 def_chr_fops,這里面只有一個 open,就等着你打開它。另外,inode 的 i_rdev 指向這個設備的 dev_t。

通過這個 dev_t,可以找到我們剛在加載的字符設備 cdev。


const struct file_operations def_chr_fops = {
  .open = chrdev_open,
};

到目前為止,只是創建了 /dev 下面的一個文件,並且和相應的設備號關聯起來。但是,我們還沒有打開這個 /dev 下面的設備文件。

3.linux端打開設備

現在我們來打開它。打開文件的進程的 task_struct 里,有一個數組代表它打開的文件,下標就是文件描述符 fd,每一個打開的文件都有一個 struct file 結構,會指向一個 dentry 項。dentry 可以用來關聯 inode。

這個 dentry 就是上面 mknod 的時候創建的。在進程里面調用 open 函數,最終會調用到這個特殊的 inode 的 open 函數,也就是 chrdev_open。


static int chrdev_open(struct inode *inode, struct file *filp)
{
  const struct file_operations *fops;
  struct cdev *p;
  struct cdev *new = NULL;
  int ret = 0;


  p = inode->i_cdev;
  if (!p) {
    struct kobject *kobj;
    int idx;
    kobj = kobj_lookup(cdev_map, inode->i_rdev, &idx);
    new = container_of(kobj, struct cdev, kobj);
    p = inode->i_cdev;
    if (!p) {
      inode->i_cdev = p = new;
      list_add(&inode->i_devices, &p->list);
      new = NULL;
    } 
  } 
......
  fops = fops_get(p->ops);
......
  replace_fops(filp, fops);
  if (filp->f_op->open) {
    ret = filp->f_op->open(inode, filp);
......
  }
......
}

在這個函數里面,首先看這個 inode 的 i_cdev,是否已經關聯到 cdev。如果第一次打開,沒有沒關聯,inode 里面有 i_rdev ,也就是有 dev_t。可以通過它在 cdev_map 中找 cdev。因為我們上面注冊過了,所以能夠找到。找到后就將 inode 的 i_cdev,關聯到找到的 cdev new。

找到 cdev 后。cdev 里面有 file_operations,這是設備驅動程序自己定義的。可以通過它來操作設備驅動程序,把它付給 struct file 里面的 file_operations。這樣以后操作文件描述符,就是直接操作設備了。

最后,我們需要調用設備驅動程序的 file_operations 的 open 函數,真正打開設備。

對於XDDP,最終會調用到 xnpipe_open

4. 實時端發送

RT應用使用來向send/sendto/sendmsg NRT發送消息。

     char *msg="hello world!";
     int len=strlen(msg);
	for (b = 0; b < len; b++) {
            /*MSG_MORE表示:一字節一字節的將數據存到緩沖區*/
            ret = sendto(s, msg + b, 1, MSG_MORE, NULL, 0);
            .....
    }
/*如果不使用MSG_MORE,每個字母將作為一個數據包。Linux端段每次讀取只能讀取到一個字母,且符合FIFO*/
    ret = sendto(s, msg, len, 0, NULL, 0);
    ....
    /*使用MSG_OOB 發送高優先級數據*/
    ret = sendmsg(s, msg[n] + b, 1, MSG_OOB, NULL, 0);
  	....

示例中,MSG_MORE表示:將每次發送數據累積存到緩沖區作為一整個包,否則每次發送作為一個單獨的數據包且,符合FIFO;MSG_OOB表示該數據包優先級高,優先被nrt任務讀取(插隊);

同樣,這幾個函數在libcobalt中實現:

/*庫函數:xenomai-3.x.x\lib\cobalt\rtdm.c*/
COBALT_IMPL(ssize_t, sendto, (int fd, const void *buf, size_t len, int flags,
			      const struct sockaddr *to, socklen_t tolen))
{								
	struct iovec iov = { 
		.iov_base = (void *)buf,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_name = (struct sockaddr *)to,
		.msg_namelen = tolen,
		.msg_iov = &iov,
		.msg_iovlen = 1,  /*msg_iov個數*/
		.msg_control = NULL,
		.msg_controllen = 0,
	};
	int ret;

	ret = do_sendmsg(fd, &msg, flags);
	if (ret != -EBADF && ret != -ENOSYS)
		return set_errno(ret);
	
	return __STD(sendto(fd, buf, len, flags, to, tolen));
}
COBALT_IMPL(ssize_t, sendmsg, (int fd, const struct msghdr *msg, int flags))
{
	int ret;

	ret = do_sendmsg(fd, msg, flags);
	if (ret != -EBADF && ret != -ENOSYS)
		return set_errno(ret);

	return __STD(sendmsg(fd, msg, flags));
}
COBALT_IMPL(ssize_t, send, (int fd, const void *buf, size_t len, int flags))
{
	struct iovec iov = {
		.iov_base = (void *)buf,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_name = NULL,
		.msg_namelen = 0,
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = NULL,
		.msg_controllen = 0,
	};
	int ret;

	ret = do_sendmsg(fd, &msg, flags);
	if (ret != -EBADF && ret != -ENOSYS)
		return set_errno(ret);

	return __STD(send(fd, buf, len, flags));
}	

它們都是socket發送數據的接口,區別在於sendmsg()使用struct msghdr,msg->msg_iovlen內保存着數據包數量,能一次性發送多條數據。sendto()與send()需要構造包含一個數據包的struct msghdr ,然后再調用do_sendmsg()發起實時系統調用發送。

static ssize_t do_sendmsg(int fd, const struct msghdr *msg, int flags)
{
	int ret, oldtype;
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
	ret = XENOMAI_SYSCALL3(sc_cobalt_sendmsg, fd, msg, flags);
	pthread_setcanceltype(oldtype, NULL);

	return ret;
}

實時系統調用sc_cobalt_sendmsxg先將將用戶空間數據拷貝到內核空間,然后調用rtdm_fd_sendmsg()如下:

COBALT_SYSCALL(sendmsg, handover,
	       (int fd, struct user_msghdr __user *umsg, int flags))
{
	struct user_msghdr m;
	int ret;
	ret = cobalt_copy_from_user(&m, umsg, sizeof(m)); /*拷貝到內核空間*/
	return ret ?: rtdm_fd_sendmsg(fd, &m, flags);
}
ssize_t rtdm_fd_sendmsg(int ufd, const struct user_msghdr *msg, int flags)
{
	struct rtdm_fd *fd;
	ssize_t ret;

	fd = get_fd_fixup_mode(ufd);
	......
	if (fd->oflags & O_NONBLOCK)
		flags |= MSG_DONTWAIT;

	if (ipipe_root_p)
		ret = fd->ops->sendmsg_nrt(fd, msg, flags);
	else
		ret = fd->ops->sendmsg_rt(fd, msg, flags);//xddp_sendmsg
	......
	return ret;
}

與前面解析一樣,根據ufd取出rtdm_fd,然后調用xddp數據發送函數xddp_sendmsg()。

static ssize_t xddp_sendmsg(struct rtdm_fd *fd,
			    const struct user_msghdr *msg, int flags)
{
	struct rtipc_private *priv = rtdm_fd_to_private(fd);
	struct iovec iov_fast[RTDM_IOV_FASTMAX], *iov;
	struct xddp_socket *sk = priv->state;
	struct sockaddr_ipc daddr;//目標地址
	ssize_t ret;
	/*不能同時使用MSG_MORE、MSG_OOB標志*/
	if ((flags & (MSG_MORE | MSG_OOB)) == (MSG_MORE | MSG_OOB))
		return -EINVAL;

	if (msg->msg_name) {
		if (msg->msg_namelen != sizeof(struct sockaddr_ipc))
			return -EINVAL;

		/* Fetch the destination address to send to. */
		if (rtipc_get_arg(fd, &daddr, msg->msg_name, sizeof(daddr)))
			return -EFAULT;

		if (daddr.sipc_port < 0 ||
		    daddr.sipc_port >= CONFIG_XENO_OPT_PIPE_NRDEV)
			return -EINVAL;
	} else {
		......
		daddr = sk->peer;
		.....
	}

	if (msg->msg_iovlen >= UIO_MAXIOV)
		return -EINVAL;

	/* Copy I/O vector in */
	ret = rtdm_get_iovec(fd, &iov, msg, iov_fast);
	......

	ret = __xddp_sendmsg(fd, iov, msg->msg_iovlen, flags, &daddr);/*發送iovlen個iov*/
	......

	/* Copy updated I/O vector back */
	return rtdm_put_iovec(fd, iov, msg, iov_fast) ?: ret;
}

先檢查參數,不能同時使用MSG_MORE、MSG_OOB標志,看struct user_msghdr有沒有包含目標地址,如果沒有則使用sk->peer,檢查本次發送的數據包個數,如果大於1024則返回錯誤。

接着調rtdm_get_iovec()處理數據包,將每條數包信息保存到struct iovec iov_fast[16],如果數據條數大於16條,則不使用iov_fast[16],會向xenomai系統內存池Cobalt_heap分配更大的內存來保存這些信息。保存每條數據信息的struct iovec 如下。

struct iovec
{
	void __user *iov_base;	/* BSD uses caddr_t (1003.1g requires void *) */
	__kernel_size_t iov_len; /* Must be size_t (1003.1g) */
};

其中iov_base保存着數據在用戶空間的地址,iov_len為數據的長度。

得到本次發送每條數據的信息后,調用__xddp_sendmsg()進行發送,參數分別是執行發送的rtdm_fd,存儲每條數據信息的iov指針,iovlen表示本次發送數據條數,flags發送flags,daddr目標地址信息。

static ssize_t __xddp_sendmsg(struct rtdm_fd *fd,
			      struct iovec *iov, int iovlen, int flags,
			      const struct sockaddr_ipc *daddr)
{
    struct rtipc_private *priv = rtdm_fd_to_private(fd);
	ssize_t len, rdlen, wrlen, vlen, ret, sublen;
	struct xddp_socket *sk = priv->state;
	struct xddp_message *mbuf;
	struct xddp_socket *rsk;
	struct rtdm_fd *rfd;
	int nvec, to, from;
	struct xnbufd bufd;
	rtdm_lockctx_t s;
    ....

	len = rtdm_get_iov_flatlen(iov, iovlen);
	.....

	from = sk->name.sipc_port;
	to = daddr->sipc_port;
	.....
	rfd = portmap[to];
	.....
    rsk = rtipc_fd_to_state(rfd);
	.....
    if (flags & MSG_MORE) {
        /*數據累積到緩沖區rsk->buffer*/
        goto done;
    }
    mbuf = xnheap_alloc(rsk->bufpool, sublen + sizeof(*mbuf));
	.....

	/*
	 * Move "sublen" bytes to mbuf->data from the vector cells
	 */
	for (rdlen = sublen, wrlen = 0; nvec < iovlen && rdlen > 0; nvec++) {
		/*處理每條數據,構造xnpipe數據包xddp_message*/
	}
	/*發送*/
	ret = xnpipe_send(rsk->minor, &mbuf->mh,
			  sublen + sizeof(*mbuf),
			  (flags & MSG_OOB) ?
			  XNPIPE_URGENT : XNPIPE_NORMAL);
    done:
	rtdm_fd_unlock(rfd);

	return len;
}

__xddp_sendmsg()先調用rtdm_get_iov_flatlen()計算本次要發送的數據總長度len,取出源端口from,目標端口to,根據目標端口to從端口rtdm_fd映射表中取出接收端的rtdm_fd結構指針保存到rfd,再根據rfd得到對應的xddp對象xddp_socket,記得前面我們解析過,緩沖區是由xddp_socket管理着的,所以這里先判斷本次發送有沒有標志MSG_MORE,有的話那數據不能直接使用xnpipe發送出去,需要先在緩沖區中緩存。如果沒有標志MSG_MORE,則構造xnpipe數據包,然后再調用xnpipe_send()執行發送操作。

該流程使用一個示例來解釋,假如xenomai任務通過sendmsg()發送data1、data2兩條數據,且設置了falsg MSG_MORE,xddp緩沖區大小設置為130Byte,此時緩沖區內沒有任何數據。解析到此如下所示:

將數據積累到數據緩沖區處理代碼如下。

static ssize_t __xddp_sendmsg(struct rtdm_fd *fd,
			      struct iovec *iov, int iovlen, int flags,
			      const struct sockaddr_ipc *daddr)
{
.......	
sublen = len;
	nvec = 0;
	if (flags & MSG_MORE) {
		for (rdlen = sublen, wrlen = 0;
		     nvec < iovlen && rdlen > 0; nvec++) {
			if (iov[nvec].iov_len == 0)
				continue;
			vlen = rdlen >= iov[nvec].iov_len ? iov[nvec].iov_len : rdlen;
			if (rtdm_fd_is_user(fd)) {/*用戶空間程序使用的rtdm_fd*/
				xnbufd_map_uread(&bufd, iov[nvec].iov_base, vlen);
				ret = __xddp_stream(rsk, from, &bufd);
				xnbufd_unmap_uread(&bufd);
			} else {
				xnbufd_map_kread(&bufd, iov[nvec].iov_base, vlen);
				ret = __xddp_stream(rsk, from, &bufd);
				xnbufd_unmap_kread(&bufd);
			}
			if (ret < 0)
				goto fail_unlock;
			wrlen += ret;
			rdlen -= ret;
			iov[nvec].iov_base += ret;
			iov[nvec].iov_len -= ret;
			/*
			 * In case of a short write to the streaming
			 * buffer, send the unsent part as a
			 * standalone datagram.
			 */
			if (ret < vlen) {/*緩沖區已滿將剩余數據作為單獨包發送*/
				sublen = rdlen;
				goto nostream;
			}
		}
		len = wrlen;
		goto done;
	}
nostream:
   ..... /*單獨包發送*/
done:
	rtdm_fd_unlock(rfd);

	return len;
}

對數組iov[]里的每個長度非0數據,先調用xnbufd_map_kread()構造一個緩沖區描述符struct xnbufd,然后通過函數__xddp_stream()從用戶空間拷貝到緩沖區,並調整iov[nvec]里的信息,iov[nvec].iov_len減去已發送的數據信息。

struct xnbufd {
	caddr_t b_ptr;		/* 源/目標地址*/
	size_t b_len;		/*  buffer總長度 */
	off_t b_off;		/* 讀或寫字節數*/
	struct mm_struct *b_mm;	/*  源或目標地址空間 */
	caddr_t b_carry;	/* pointer to carry over area*/
	char b_buf[64];		/* fast carry over area */
};

對data1構造xnbufd后,調用__xddp_stream()將數據拷貝到緩沖區。

static ssize_t __xddp_stream(struct xddp_socket *sk,
			     int from, struct xnbufd *bufd)
{
	struct xddp_message *mbuf;
	size_t fillptr, rembytes;
	rtdm_lockctx_t s;
	ssize_t outbytes;
	int ret;

	if (sk->curbufsz == 0 ||
	    (sk->buffer_port >= 0 && sk->buffer_port != from)) {
		/*This will end up into a standalone datagram. */
		outbytes = 0;
		goto out;
	}

	mbuf = sk->buffer;
	rembytes = sk->curbufsz - sizeof(*mbuf) - sk->fillsz;
	outbytes = bufd->b_len > rembytes ? rembytes : bufd->b_len;
	if (likely(outbytes > 0)) {
	repeat:
		/* Mark the beginning of a should-be-atomic section. */
		__set_bit(_XDDP_ATOMIC, &sk->status);
		fillptr = sk->fillsz; 
		sk->fillsz += outbytes;

		rtdm_lock_put_irqrestore(&sk->lock, s);
		ret = xnbufd_copy_to_kmem(mbuf->data + fillptr,
					  bufd, outbytes);
		rtdm_lock_get_irqsave(&sk->lock, s);
		......
		/* We haven't been atomic, let's try again. */
		if (!__test_and_clear_bit(_XDDP_ATOMIC, &sk->status))
			goto repeat;

		if (__test_and_set_bit(_XDDP_SYNCWAIT, &sk->status))
			outbytes = xnpipe_mfixup(sk->minor,
						 &mbuf->mh, outbytes);
		else {
			sk->buffer_port = from;
			outbytes = xnpipe_send(sk->minor, &mbuf->mh,
					       outbytes + sizeof(*mbuf),
					       XNPIPE_NORMAL);
			if (outbytes > 0)
				outbytes -= sizeof(*mbuf);
		}
	}

out:
	rtdm_lock_put_irqrestore(&sk->lock, s);

	return outbytes;
}

先判斷緩沖區大小,如果curbufsz等於0說明沒有使用setsocketopt()為xddp設置緩沖區大小,則flag MSG_MORE無效,數據將作為一個單獨的包發送出去,直接跳轉至out。

然后計算緩沖區剩余大小rembytes,看剩余空間rembytes是否能存下本次數據,如果不能,緩沖區剩余空間多少就先拷貝多少,拷貝大小outbytes就是rembytes;反之,outbytes就是本次數據長度 bufd->b_len

將用戶空間數據拷貝到緩沖區之前,先計算緩沖區拷貝起始偏移量fillptr,拷貝后偏移量sk->fillsz(拷貝后緩沖區內數據的長度)。接着調用xnbufd_copy_to_kmem()進行數據拷貝。xnbufd_copy_to_kmem()根據bufd->b_mm判斷數據位於內核空間(mm==NULL)還是用戶空間,內核空間則直接拷貝,如果數據在用戶空間則使用cobalt_copy_from_user()拷貝。

完成數據拷貝至緩沖區后,sk->status置位bit _XDDP_SYNCWAIT並返回原來的值,如果原值為0,則發生了以下情況之一,需要將緩沖區數據全部發送出去,調用xnpipe_send()完成發送。

  • Linux域中接收器被喚醒接收數據,
  • 不同的源端口嘗試將數據發送到相同的目標端口,
  • 發送標志中沒有MSG_MORE,
  • 緩沖區已滿。

這里是第一次將數據保存到緩沖區,執行分支1:

if (__test_and_set_bit(_XDDP_SYNCWAIT, &sk->status))
    outbytes = xnpipe_mfixup(sk->minor,
                             &mbuf->mh, outbytes);/**/
else {
    ......
}

xnpipe_mfixup()取出xnpipe對象,更新緩沖區數據長度ionrd。

ssize_t xnpipe_mfixup(int minor, struct xnpipe_mh *mh, ssize_t size)
{
	struct xnpipe_state *state;
	spl_t s;
.......
	state = &xnpipe_states[minor];
	.......
	xnpipe_m_size(mh) += size;
	state->ionrd += size;

	xnlock_put_irqrestore(&nklock, s);

	return (ssize_t) size;
}

返回__xddp_sendmsg()接着處理我們的iov[1],此時data1已保存到緩沖區,iov[0].b_len 為0 下次循環就會跳過,各結構數據如下所示:

由於下面將data2保存到緩沖區,我們分配的緩沖區長度只有130Byte,目前data1占用了20Byte,緩沖區剩余空間110Byte,不能完全存下data2的數據。先將data2前110Byte數據存到緩沖區,剩余的20Byte數據作為單獨包發送出去。

看數據作為單獨包發送的流程:

static ssize_t __xddp_sendmsg(struct rtdm_fd *fd,
			      struct iovec *iov, int iovlen, int flags,
			      const struct sockaddr_ipc *daddr)
{
.......	
sublen = len;
	nvec = 0;
	if (flags & MSG_MORE) {	/*將數據累積到緩沖區*/
		.......
			if (ret < vlen) {/*緩沖區已滿將剩余數據作為單獨包發送*/
				sublen = rdlen;
				goto nostream;
			}
		}
		len = wrlen;
		goto done;
	}
nostream:
  /*單獨包發送*/
	mbuf = xnheap_alloc(rsk->bufpool, sublen + sizeof(*mbuf));/*從bufpool 分配*/
	......
	/*
	 * Move "sublen" bytes to mbuf->data from the vector cells
	 */
	for (rdlen = sublen, wrlen = 0; nvec < iovlen && rdlen > 0; nvec++) {
		if (iov[nvec].iov_len == 0)
			continue;
		vlen = rdlen >= iov[nvec].iov_len ? iov[nvec].iov_len : rdlen;
		if (rtdm_fd_is_user(fd)) {/*需要從用戶空間拷貝*/
			.....
			ret = xnbufd_copy_to_kmem(mbuf->data + wrlen, &bufd, vlen);
			.....
		} else {
			......
			ret = xnbufd_copy_to_kmem(mbuf->data + wrlen, &bufd, vlen);
			......
		}
		......
		iov[nvec].iov_base += vlen;
		iov[nvec].iov_len -= vlen;
		rdlen -= vlen;
		wrlen += vlen;
	}
	
	/*xnpipe發送*/
	ret = xnpipe_send(rsk->minor, &mbuf->mh,
			  sublen + sizeof(*mbuf),
			  (flags & MSG_OOB) ?
			  XNPIPE_URGENT : XNPIPE_NORMAL);
done:
	rtdm_fd_unlock(rfd);

	return len;
}

先從xddp_socket.bufpool指向的內存池中分配數據包大小的內存mbuf(本次待發送數據大小sublen+消息頭大小 sizeof(*mbuf)),mbuf與緩沖區ddp_socket.buffer不同,緩沖區是配置socket時分配的,數據區大小固定,只有一個,會伴隨socket的整個生命周期。而這個mbuf是動態分配的,有多少個數據包發送就會分配多少個mbuf,直到數據被linux端讀取后該內存就會被釋放。

分配一個xddp_message空間后,與前面一樣,將要發送的數據一個一個的拷貝到數據區,這里將data2剩余的20Byte數據拷貝到mbuf。

然后調用xnpipe_send()進行數據發送。

ssize_t xnpipe_send(int minor, struct xnpipe_mh *mh, size_t size, int flags)
{
	struct xnpipe_state *state;
	int need_sched = 0;
	spl_t s;
	......
	state = &xnpipe_states[minor];

	xnlock_get_irqsave(&nklock, s);
	......
	xnpipe_m_size(mh) = size - sizeof(*mh);/*該包數據區長度*/
	xnpipe_m_rdoff(mh) = 0;		/*該包已讀數據偏移*/
	state->ionrd += xnpipe_m_size(mh);/*更新鏈表數據總長度*/

	if (flags & XNPIPE_URGENT)/*高優先級數據,添加到隊列頭*/
		list_add(&mh->link, &state->outq);/*頭插*/
	else
		list_add_tail(&mh->link, &state->outq);/*低優先級數據添加到鏈表尾*/

	state->nroutq++;/*更新數據包數*/

	if ((state->status & XNPIPE_USER_CONN) == 0) {/**/
		xnlock_put_irqrestore(&nklock, s);
		return (ssize_t) size;
	}

	if (state->status & XNPIPE_USER_WREAD) {/*如果LInux此時等待讀*/
		/*
		 * Wake up the regular Linux task waiting for input
		 * from the Xenomai side.
		 * 喚醒常規的Linux任務,以等待Xenomai方面的輸入。
		 */
		state->status |= XNPIPE_USER_WREAD_READY;/*置位能讀標志*/
		need_sched = 1;
	}

	if (state->asyncq) {	/* Schedule asynch sig.調度異步信號。 */
		state->status |= XNPIPE_USER_SIGIO;
		need_sched = 1;
	}

	if (need_sched)
		xnpipe_schedule_request(); //xnpipe_wakeup_apc

	xnlock_put_irqrestore(&nklock, s);

	return (ssize_t) size;
}

根據minor,從xnpipe_states中該xddp使用的xnpipe對象xnpipe_state,初始化消息頭mh的數據區長度size、已讀偏移rdoff,當linux端應用來讀取數據時,該數據包內的數據沒有被完全讀取時,會更新該成員變量。更新待發送數據總長度ionrd。

如果該數據具有XNPIPE_URGENT標識,也就是用戶發送函數sendmsg使用了MSG_OOB,標識該數據包優先級高,需要優先被發送,mbuf就會插入鏈表頭,linux端讀取時就能優先讀取該數據。否則,mbuf插入鏈表尾。鏈表節點數state->nroutq +1。

如果此時linux沒有連接(沒有打開設備節點/dev/rtpX),就直接返回。否則根據state->status狀態判斷linux端是否等待讀數據,執行異步調用xnpipe_wakeup_apc,喚醒linux任務(apc工作原理查看ipipr虛擬中斷小節)。

如果xenomai端再使用MSG_OOB標識發送了一個高優先級數據data3,長度50Byte,由於是高優先數據,插入發送鏈表頭如下:

如果還沒有linux讀操作,xenomai端再使用MSG_OOB標識發送了一個高優先級數據data4,長度50Byte,由於是高優先數據,插入發送鏈表頭,data4就會變成最先被linux任務讀取的數據包。

5.linux端讀

對於讀字符設備,就是用文件系統的標准接口 read,參數文件描述符 fd,在內核里面調用的 sys_read,在 sys_read 里面根據文件描述符 fd 得到 struct file 結構。接下來再調用 vfs_write。最終會調用到xnpipe_read()

static ssize_t xnpipe_read(struct file *file,
			   char *buf, size_t count, loff_t *ppos)
{
	struct xnpipe_state *state = file->private_data;
	int sigpending, err = 0;
	size_t nbytes, inbytes;
	struct xnpipe_mh *mh;
	ssize_t ret;
	spl_t s;
    if (list_empty(&state->outq)) {
		if (file->f_flags & O_NONBLOCK) {  
			xnlock_put_irqrestore(&nklock, s);
			return -EWOULDBLOCK;
		}

		sigpending = xnpipe_wait(state, XNPIPE_USER_WREAD, s,
					 !list_empty(&state->outq));

	......
	}
    mh = list_get_entry(&state->outq, struct xnpipe_mh, link);
	state->nroutq--;
    
    inbytes = 0;
	for (;;) {
		nbytes = xnpipe_m_size(mh) - xnpipe_m_rdoff(mh);

		if (nbytes + inbytes > count)
			nbytes = count - inbytes;

		if (nbytes == 0)
			break;

		xnlock_put_irqrestore(&nklock, s);

		/* More data could be appended while doing this:
		*/
		err = __copy_to_user(buf + inbytes,
				     xnpipe_m_data(mh) + xnpipe_m_rdoff(mh),
				     nbytes);

		xnlock_get_irqsave(&nklock, s);
	......

		inbytes += nbytes;
		xnpipe_m_rdoff(mh) += nbytes;
	}
    state->ionrd -= inbytes;
	ret = inbytes;

	if (xnpipe_m_size(mh) > xnpipe_m_rdoff(mh)) {
		list_add(&mh->link, &state->outq);
		state->nroutq++;
	} else {
		if (state->ops.output)
			state->ops.output(mh, state->xstate);
		xnlock_put_irqrestore(&nklock, s);
		state->ops.free_obuf(mh, state->xstate); 
		xnlock_get_irqsave(&nklock, s);
		if (state->status & XNPIPE_USER_WSYNC) {
			state->status |= XNPIPE_USER_WSYNC_READY;
			xnpipe_schedule_request();
		}
	}
.........
}

先看xnpipe outq鏈表是否為空,如果為空且設置了非阻塞,直接返回無,否則阻塞到wait_queue_head_t readq上等待,並置位status為XNPIPE_USER_WREAD。

取出outq上的第一個節點,節點數-1,循環拷貝節點內數據到緩沖區,inbytes記錄着已拷貝到buf中的數據長度,nbytes表示該節點內未讀的數據長度

  • 如果數據包節點內數據不足要讀取的長度,那數據包內有多少數據就拷貝多少,例如需要讀取60Byte數據,但節點內未只有50Byte數據,那本次只能讀取50Byte。
  • 如果節點內的數據長度大於要讀取的長度,會更新節點內消息頭的已讀偏移量rdoff,將該節點重新插入鏈表outq,下次讀的時候從偏移量rdoff開始拷貝數據。

等節點內數據都被讀取后,如果xenomai端設置了回調函數ops.output()則執行ops.output(),然后執行state->ops.free_obuf()來釋放消息節點,釋放時對於非緩沖區節點,直接free;對於緩沖區節點,不需要釋放,清理統計量即可。

static void __xddp_free_handler(void *buf, void *skarg) /* nklock free */
{
	struct xddp_socket *sk = skarg;
	rtdm_lockctx_t s;

	if (buf != sk->buffer) {
		xnheap_free(sk->bufpool, buf);
		return;
	}
    /* Reset the streaming buffer. */
	rtdm_lock_get_irqsave(&sk->lock, s);

	sk->fillsz = 0;
	sk->buffer_port = -1;
	__clear_bit(_XDDP_SYNCWAIT, &sk->status);
	__clear_bit(_XDDP_ATOMIC, &sk->status);

	.....
}

接上面的圖示一次read()調用讀取50Byte數據后如下:

6.linux端寫

接下來就是對XDDP設備的讀寫。寫入一個字符設備,就是用文件系統的標准接口 write,參數文件描述符 fd,在內核里面調用的 sys_write,在 sys_write 里面根據文件描述符 fd 得到 struct file 結構。接下來再調用 vfs_write。


ssize_t __vfs_write(struct file *file, const char __user *p, size_t count, loff_t *pos)
{
  if (file->f_op->write)
    return file->f_op->write(file, p, count, pos);
  else if (file->f_op->write_iter)
    return new_sync_write(file, p, count, pos);
  else
    return -EINVAL;
}

可以看到,在 __vfs_write 里面,我們會調用 struct file 結構里的 file_operations 的 write 函數。上面我們打開字符設備的時候,已經將 struct file 結構里面的 file_operations 指向了設備驅動程序的 file_operations 結構,所以這里的 write 函數最終會調用到xnpipe_write()

static ssize_t xnpipe_write(struct file *file,
			    const char *buf, size_t count, loff_t *ppos)
{
	struct xnpipe_state *state = file->private_data;
	struct xnpipe_mh *mh;
	int pollnum, ret;
	spl_t s;
	......
retry:
	......
	pollnum = state->nrinq + state->nroutq;
	xnlock_put_irqrestore(&nklock, s);

	mh = state->ops.alloc_ibuf(count + sizeof(*mh), state->xstate);/*分配消息內存*/
	.......
	if (mh == NULL) {
		if (file->f_flags & O_NONBLOCK)
			return -EWOULDBLOCK;

		xnlock_get_irqsave(&nklock, s);
		if (xnpipe_wait(state, XNPIPE_USER_WSYNC, s,
				pollnum > state->nrinq + state->nroutq)) {
			xnlock_put_irqrestore(&nklock, s);
			return -ERESTARTSYS;
		}
		goto retry;
	}

	xnpipe_m_size(mh) = count;
	xnpipe_m_rdoff(mh) = 0;

	if (copy_from_user(xnpipe_m_data(mh), buf, count)) {
		....
	}

	xnlock_get_irqsave(&nklock, s);

	list_add_tail(&mh->link, &state->inq);
	state->nrinq++;

	/* Wake up a Xenomai sleeper if any. 。*/
	if (xnsynch_wakeup_one_sleeper(&state->synchbase))
		xnsched_run();

	if (state->ops.input) {
		ret = state->ops.input(mh, 0, state->xstate);//__xddp_input_handler
		if (ret)
			count = (size_t)ret;
	}

	if (file->f_flags & O_SYNC) {/*等待對方收*/
		if (!list_empty(&state->inq)) {
			if (xnpipe_wait(state, XNPIPE_USER_WSYNC, s,
					list_empty(&state->inq)))
				count = -ERESTARTSYS;
		}
	}

	xnlock_put_irqrestore(&nklock, s);

	return (ssize_t)count;
}

向xenomai發送任務與xenomai向linux發類似。先計算xnpipe讀和寫未決數據總長度pollnum,然后為要發送的數據申請節點空間,如果申請失敗表示xenomai內存池內存非配完了,此時,要么直接返回寫入失敗,要么調用xnpipe_wait阻塞到隊列syncq,解阻塞條件為pollnum > state->nrinq + state->nroutq,也就是xnpipe有未決數據被處理,節點就被釋放,嘗試再次分配內存,比如上面xnpipe_read()函數中60-62行就是該作用。

分配節點mh后,設置消息頭內數據長度mh.size,將用戶空間數據拷貝到消息節點數據區,接着將消息節點mh掛到input鏈表inq,由linux寫入的數據都是插入鏈表尾。更新input隊列數nrinq;

完成消息節點掛接后,如果有xenomai任務阻塞等待數據,則開始xenomai調度。

如果xddp socket設置有hook函數ops.input(),則執行ops.input()。

接着如果用戶設置了寫同步標志O_SYNC,則阻塞等待直到實時任務將數據讀取。

7. 實時端接收

xenomai任務接收數據,:

 /*接收數據*/
    ret = recvfrom(s, buf, sizeof(buf), 0, NULL, 0);
    if (ret <= 0)
        fail("recvfrom");

根據前面的分析一樣,先進入libcobalt:

COBALT_IMPL(ssize_t, recvfrom, (int fd, void *buf, size_t len, int flags,
				struct sockaddr *from, socklen_t *fromlen))
{
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_name = from,
		.msg_namelen = from != NULL ? *fromlen : 0,
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = NULL,
		.msg_controllen = 0,
	};
	int ret;

	ret = do_recvmsg(fd, &msg, flags);
	if (ret != -EBADF && ret != -ENOSYS) {
		......
		return ret;
	}

	return __STD(recvfrom(fd, buf, len, flags, from, fromlen));
}
COBALT_IMPL(ssize_t, recv, (int fd, void *buf, size_t len, int flags))
{
	struct iovec iov = {
		.iov_base = (void *)buf,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_name = NULL,
		.msg_namelen = 0,
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = NULL,
		.msg_controllen = 0,
	};
	int ret;

	ret = do_recvmsg(fd, &msg, flags);
	if (ret != -EBADF && ret != -ENOSYS)
		return set_errno(ret);

	return __STD(recv(fd, buf, len, flags));
}

再由do_recvmsg()發起實時系統調用最終執行xddp_recvmsg().

static ssize_t xddp_recvmsg(struct rtdm_fd *fd,
			    struct user_msghdr *msg, int flags)
{
	struct iovec iov_fast[RTDM_IOV_FASTMAX], *iov;
	struct sockaddr_ipc saddr;
	ssize_t ret;
	......
	/* Copy I/O vector in */
	ret = rtdm_get_iovec(fd, &iov, msg, iov_fast);
	.....
	ret = __xddp_recvmsg(fd, iov, msg->msg_iovlen, flags, &saddr);
	.......
	/* Copy the updated I/O vector back */
	if (rtdm_put_iovec(fd, iov, msg, iov_fast))
		return -EFAULT;

	/* Copy the source address if required. */
	if (msg->msg_name) {
		if (rtipc_put_arg(fd, msg->msg_name, &saddr, sizeof(saddr)))
			return -EFAULT;
		msg->msg_namelen = sizeof(struct sockaddr_ipc);
	}

	return ret;
}

sendmsg()一樣先調用rtdm_get_iovec()處理要讀取的數據條數及每條數據長度,再調用__xddp_recvmsg()讀取數據。

static ssize_t __xddp_recvmsg(struct rtdm_fd *fd,
			      struct iovec *iov, int iovlen, int flags,
			      struct sockaddr_ipc *saddr)
{
	struct rtipc_private *priv = rtdm_fd_to_private(fd);
	struct xddp_message *mbuf = NULL; /* Fake GCC */
	struct xddp_socket *sk = priv->state;
	ssize_t maxlen, len, wrlen, vlen;
	nanosecs_rel_t timeout;
	struct xnpipe_mh *mh;
	int nvec, rdoff, ret;
	struct xnbufd bufd;
	spl_t s;

	.....
	maxlen = rtdm_get_iov_flatlen(iov, iovlen);  /*要讀取的長度*/
	.......
	timeout = (flags & MSG_DONTWAIT) ? RTDM_TIMEOUT_NONE : sk->timeout;
	/* Pull heading message from the input queue. */
	len = xnpipe_recv(sk->minor, &mh, timeout);
	.......
}

計算本次讀取的總長度maxlen,再調用xnpipe_recv()從xnpipe輸入隊列中取一個消息節點,取消息節點過程如下:

ssize_t xnpipe_recv(int minor, struct xnpipe_mh **pmh, xnticks_t timeout)
{
	struct xnpipe_state *state;
	struct xnpipe_mh *mh;
	xntmode_t mode;
	ssize_t ret;
	int info;
	spl_t s;
	......
	state = &xnpipe_states[minor];
	......
	/*
	 * If we received a relative timespec, rescale it to an
	 * absolute time value based on the monotonic clock.
	 */
	mode = XN_RELATIVE;
	if (timeout != XN_NONBLOCK && timeout != XN_INFINITE) {
		mode = XN_ABSOLUTE;
		timeout += xnclock_read_monotonic(&nkclock);
	}

	for (;;) {
		if (!list_empty(&state->inq))/*有數據*/
			break;

		if (timeout == XN_NONBLOCK) { /*非阻塞*/
			ret = -EWOULDBLOCK;
			goto unlock_and_exit;
		}

		info = xnsynch_sleep_on(&state->synchbase, timeout, mode); /*在xnsynch上睡眠*/
		if (info & XNTIMEO) {  /*超時*/
			ret = -ETIMEDOUT;
			goto unlock_and_exit;
		}
		if (info & XNBREAK) {   /*被其他強制喚醒*/
			ret = -EINTR;
			goto unlock_and_exit;
		}
		if (info & XNRMID) {  /*該資源已被不存在*/
			ret = -EIDRM;
			goto unlock_and_exit;
		}
	}

	mh = list_get_entry(&state->inq, struct xnpipe_mh, link);/*從inq取下該消息節點*/
	*pmh = mh;
	state->nrinq--;                  /*統計減一*/
	ret = (ssize_t)xnpipe_m_size(mh);/*數據長度*/

	if (state->status & XNPIPE_USER_WSYNC) {/*如果linux線程正在等待SYNC*/
		state->status |= XNPIPE_USER_WSYNC_READY;
		xnpipe_schedule_request(); /*發送xnpipe_wakeup_apc,喚醒非實時任務*/
	}
unlock_and_exit:

	xnlock_put_irqrestore(&nklock, s);

	return ret;
}

如果輸入隊列inq為空,說名沒有消息可接收,用戶如果設置的超時等待,則阻塞等待,否則直接返回。

從輸入隊列取出一個消息節點后,如果linux發送消息時使用寫同步(O_SYNC)阻塞,需要調用xnpipe_schedule_request()發送xnpipe_wakeup_apc,告訴linux 阻塞的任務可以喚醒了,至於什么時候喚醒需要等到linux得到運行,然后返回該消息節點內數據長度len。

回到__xddp_recvmsg(),接着將消息節點內的數據拷貝到用戶緩沖區iov[nvec].iov_base中,拷貝后釋放。最后是異步io相關處理以后再說。

	len = xnpipe_recv(sk->minor, &mh, timeout);
	if (len < 0)
		return len == -EIDRM ? 0 : len;
	if (len > maxlen) {
		ret = -ENOBUFS;
		goto out;
	}
....
	mbuf = container_of(mh, struct xddp_message, mh);

	if (saddr)
		*saddr = sk->name;

	/* Write "len" bytes from mbuf->data to the vector cells */
	for (ret = 0, nvec = 0, rdoff = 0, wrlen = len;
	     nvec < iovlen && wrlen > 0; nvec++) {
		if (iov[nvec].iov_len == 0)
			continue;
		vlen = wrlen >= iov[nvec].iov_len ? iov[nvec].iov_len : wrlen;
		if (rtdm_fd_is_user(fd)) {/*用戶空間應用發起的讀*/
			xnbufd_map_uread(&bufd, iov[nvec].iov_base, vlen);
			ret = xnbufd_copy_from_kmem(&bufd, mbuf->data + rdoff, vlen);
			xnbufd_unmap_uread(&bufd);
		} else {	/*內核空間應用發起的讀*/
			xnbufd_map_kread(&bufd, iov[nvec].iov_base, vlen);
			ret = xnbufd_copy_from_kmem(&bufd, mbuf->data + rdoff, vlen);
			xnbufd_unmap_kread(&bufd);
		}
		if (ret < 0)
			goto out;
		iov[nvec].iov_base += vlen;
		iov[nvec].iov_len -= vlen;
		wrlen -= vlen;
		rdoff += vlen;
	}
out:
	xnheap_free(sk->bufpool, mbuf); /*釋放節點*/
	cobalt_atomic_enter(s);
	if ((__xnpipe_pollstate(sk->minor) & POLLIN) == 0 &&
	    xnselect_signal(&priv->recv_block, 0))
		xnsched_run();
	cobalt_atomic_leave(s);

	return ret ?: len;
}

8. 實時端關閉

用戶空間程序:

/* 關閉套接字*/
close(s);

libcoblt:

COBALT_IMPL(int, close, (int fd))
{
	int oldtype;
	int ret;

	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);

	ret = XENOMAI_SYSCALL1(sc_cobalt_close, fd);

	pthread_setcanceltype(oldtype, NULL);

	if (ret != -EBADF && ret != -ENOSYS)
		return set_errno(ret);

	return __STD(close(fd));
}

實時調用sc_cobalt_close:

COBALT_SYSCALL(close, lostage, (int fd))
{
	return rtdm_fd_close(fd, 0);
}

進一步調用rtdm_fd_close().

int rtdm_fd_close(int ufd, unsigned int magic)
{
	struct rtdm_fd_index *idx;
	struct cobalt_ppd *ppd;
	struct rtdm_fd *fd;
	spl_t s;

	secondary_mode_only();
	....
	ppd = cobalt_ppd_get(0);

	xnlock_get_irqsave(&fdtree_lock, s);
	idx = fetch_fd_index(ppd, ufd);
	.....
	fd = idx->fd;
	......
	
	__fd_close(ppd, idx, s);
	__close_fd(current->files, ufd);

	return 0;
}

無論是內核線程還是用戶線程,先得到cobalt_ppd,要關閉的ufd對應的rtdm_fd_index,接着調用__fd_close()釋放ufd:

static void
__fd_close(struct cobalt_ppd *p, struct rtdm_fd_index *idx, spl_t s)
{
	xnid_remove(&p->fds, &idx->id);
	__put_fd(idx->fd, s);

	kfree(idx);
}

__fd_close()中先將rdmt_fd對應的節點從cobalt_ppd管理任務所有rtdm_fd的紅黑樹fds上刪除,接着調用__put_fd(idx->fd, s)。

static void __put_fd(struct rtdm_fd *fd, spl_t s)
{
	int destroy;

	destroy = --fd->refs == 0;
	xnlock_put_irqrestore(&fdtree_lock, s);

	if (!destroy)
		return;

	if (ipipe_root_p)
		fd->ops->close(fd);/*rtipc_close*/
	else {
		struct lostage_trigger_close closework = {
			.work = {
				.size = sizeof(closework),
				.handler = lostage_trigger_close,
			},
		};

		xnlock_get_irqsave(&fdtree_lock, s);
		list_add_tail(&fd->cleanup, &rtdm_fd_cleanup_queue);
		xnlock_put_irqrestore(&fdtree_lock, s);

		ipipe_post_work_root(&closework, work);
	}
}

先將rtdm_fd引用計數減一,再決定是否釋放該rtdm_fd。釋放需要先判斷當前所處域,如果現在就在root域直接調用fd->ops->close(fd),也就是rtipc_close();如果現在處於head域,則需要延遲到root在處理,先將該rtdm_fd掛載到rtdm_fd_cleanup_queue。向linux發送一個work---closework,通知linux對rtdm_fd_cleanup_queue上的所有rtdm_fd執行close操作,等linux得到運行的時候也就會調用fd->ops->close(fd),也就是rtipc_close()

不管所處域如何,最終都是rtipc_close()

static void rtipc_close(struct rtdm_fd *fd)
{
	struct rtipc_private *priv = rtdm_fd_to_private(fd);
	priv->proto->proto_ops.close(fd);
	xnselect_destroy(&priv->recv_block);
	xnselect_destroy(&priv->send_block);
}

其中priv->proto->proto_ops.close(fd)是我們綁定的協議xddp對應的xddp_close()

static void xddp_close(struct rtdm_fd *fd)
{
	struct rtipc_private *priv = rtdm_fd_to_private(fd);
	struct xddp_socket *sk = priv->state;
	rtdm_lockctx_t s;

	sk->monitor = NULL;

	if (!test_bit(_XDDP_BOUND, &sk->status))
		return;

	cobalt_atomic_enter(s);
	portmap[sk->name.sipc_port] = NULL;
	cobalt_atomic_leave(s);

	if (sk->handle)
		xnregistry_remove(sk->handle);

	xnpipe_disconnect(sk->minor);
}

都是對xddp_socket成員變量的清理,最后調用xnpipe_disconnect()對XNPIPIE對象清零。

int xnpipe_disconnect(int minor)
{
	struct xnpipe_state *state;
	int need_sched = 0;
	spl_t s;
.....
	state = &xnpipe_states[minor];
.....
	state->status &= ~XNPIPE_KERN_CONN;

	state->ionrd -= xnpipe_flushq(state, outq, free_obuf, s);
.....
	xnpipe_flushq(state, inq, free_ibuf, s);

	if (xnsynch_destroy(&state->synchbase) == XNSYNCH_RESCHED)
		xnsched_run();

	if (state->status & XNPIPE_USER_WREAD) {
		state->status |= XNPIPE_USER_WREAD_READY;
		need_sched = 1;
	}

	if (state->asyncq) {	/* Schedule asynch sig. 安排異步信號。*/
		state->status |= XNPIPE_USER_SIGIO;
		need_sched = 1;
	}

cleanup:
	/*
	 * If xnpipe_release() has not fully run, enter lingering
	 * close. This will prevent the extra state from being wiped
	 * out until then.
	 */
	if (state->status & XNPIPE_USER_CONN)
		state->status |= XNPIPE_KERN_LCLOSE;
	else {
		xnlock_put_irqrestore(&nklock, s);
		state->ops.release(state->xstate);
		xnlock_get_irqsave(&nklock, s);
		xnpipe_minor_free(minor);
	}

	if (need_sched)
		xnpipe_schedule_request();

	xnlock_put_irqrestore(&nklock, s);

	return 0;
}

其中xnpipe_flushq()釋放xnpipe上的未決消息節點,如果linux阻塞在xnpipe上,喚醒它讓它給應用發送關閉信號。最后調用state->ops.release(state->xstate)xnpipe_release()來完成底層xnpipe對象的清理.

回到rtdm_fd_close(),接着完成rtdm_fd相關清理后。執行__close_fd(current->files, ufd),關閉ufd對應的文件。ufd是創建socket時linux 在用戶空間定義的[rtdm-socket]文件描述符。發出通知以后釋放rtdm_fd_index。

版權聲明:本文為本文為博主原創文章,轉載請注明出處。如有問題,歡迎指正。博客地址:https://www.cnblogs.com/wsg1100/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM