(轉):從內核代碼聊聊pipe的實現


來源: http://luodw.cc/2016/07/09/pipeof/

用linux也有兩年多了,從命令,系統調用,到內核原理一路學過來,我發現我是深深喜歡上這個系統;使用起來就是一個字“爽”;當初在看 linux內核原理時,對linux內核源碼有種敬畏的心理,不敢涉入,主要是看不懂,直到最近實習的時候,在某次分享會上,某位老師分享了OOM機制, 我很感興趣,就去看內核代碼,發現,原來我能看懂了;所以想寫篇博客,分享下從內核代碼分析pipe的實現;

廈大上弦場

這部分內容說簡單也很簡單,說難也難,其實就是需要了解linux內核一些原理,例如系統調用嵌入內核,虛擬文件系統等等;

接下來,我會從以下小點介紹管道

  • 用戶態管道的使用;
  • 虛擬文件系統
  • 內核態管道的實現原理;
  • fifo命名管道實現
  • 總結;

管道的使用


一開始接觸linux,相信很多人都是從命令開始;當一個命令的輸出,需要作為另一個命令的輸入時,我們就會使用管道來實現這個功能;例如,我們經常需要在某個文檔中查找是否存在某個單詞,我們就可以用如下方式:

cat test.txt | grep 'hello'

這行命令表示在test.txt文件中查找包含單詞'hello'的句子。我們先解釋下這行命令是怎么實現的;

我們知道終端也是一個進程,當我們輸入一個命令執行時,其實是終端程序調用fork和exec產生一個子進程執行命令程序;當終端在執行這行命令時,會先解析輸入的參數,當發現輸入的命令行中有‘|’符號時,就會知道在命令行中包含了管道,因此,在終端程序中,

  • 會先fork出一個子進程,並執行exec將cat載入內存;
  • 接着在cat程序中,用函數pipe定義出管道;
  • 在定義出管道之后,再調用fork,生成一個子進程;
  • 在父進程cat中關閉管道讀端,將cat進程的標准輸出重定向到管道的寫端;
  • 在子進程中將管道的寫端關閉,將標准輸入重定向到管道的讀端,再調用exec將grep進程載入內存;
  • 最后,cat的輸出就可以最為grep的輸入了;

這里需要說明的是,父進程cat對管道的操作必須在fork之前,否則父進程cat對管道的操作會繼承到子進程,這樣會導致子進程無法讀取父進程的數據;我們可以用一個簡單的程序來模擬上述過程,為了簡單起見,例子簡單地將字符串從小寫轉為大寫;程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

int main(void){
int fd[2];
int ret=pipe(fd);//創建管道
if (ret==-1){
fprintf(stderr, "%s\n", "pipe error!");
exit(-1);
}
int pid=fork();
if(pid<0){
fprintf(stderr, "%s\n", "fork error!");
exit(-1);
}
if(pid==0){//在子進程中
close(fd[1]);
dup2(fd[0],STDIN_FILENO);//將子進程的標准輸入重定向到fd[0]
ret=execl("./toUpper","toUpper", "",NULL);//執行子進程
if(ret==-1){
fprintf(stderr, "%s\n", "execl error!");
exit(-1);
}
}

// 以下是父進程
close(fd[0]);
dup2(fd[1],STDOUT_FILENO);// 將父進程的標准輸出重定向到fd[1]
char buf[1024];
int n=read(STDIN_FILENO,buf,1024);// 從標准輸入讀取數據
if (n<0) {
fprintf(stderr, "%s\n", "read error!");
exit(-1);
}// 將數據寫入管道緩沖區中
write(STDOUT_FILENO, buf,n);
sleep(1);
return 0;
}

上述為主程序;在主程序中通過fork函數創建出一個子進程;在父進程中關閉管道讀端,將標准輸出重定向到管道寫端;當在父進程有數據輸出到標准輸出時,就可以輸出到管道的緩沖區;在子進程中,關閉管道寫端,將標准輸入重定向到管道讀端,這樣子進程從標准輸入讀取時,就可以從管道緩沖區讀取;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

#define BUFSIZE 1024

char toUp(char ch){
if (ch>'a' && ch <'z'){
ch = ch - 32;
}else{
ch = ch;
}
return ch;
}
int main(){
int i=0;
char buf[BUFSIZE];
// 從標准輸入讀取數據,其實就是從管道緩沖區讀取數據
int n=read(STDIN_FILENO,buf,BUFSIZE);
if (n<0){
fprintf(stderr, "%s\n", "read error!");
exit(-1);
}
for (;i<n;i++)
{
buf[i]=toUp(buf[i]);
}
printf("%s\n", buf);
return 0;
}

上述程序為父進程調用的子程序,先從管道緩沖區讀取數據,然后將每個字母轉換為大寫字母,最后輸出到標准輸出;例子很簡單,當然,也可以使用C語言io庫封裝好的popen函數來實現上述功能;

虛擬文件系統


在講管道之前,必須先介紹下linux虛擬文件系統,否則很難說清楚在這里;虛擬文件系統是linux內核四大模塊之一,我們知道linux下面everything is file。例如磁盤文件,管道,套接字,設備等等;我們都可以通過read和write函數來讀取上述文件的數據;為了支持這一特性,linux引入虛擬文件系統,就是通過一層文件系統虛擬層,屏蔽不同文件系統的差異,實現相同的函數接口操作;linux支持非常多的文件系統,我們可以通過查看

cat /proc/filesystems

包括基於磁盤的文件ext4,ext3等,基於內存的文件系統proc,pipefs,sysfs,ramf以及tmpfs,和套接字文件系統sockfs;

當我們在用戶態調用read函數讀取一個文件描述符時,主要過程如下:

  1. 首先通過軟中斷嵌入內核,調用系統相應服務例程sys_read,sys_read函數如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    asmlinkage ssize_t sys_read(unsigned int fd, char __user * buf, size_t count)
    {
    struct file *file;
    ssize_t ret = -EBADF;
    int fput_needed;
    // fget_light函數從當前進程的文件描述符表中,通過文件描述符,
    // 獲取file結構體
    file = fget_light(fd, &fput_needed);
    if (file) {
    loff_t pos = file_pos_read(file);//獲取讀取文件的偏移量
    ret = vfs_read(file, buf, count, &pos);//調用虛擬文件系統調用層
    file_pos_write(file, pos);// 更新當前文件的偏移量
    fput_light(file, fput_needed);// 更新文件的引用計數
    }

    return ret;
    }

我們可以看到sys_read服務例程的參數和系統調用read的參數是一樣的,首先通過fd從當前的文件數組中獲取file實例,接着獲取當前的讀偏移量,然后進入虛擬文件系統vfs_read調用;

  1. 接下來看看vf_read虛擬層調用的過程:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    ssize_t vfs_read(struct file *file, char __user *buf, size_t count, loff_t *pos)
    {
    struct inode *inode = file->f_dentry->d_inode;
    ssize_t ret;
    if (!(file->f_mode & FMODE_READ))
    return -EBADF;
    if (!file->f_op || (!file->f_op->read && !file->f_op->aio_read))
    return -EINVAL;
    ret = locks_verify_area(FLOCK_VERIFY_READ, inode, file, *pos, count);
    if (!ret) {
    ret = security_file_permission (file, MAY_READ);
    if (!ret) {
    if (file->f_op->read)
    // 進入具體文件系統
    ret = file->f_op->read(file, buf, count, pos);
    else
    ret = do_sync_read(file, buf, count, pos);
    if (ret > 0)
    dnotify_parent(file->f_dentry, DN_ACCESS);
    }
    }
    return ret;
    }

在這個函數中,一開始先屬性檢查以及安全性檢查,然后通過下面代碼進入具體的文件系統

ret = file->f_op->read(file, buf, count, pos);

每種文件系統的file->f_op->read是不一樣的,像基於磁盤的文件系統,file->f_op->read函數是先到緩存緩存獲取數據,如果緩存沒有數據,則到磁盤獲取;基於內存的文件系統,file->f_op->read則是直接在內核緩存獲取數據,而不會到磁盤獲取數據.

所以虛擬文件系統類似於面向對象多態的實現,首先設計好接口,不同的文件系統分別實現這些接口,這樣就可以調用相同的接口,實現不同的操作;

而這個file->f_op主要是從inode->i_fop中獲得,因此對於不同的文件系統,inode也結構也是有區別的.當創建一個inode時,針對不同的文件系統需要設置不同的屬性,最主要就是各種操作函數指針結構體,例如inode->i_op和inode->i_fop;這樣不同的文件系統,就可以在f->f_op->read調用中,實現不同的操作.

內核管道的實現


上面給出了管道簡單的操作以及稍微介紹了虛擬文件系統,pipefs主要的系統調用就是pipe,read和write。下面來分析內核是怎么實現管道的;linux下的進程的用戶態地址空間都是相互獨立的,因此兩個進程在用戶態是沒法直接通信的,因為找不到彼此的存在;而內核是進程間共享的,因此進程間想通信只能通過內核作為中間人,來傳達信息。下圖顯示了兩個進程間通過內核緩存進行通信的過程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
                   |
| 入 +-------+
+--------------+------------< |

| | | 進程1 |
+---v----+ | | |
| | | +-------+
| 緩 存 ||
| (page) ||
| | |
+---v----+ | +-------+
| | | |

| | | 進程2 |
+--------------+------------> |
| 取 +-------+
|

pipe的實現就是和上述圖示一樣,在pipefs文件系統的inode中有一個屬性

struct pipe_inode_info *i_pipe;

這個結構體定義如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//pipe_fs_i.h
struct pipe_inode_info {
wait_queue_head_t wait;
char *base;//指向管道緩存首地址
unsigned int len;//管道緩存使用的長度
unsigned int start;//讀緩存開始的位置
unsigned int readers;
unsigned int writers;
unsigned int waiting_writers;
unsigned int r_counter;
unsigned int w_counter;
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
};

這個結構體定義了管道的緩存,由base指向,緩存大小為一個內存頁,有如下定義

#define PIPE_SIZE PAGE_SIZE

其實到現在我們大概可以猜得到管道的是實現原理,在一個進程中,向管道中寫入數據時,其實就是寫入這個緩存中;然后在另一個進程讀取管道時,其實就是從這個緩存讀取,實現進程的通信.

這個緩存也可以解釋為什么管道是單通道的:

因為只有一個緩存,如果是雙通道,那么兩個進程同時向這塊緩存寫數據時,這樣會導致數據覆蓋,即一個進程的數據被另一個進程的數據覆蓋.而向套接字有讀寫緩存,因此套接字是雙通道的.

ok,接下來,從pipe函數開始,看看內核是如何創建管道的.pipe系統調用在內核對應的服務例程為sys_pipe,在sys_pipe函數中,接着調用do_pipe創建兩個管道描述符,一個用於寫,另一個用於讀;我們來看下do_pipe都做了什么.

do_pipe函數

一開始先獲得兩個空file實例,一個對應管道讀描述符,另一個對應管道寫描述符

1
2
3
4
5
6
7
8
error = -ENFILE;
f1 = get_empty_filp();
if (!f1)
goto no_files;

f2 = get_empty_filp();
if (!f2)
goto close_f1;

接着通過調用get_pipe_inode來實例化一個帶有pipe屬性的inode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
struct inode* pipe_new(struct inode* inode)
{
unsigned long page;
// 申請一個內存頁,作為pipe的緩存
page = __get_free_page(GFP_USER);
if (!page)
return NULL;
// 為pipe_inode_info結構體分配內存
inode->i\_pipe = kmalloc(sizeof(struct pipe_inode\_info), GFP_KERNEL);
if (!inode->i_pipe)
goto fail_page;

// 初始化pipe_inode_info屬性
init_waitqueue_head(PIPE_WAIT(*inode));
PIPE_BASE(*inode) = (char*) page;
PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
PIPE_WAITING_WRITERS(*inode) = 0;
PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
*PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL;

return inode;
fail_page:
free_page(page);
return NULL;
}
//----------------------------------------------------------------
static struct inode * get_pipe_inode(void)
{
// 從pipefs超級塊中分配一個inode
struct inode *inode = new_inode(pipe_mnt->mnt_sb);

if (!inode)
goto fail_inode;
// pipe_new函數主要用來為這個inode初始化pipe屬性,就是pipe_inode_info結構體
if(!pipe_new(inode))
goto fail_iput;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;
inode->i_fop = &rdwr_pipe_fops;//設置pipefs的inode操作函數集合,rdwr_pipe_fops
// 為結構體,包含讀寫管道所有操作

inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current->fsuid;
inode->i_gid = current->fsgid;
inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
inode->i_blksize = PAGE_SIZE

return inode;
}

然后,在當前進程的files_struct結構中獲取兩個空的文件描述符,分別存儲在i和j

1
2
3
4
5
6
7
8
9
error = get_unused_fd();
if (error < 0)
goto close_f12_inode;
i = error;

error = get_unused_fd();
if (error < 0)
goto close_f12_inode_i;
j = error;

下一步就是為這個inode分配dentry目錄項,dentry主要用於將file和inode連接起來,以及設置f1和f2的vfsmnt,dentry,mapping屬性

1
2
3
4
5
6
7
8
9
10
11
12
sprintf(name, "[%lu]", inode->i_ino);
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino; /* will go */
dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this);
if (!dentry)
goto close_f12_inode_i_j;
dentry->d\_op = &pipefs_dentry_operations;
d_add(dentry, inode);
f1->f\_vfsmnt = f2->f\_vfsmnt = mntget(mntget(pipe_mnt));
f1->f\_dentry = f2->f_dentry = dget(dentry);
f1->f\_mapping = f2->f\_mapping = inode->i_mapping;

最后,針對讀寫file實例設置不同的屬性,並且將兩個fd和兩個file實例關聯起來

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* read file */
f1->f\_pos = f2->f_pos = 0;
f1->f\_flags = O_RDONLY;//f1這個file實例只可讀
f1->f\_op = &read_pipe_fops;//這是這個可讀file的操作函數集合結構體
f1->f\_mode = FMODE_READ;
f1->f_version = 0;

/* write file */
f2->f_flags = O_WRONLY;//f2這個file實例只可寫
f2->f_op = &write_pipe_fops;//這是這個只可寫的file操作函數集合結構體
f2->f_mode = FMODE_WRITE;
f2->f_version = 0;

fd_install(i, f1);//將i(fd)和f1(file)關聯起來
fd_install(j, f2);// 將j(fd)和f2(file)關聯起來
fd[0] = i;
fd[1] = j;
return 0;

到這里,do_pipe函數就算結束了,並且用i和j文件描述符填充了fd[2]數組,最后在sys_pipe函數中通過copy_to_user將fd[2]數組返回給用戶程序;

總結下do_pipe函數的執行過程:

  1. 實例化兩個空file結構體;
  2. 創建帶有pipe屬性的inode結構;
  3. 在當前進程文件描述符表中找出兩個未使用的文件描述符;
  4. 為這個inode分配dentry結構體,關聯file和inode;
  5. 針對可讀和可寫file結構,分別設置相應屬性,主要是操作函數集合屬性;
  6. 關聯文件描述符和file結構
  7. 將兩個文件描述符返回給用戶;

pipe讀操作

當通過pipe函數獲取到兩個文件描述符,即可使用read和write函數分別對這兩個描述符進行讀寫;我們先來看下read操作;

有之前虛擬文件系統知道,當用戶態調用read函數時,對應於內核態sys_read,然后在sys_read函數中調用vfs_read函數,在vfs_read函數中調用file->f_op->read,由上述do_pipe函數可以知道,pipefs的read(file)實例對應的file->f_op為read_pipe_fpos,這個read_pipe_fpos結構體定義如下:

1
2
3
4
5
6
7
8
9
10
11
struct file_operations read_pipe_fops = {
.llseek = no_llseek,
.read = pipe_read,
.readv = pipe_readv,
.write = bad_pipe_w,
.poll = pipe_poll,
.ioctl = pipe_ioctl,
.open = pipe_read_open,
.release = pipe_read_release,
.fasync = pipe_read_fasync,
};

因此,在vfs_read函數中調用的(pipe)file->f_op->read即為pipe_read函數,這個函數定義在fs/pipe.c文件中,

1
2
3
4
5
6
static ssize_t
pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
struct iovec iov = { .iov\_base = buf, .iov_len = count };
return pipe_readv(filp, &iov, 1, ppos);
}

pipe_read函數將用戶程序的接收數據緩沖區和大小轉換為iovec結構,然后調用pipe_readv函數從緩沖區獲取數據;在pipe_readv函數中,最主要部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int size = PIPE_LEN(*inode);
if (size) {
// 獲取管道緩沖區讀首地址
char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
// 緩沖區可讀最大值=PIPE\_SIZE - PIPE\_START(inode)
ssize_t chars = PIPE_MAX_RCHUNK(*inode);

// 下面兩個if語句用於比較緩沖區可讀最大值,緩沖區數據長度以及
// 用戶態緩沖區的長度,取最小值
if (chars > total_len)
chars = total_len;
if (chars > size)
chars = size;
// 調用如下函數把數據拷貝到用戶態
if (pipe_iov_copy_to_user(iov, pipebuf, chars)) {
if (!ret) ret = -EFAULT;
break;
}
ret += chars;
// 更新緩沖區讀首地址
PIPE_START(*inode) += chars;
// 對緩沖區長度取模
PIPE_START(*inode) &= (PIPE_SIZE - 1);
// 更新緩沖區數據長度
PIPE_LEN(*inode) -= chars;
// 更新用戶態緩沖區長度
total_len -= chars;
do_wakeup = 1;
if (!total_len)
break; /* 如果用戶態緩沖區已滿,則讀取成功 */
}

上述代碼是在一個循環中,直到用戶態緩沖區已滿,或者管道緩沖區全部數據讀取完畢;當然這還涉及到如果緩沖區為空,則當前進程阻塞(切換到其他進程)等等;我們來看下pipe_iov_copy_to_user函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static inline int
pipe_iov_copy_to_user(struct iovec *iov, const void *from, unsigned long len)
{
unsigned long copy;

while (len > 0) {
while (!iov->iov_len)
iov++;
copy = min_t(unsigned long, len, iov->iov_len);

if (copy_to_user(iov->iov_base, from, copy))
return -EFAULT;
from += copy;
len -= copy;
iov->iov_base += copy;
iov->iov_len -= copy;
}
return 0;
}

這個函數很簡單,其實就是在一個循環中,將緩沖區中數據通過copy_to_user函數寫到用戶態空間緩沖區中。最后在用戶態read函數返回之后,即可在緩沖區中讀取到管道中數據。

pipe的寫過程其實就是和read的過程相反,首先也是通過系統調用嵌入內核write->sys_write->vfs_write,在vfs_write函數中調用file->f_op->write函數,而這個函數對應管道寫file實例的pipe_write函數。后面的過程就是將用戶態緩沖區的數據拷貝到內核管道緩沖區,不再敘述;

fifo命名管道的實現

因為pipe只能用在兩個有親緣關系的進程上,例如父子進程;如果要在兩個沒有關系的進程上用管道通信時,這時pipe就派不上用場了。我們可以思考一個問題,如何讓兩個不相干的進程找到帶有pipe屬性的inode了?我們自然就想到利用磁盤文件。因為linux下兩個進程訪問同一個文件時,雖然各自的file是不一樣的,但是都是指向同一個inode節點。所以將pipe和磁盤文件結合,就產生了fifo命名管道;

fifo的實現原理和pipe一樣,我們可以看下fifo和pipe的read函數操作集合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//read_fifo_fpos
struct file_operations read_fifo_fops = {
.llseek = no_llseek,
.read = pipe_read,
.readv = pipe_readv,
.write = bad_pipe_w,
.poll = fifo_poll,
.ioctl = pipe_ioctl,
.open = pipe_read_open,
.release = pipe_read_release,
.fasync = pipe_read_fasync,
};
// read_pipe_fops
struct file_operations read_pipe_fops = {
.llseek = no_llseek,
.read = pipe_read,
.readv = pipe_readv,
.write = bad_pipe_w,
.poll = pipe_poll,
.ioctl = pipe_ioctl,
.open = pipe_read_open,
.release = pipe_read_release,
.fasync = pipe_read_fasync,
};

可以看出來,二者操作函數一樣,說明對fifo的讀寫操作也是對管道緩沖區進行讀寫;唯一不同點是輪詢函數,其實fifo_poll和pipe_poll也是一樣的

#define fifo_poll pipe_poll

而fifo創建的文件只是讓讀寫進程能找到相同的inode,進而操作相同的pipe緩沖區。

總結


這篇文章,主要從內核代碼介紹了pipe的實現,總結一點就是兩個進程對同一塊內存的操作,和進程內部多個線程操作同一個塊內存類似。我這只是簡單的說明pipe的實現原理,當然,實際上還有許多內容,例如管道阻塞和非阻塞,管道輪詢等等。此外還介紹了fifo命名管道的實現原理。







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM