Linux進程通信之共享內存實現生產者/消費者模式


共享內存 

  共享內存是內核為進程創建的一個特殊內存段,它將出現在進程自己的地址空間中,其它進程可以將同一段共享內存連接(attach)到自己的地址空間。這是最快的進程間通信方式,但是不提供任何同步功能(需要我們信號量實現)。

  

  使用共享內存實現生產者消費者任務模式。

共享內存系統調用 

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
int semget(key_t key, int size, int flag);
void *shmat(int shmid, void *addr, int flag);
int shmdt(void *addr);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);

shmget函數:

  功能:獲得或創建一個共享內存標識符。

int semget(key_t key, size_t size, int shmflag);

 

  • 成功返回一個共享內存標識符,失敗返回-1;
  • 第一個參數key為共享內存段命名(一般由ftok產生);
  • 第二個參數size為需要共享的內存容量。(如果共享內存已存在時,不能不大於該共享內存段的大小);
  • 第三個參數設置訪問權限(低9位)與IPC_CREAT, IPC_EXCL 的按位或。

shmat函數

  功能:將共享內存段連接到一個進程的地址空間中。

void *shmat(int  shm_id, const  void *addr, int shmflg) ;  
  • 成功返回共享存儲段連接的實際地址,失敗返回-1
  • 第一個參數shm_id為shmget返回的共享內存標識符。
  • 第二個參數addr指明共享內存段要連接到的地址(進程空間內部地址),通常指定為空指針,表示讓系統來選擇共享內存在進程地址空間中出現的地址。
  • 第三個參數shmflg可以設置為兩個標志位(通常設置為0)
  • SHM_RND( 表示第二個參數指定的地址應被向下靠攏到內存頁面大小的整數倍)
  • SHM_RDONLY,要連接的共享內存段是只讀的。

shmdt函數

  功能:將共享內存從當前進程中分離。

int shmdt(const  void *shmaddr) ;    //其中shmaddr為shmat返回的地址。

shmctl函數

  功能:查看及修改共享內存段的shmid_ds結構,刪除該結構以及相連的共享存儲段標識。

int shmctl(int  shm_id, int command, struct shmid_ds *buf) ; 
  • 成功返回0,失敗返回-1
  • 第二個參數commad取值:
  • IPC_STAT 獲取當前共享內存段的shmid_ds結構
  • IPC_SET 把共享內存段的當前關聯值設置為shmid_ds結構給出的值
  • IPC_RMID 從系統中刪除該共享存儲段。
  • 第三個參數buf是一個結構指針,它指向共享內存模式和訪問權限的結構

    struct shmid_ds 
    { 
        uid_t shm_perm.uid; 
        uid_t shm_perm.gid; 
        mode_t shm_perm.mode; 
    };

 

生產者/消費者模式

  某個模塊負責產生數據(生產者),另一個模塊來負責處理(消費者)。

生產者任務流程圖

消費者任務流程圖

為什么要使用生產者消費者模式

  • 解耦 ----如果讓生產者直接與消費者交互,那么生產者對於消費者就會產生依賴(也就是耦合);
  • 支持並發----傳統方式,在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。使用了生產者/消費者模式之后,生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據;
  • 支持忙閑不均----當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。

實現共享內存的生產者消費者模式 

  編寫兩個應用程序,其中一個應用程序實現生產者任務,一個應用程序實現消費者任務。
生產者任務和消費者任務之間通過共享內存機制實現跨進程的共享緩沖池;在信號量集合中支持一個信號量(或利用一個POSIX信號量),實現對共享緩沖池的互斥訪問;緩沖區的分配計數通過修改緩沖池結構中的計數變量來實現。 

  緩沖池結構:

struct shared_use_st 
{ 
     //為0表示對應的緩沖區未被生產者使用,可分配但不可消費;為1表示對應    
     的緩沖區以被生產者使用,不可分配但可消費

    //5個字符串緩沖區
    char Buffer[5][100];
    int Index[5];
};    

  緩沖區的互斥訪問,5個緩沖區的緩沖池作為一個臨界資源:

  • 當生產者任務從數據源—文件中讀取數據后將會申請一個緩沖區,並將此數據放入緩沖區中。
  • 消費者任務從一個緩沖區中取走數據,並將其中的內容打印輸出。
  • 當一個生產者任務正在訪問緩沖區時,其他生產者和消費者任務不能訪問緩沖區

 

  • 當一個消費者任務正在訪問緩沖區時,其他其他生產者和消費者任務不能訪問緩沖區
  • 使用信號量集(包含1個信號量,其初始值為1)實現對緩沖池的互斥訪問

 

源碼

#Producer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <semaphore.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//緩沖池結構
struct shared_use_st  
{  
    int Index[5];    //5個緩沖池,為0表示對應的緩沖區未被生產者使用,可分配但不可消費;為1表示對應的緩沖區已被生產者使用,不可分配但可消費
    char Buffer[5][TEXT_SZ];    //5個字符串緩沖區
    sem_t sem;        //信號量,同步功能
};

int main()
{
    int running = 1;
    int i = 0;  
    void *shm = NULL;        //共享存儲段連接的實際地址
    struct shared_use_st *shared = NULL;
    char buffer[BUFSIZ + 1];        //緩沖區存放字符
    int shmid;        //共享內存標識符
    //獲得或創建一個共享內存標識符
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT);
    if(shmid == -1)        //獲取或創建一個共享內存標識符失敗
    {   
      exit(EXIT_FAILURE);
    }
    shm = shmat(shmid, (void*)0, 0);        //返回共享存儲段連接的實際地址
    if(shm == (void*)-1)
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld\n", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //緩沖池為共享存儲段連接地址
    for( ; i < 5; i++ )
    {
        shared->Index[i] = 0;        //對緩沖池初始化,Index為0表示可以生產
    }
    sem_init(&(shared->sem),1,1);        //信號量化初始化,且信號量初始值為第二個1
    i = 0;
    while(running)        //制造一個循環
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait為P操作,減少信號量的值
        {
            printf("P操作 ERROR!\n");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 1; i++)
        ;
        if(i == 5)        //Index為1表示緩沖池被消費者占用
        {  
            //當五個空間都被消費者占用時輸出“waiting...”
            sem_post(&shared->sem);        //sem_post為V操作,用來增加信號量的值
            sleep(1);        //sleep一段時間,再次進入循環
            printf("Waiting for some time...\n");
        }
        else
        {
            sem_post(&shared->sem);        //V 操作增加信號量
               printf("Enter some text with keyboard: ");
               fgets(buffer, BUFSIZ, stdin);        //讀取stdin字符流最多BUFSIZ-1個,並存在buffer數組中 其中stdin是鍵盤輸入到緩沖區的字符
               strncpy(shared->Buffer[i%5], buffer,TEXT_SZ);        //讀取的字符串存入緩沖區shared->Buffer中
               shared->Index[i%5] = 1;        //表示該緩沖區被生產者使用了
            if(strncmp(buffer, "end", 3) == 0)        //緩沖區的字符為end時,結束循環
            {
                running = 0;
            }
        }   
     }
     //將共享內存從當前進程中分離
    if(shmdt(shm) == -1)        //失敗
    {
        exit(EXIT_FAILURE);
    }
    /*查看及修改共享內存段的shmid_ds結構,刪除該結構以及相連的共享存儲段標識
    struct shmid_ds  
    {  
        uid_t shm_perm.uid;  
        uid_t shm_perm.gid;  
        mode_t shm_perm.mode;  
    };
    */
    if(shmctl(shmid, IPC_RMID, 0) == -1)        //失敗
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}
#Consumer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <fcntl.h>
#define TEXT_SZ 1024

//緩沖池結構
struct shared_use_st  
{  
    int Index[5];    //5個緩沖池,為0表示對應的緩沖區未被生產者使用,可分配但不可消費;為1表示對應的緩沖區被生產者使用,不可分配但可消費
    char Buffer[5][TEXT_SZ];    //5個字符串緩沖區
    sem_t sem;        //信號量,同步功能
};

int main()
{
    int running = 1;
    int i = 0; 
    void *shm = NULL;        //共享存儲段連接的實際地址
    struct shared_use_st *shared = NULL;    //緩沖池
    int shmid;        //聲明共享內存標識符
    
    shmid = shmget((key_t)1121, sizeof(struct shared_use_st), 0666|IPC_CREAT); //獲得或創建一個共享內存標識符
    if(shmid == -1)        //獲取或創建一個共享內存標識符失敗
    {  
        exit(EXIT_FAILURE);
    }
    
    //將共享內存段連接到一個進程的地址空間中,返回void *指針
    shm = shmat(shmid, 0, 0);    //返回共享存儲段連接的實際地址    
    if(shm == (void*)-1)        //失敗
    {
        exit(EXIT_FAILURE);
    }
    printf("Memory attached at %ld\n", (intptr_t)shm);
    shared = (struct shared_use_st*)shm;        //緩沖池為共享存儲段連接地址
    while(running)
    {
        if(sem_wait(&(shared->sem)) == -1)        //sem_wait為P操作,減少信號量的值
        {
            printf("P操作 ERROR!\n");
            exit(EXIT_FAILURE);
        }
        for(i = 0; i < 5 && shared->Index[i] == 0; i++)
           ;
           //五個緩沖區沒有都被生產者占用
        if(i != 5)
        {   
            printf("You wrote: %s\n", shared->Buffer[i%5]);        //打印出生產者寫入的字符
            shared->Index[i%5] = 0;        //為0時,表示已被消費者使用
            sem_post(&shared->sem);        //sem_post為V操作
            sleep(1); 
            if( strncmp(shared->Buffer[i%5], "end", 3) == 0 )        //緩沖區的字符為end時,結束循環
            {
                 running= 0;
            }
        }
        //五個空間都被占用,輸出waiting...  
          else
          {
             sem_post(&shared->sem);        //V操作
             sleep(1);  
            printf("Waiting for some time...\n");
        }
    }
    //將共享內存從當前進程中分離
    if(shmdt(shm) == -1)        //分離失敗
    {
        exit(EXIT_FAILURE);
    }
    if(shmctl(shmid, IPC_RMID, 0) == -1)    //失敗
    {
        exit(EXIT_FAILURE);
    }  
    exit(EXIT_SUCCESS);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM