進程間通信的方式中,我們將多個進程共享同一塊存儲區來進行數據交換的方式稱為共享內存通信。源於它直接將“內存”共享的特殊機制,它成為最快的一種IPC通信方式;然而它也不完美,它雖快,但是沒有同步機制;通常在一個服務進程對共享存儲區還未完成寫操作之前,客戶進程是不應當去取這些數據的,可沒了同步,那可就亂了套了。
這種情況不是我們所願意看到的,所以基於此 我們常常需要為用到的共享內存段添加上同步的機制,使之“完美”起來。通常呢,實現同步我們很自然地會想到信號量,是的,這里我就是這么干的。用基於信號量的PV操作實現完成一個帶同步機制的共享內存段,這樣它就可以像一個“fifo”隊列一樣了,並且它傳輸效率還會表現得非常不錯,雖然它還比較簡陋~。
信號量及相關函數
先來說說用到的信號量及處理函數吧。信號量和其它的IPC結構(前面總結過管道、消息隊列)不同。它本質就是一個計數器,用於為多個進程提供對共享數據對象的訪問。通常進程為了獲得共享的資源,需要執行以下操作:
①測試控制該資源的信號量
②若此信號量>0,則進程可以使用該資源。此種情況下,進程會將信號量值減1,表明它使用了一個資源單位。
③否則,此信號量的值為0,則使該進程進入休眠狀態,直至信號量值>1。如果有進程正在休眠狀態等待此信號量,則喚醒它們
還有就是為了正確的實現信號量,信號量值的測試及減1操作還應當是原子的。為此,信號量通常是在內核中實現的。一般而言,信號量初值可以是任意一個正值,該值表明有多少個共享資源單位可供共享應用。然而遺憾的是,這里我用到的XSI信號量也是有缺陷的。
這源於①信號量並非是單個非負值,它被定義為一個可能含有多個信號量值的集合。通常在創建信號量的時候,對該集合中信號量數量進行指定 ②信號量創建獨立於它的初始化。這是最致命的,因為這將導致的是不能原子的創建一個信號量集合,並對該集合中的各個信號量值賦初值。③有的程序1在終止時可能並沒有釋放掉已經分配給它的信號量。
而對於信號處理函數通常有3個,首先是
1.semget函數
作用:創建一個新的信號量或取得一個已有的信號量
原型:int semget(key_t key, int nsems, int semflg)
參數:
int nsems //它代表信號量集合中有多少個信號量。如果是創建新集合,則必須指定它;如果是引用現有的信號集(通常客戶進程中),則將其指定為0.
int semflg //和前面IPC機制類似,用來初始化信號集維護的semid_ds結構中的ipc_perm結構成員sem_perm。通常用IPC_CREAT|0644創建,要直接打開已存在的話 也直接填0就好
2.semctl函數
用途:該函數用來直接控制信號量信息.也就是直接刪除信號量或者初始化信號量.
原型:int semctl(int semid, int semnum, int cmd, ...)
參數:
int semid //semget函數返回的信號量標識符.
int semnum, //它代表要給該信號量集中的第幾個信號量設置初值
int cmd //通常用SETVAL/GETVAL設置和獲取信號量集中的一個單獨的信號量。具體還有
IPC_STAT //讀取一個信號量集的數據結構semid_ds,並將其存儲在semun中的buf參數中。 IPC_SET //設置信號量集的數據結構semid_ds中的元素ipc_perm,其值取自semun中的buf參數。 IPC_RMID //將信號量集從內存中刪除。 GETALL //用於讀取信號量集中的所有信號量的值。 GETNCNT //返回正在等待資源的進程數目。 GETPID //返回最后一個執行semop操作的進程的PID。 GETVAL //返回信號量集中的一個單個的信號量的值。 GETZCNT //返回這在等待完全空閑的資源的進程數目。 SETALL //設置信號量集中的所有的信號量的值。 SETVAL //設置信號量集中的一個單獨的信號量的值
如果有第四個參數,取決於所請求的命令,如果使用該參數,它通常是一個union semum結構,定義如下:
union semun { int val; /* Value for SETVAL */通常就要它就夠了 struct semid_ds *buf; unsigned short *arry; };
賦值形式:semun.val = 2 //初值放在val中
執行PV操作
3.semop函數
用途:用來改變信號量的值,該函數是具有原子性的。
原型:int semop(int semid, struct sembuf *sops, size_t nsops)
struct sembuf
{
unsigned short sem_num; /* semaphore number */除非使用一組信號量,否則它為0
short sem_op; /* semaphore operation */ p -1, v 1
short sem_flg; /* operation flags */ 填 0就好 SEM_NOWAIT SEM_UNDO
}
注意這當中的sem_op參數,信號量集合中的各個成員的操作都應由相應的sem_op值規定。此值可正可負可為0,相應的值就代表對於進程中占用的資源數量,
同時這值會加到信號量值上,若指定undo標志,還從信號量調整值上減去sem_op.
共享內存增添同步機制
下面就可以開始操作一段共享內存,使其帶有同步的機制,然后模擬重現我們操作系統書上的那個經典的生產消費者問題,並且解決它。這里我畫個圖幫助整理思路:

首先,定義出一個管理內存段的“shmfifo”結構,該結構中具體可用一個shm_head結構來管理數據的讀、寫位置和大小的信息。同時,shimfifo結構中還維護了3個用於解決互斥和同步的信號量sem_mutex,sem_empty, sem_full。

維護的shm_head結構存放在共享內存的頭部,寫入數據從payload處開始寫入一個數據塊大小,每次寫入之后,便更新頭部的wr_idx位,payload可由_head+1得到;同樣,最開始讀出數據時也是從payload處開始讀,每次讀完便更新wr_idx。好,到這里就有了大致的思路。於是可以實現出來它的頭文件
#ifndef __SHMFIFO_H__
#define __SHMFIFO_H__
#include <sys/ipc.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
typedef struct shmhead {
int rd_idx; // 讀入數據索引
int wr_idx; // 寫數據索引
int blocks; // 存數據塊數量
int blksz; // 每個數據塊大小
}shmhead_t;
typedef struct shmfifo {
shmhead_t *p_head; // 共享內存的起始地址
char * p_payload; // 有效數據的起始地址
int shmid; // 打開的共享內存id
int sem_mutex; // 互斥量
int sem_empty; // 還剩多少個可以消費
int sem_full; // 剩余多少個地方可以生產
}shmfifo_t;
// 初始化函數
shmfifo_t *shmfifo_init(key_t key, int blocks, int blksz);
// 放入數據
void shmfifo_put(shmfifo_t *fifo, const void *buf);
// 取得數據
void shmfifo_get(shmfifo_t *fifo, void *buf);
// 結構銷毀
void shmfifo_destroy(shmfifo_t *fifo);
#endif //__SHMFIFO_H__
緊接着要考慮就該結構的初始化。首先,肯定是先要為結構開出空間來,其大小不難分析應該為shm_head大小加上blocks*blksz,其次就是一步步對這些變量和信號進行初始化了。
緊接着,對於放數據和取數據就依葫蘆畫瓢就好了,值得注意就是對信號量的處理,進行PV操作時,針對寫數據時,我們需要先P(sem_empty)保證先有地方可以放數據,其次才進行P(sem_mutex)保證互斥性。(否則,會因為在放入數據時進行了P(sem_mutex)操作,在還沒來得及讀數據時,就將內存段放滿,進而使取數據操作阻塞在信號量sem_mutex<0條件上,最終導致死鎖。但這里若能保證在內存段還未放滿時,讀數據進程能得到調度,那么就不會有這樣的問題了;比如,這里可以讓寫數據sleep一會,然后執行讀數據操作或者 先進行讀數據,然后再寫數據;你可以去試試看~ )

然后放數據完成,便可進行V(sem_full)接着V(sem_mutex)。在進行取數據操作時同理。基於此,便可有以下代碼:
#include "shmfifo.h"
typedef union semun{
int val;
}semun;
// 初始化
shmfifo_t* shmfifo_init(key_t key, int blocks, int blksz)
{
shmfifo_t *p = malloc(sizeof(shmfifo_t));
int shmid = shmget(key, 0, 0);
int len = sizeof(shmhead_t) + blocks*blksz; //共享內存段大小
if(shmid == -1 ) // 內存段不存在,創建
{
shmid = shmget(key, len, IPC_CREAT|0644);
if ( shmid == -1) perror("shmget"),exit(1);
//初始化內存段頭
p->p_head = shmat(shmid, NULL, 0); //將開出的內存段掛載到進程地址空間
p->p_head->rd_idx = 0;
p->p_head->wr_idx = 0;
p->p_head->blocks = blocks;
p->p_head->blksz = blksz;
//初始化后段
p->p_payload = (char*)(p->p_head+1);
p->shmid = shmid;
p->sem_mutex = semget(key, 1, IPC_CREAT|0644);
p->sem_empty = semget(key+1, 1, IPC_CREAT|0644);
p->sem_full = semget(key+2, 1, IPC_CREAT|0644);
semun su = {1}; //設置互斥信號量初值為1
semctl(p->sem_mutex, 0, SETVAL, su);
su.val = blocks;
semctl(p->sem_empty, 0, SETVAL, su);
su.val = 0; //初始不能消費
semctl(p->sem_full, 0, SETVAL, su);
}
else //內存段存在 ,打開
{
p->p_head = shmat(shmid, NULL, 0);
p->p_payload = (char*)(p->p_head+1);
p->shmid = shmid;
p->sem_mutex = semget(key, 0, 0); //
p->sem_empty = semget(key+1, 0, 0);
p->sem_full = semget(key+2, 0, 0);
}
return p;
}
static void P(int id)
{
struct sembuf sb[1] = {0,-1, 0};
semop(id, sb, 1);
}
static void V(int id)
{
struct sembuf sb[1] = {0, 1, 0};
semop(id, sb, 1);
}
// 放入數據
void shmfifo_put(shmfifo_t *fifo, const void *buf)
{
P(fifo->sem_empty); //有多少地方可供生產,確保有空位生產
P(fifo->sem_mutex); //保證進程互斥
memcpy(fifo->p_payload + fifo->p_head->wr_idx * fifo->p_head->blksz, //寫入位置
buf,
fifo->p_head->blksz); //每次寫入一個數據塊大小
fifo->p_head->wr_idx = (fifo->p_head->wr_idx+1)
%fifo->p_head->blocks; //取模,保證數據存滿時,轉從payload處寫數據
V(fifo->sem_full);
V(fifo->sem_mutex);
}
// 取得數據
void shmfifo_get(shmfifo_t* pFifo, void *buf)
{
P(pFifo->sem_full); //確保有數據可取
P(pFifo->sem_mutex);
//從內存段讀取,拷入buf中
memcpy(buf,
pFifo->p_payload + pFifo->p_head->rd_idx* pFifo->p_head->blksz,
pFifo->p_head->blksz);
pFifo->p_head->rd_idx = (pFifo->p_head->rd_idx+1)
%pFifo->p_head->blocks; //取模,保證數據存滿時,轉從payload處取數據
V(pFifo->sem_empty);
V(pFifo->sem_mutex);
}
// 銷毀
void shmfifo_destroy(shmfifo_t* pFifo)
{
shmdt(pFifo->p_head); //取消內存段掛載
shmctl(pFifo->shmid, IPC_RMID, 0); //釋放掉該內存段
//刪除信號量
semctl(pFifo->sem_mutex, 0, IPC_RMID, 0);
semctl(pFifo->sem_empty, 0, IPC_RMID, 0);
semctl(pFifo->sem_full, 0, IPC_RMID, 0);
free(pFifo);
}
最后就是分別實現get.c和put.c進行驗證,
get.c
#include "shmfifo.h"
#include <unistd.h>
typedef struct Products{
int id;
char pro_name[10];
}Pro;
int main()
{
shmfifo_t* fifo = shmfifo_init(12345, 3, sizeof(Pro));
Pro p;
while( 1){
memset(&p, 0x00, sizeof(p));
shmfifo_get(fifo, &p);
printf("id:%d, 產品名:%s\n", p.id, p.pro_name);
sleep(1);
}
shmfifo_destroy(fifo);
}
put.c
#include "shmfifo.h"
typedef struct Product
{
int id;
char pro_name[10];
}Pro;
int main()
{
shmfifo_t *fifo = shmfifo_init(12345, 4, sizeof(Pro));
Pro p;
for (int i=0; i<20; ++i)
{
memset(&p, 0x00, sizeof(p));
sprintf(p.pro_name, "iphone%d", i);
p.id = i+1;
shmfifo_put(fifo, &p);
printf("put %d ok\n", i);
}
}
驗證同步,當寫進程結束,讀數據進程便阻塞

驗證互斥,進程1寫第9個數據時,我另開啟了一進程寫數據,從右側可見這兩進程是交替進行寫操作的

