生產者消費者問題 多進程共享內存


問題描述

  • 一個大小為3的緩沖區,初始為空

  • 2個生產者隨機等待一段時間,往緩沖區添加數據,若緩沖區已滿,等待消費者取走數據后再添加,重復6次

  • 3個消費者隨機等待一段時間,從緩沖區讀取數據,若緩沖區為空,等待生產者添加數據后再讀取,重復4次

說明:

  • 顯示每次添加和讀取數據的時間及緩沖區里的數據
  • 生產者和消費者用進程模擬

思路

這道題目涉及到的知識點有:

  • 進程控制管理,包括進程的創建與銷毀等
  • 進程通信技術,如管道、共享內存等

解決思路主要是:

  • 一個主進程負責創建和銷毀子進程,負責創建共享內存區和公用信號量
  • 五個子進程,兩個是生產者,三個是消費者,通過信號量對共享內存進行互斥讀寫

創建互斥訪問量

創建信號量如下:

信號量:EMPTY, FILLED, RW
初始化:EMPTY=3,FILLED=0,RW=1

說明:

EMPTY信號量指示緩沖區有多少個空位置沒有被占用,因此初始值等於緩沖區數量;

FILLED指示緩沖區有多少個位置被占用,因此初始值等於0;

RW指示是否允許對共享內存進行讀寫操作,為防止進程並發執行導致的數據共享錯誤,每次僅允許一個進程對共享內存進行操作,因此初始值為1.

生產者消費者執行操作

生產者

P(EMPTY);//首先詢問是否有空閑緩沖區,沒有則阻塞,等待消費者拿出數據釋放一個緩沖區;有則EMPTY-=1
P(RW);//是否可以對共享內存進行讀寫操作
WRITE();//寫入數據
V(RW);//釋放讀寫鎖
V(FILLED);//這里要注意釋放的不是EMPTY,因為生產者是消耗EMPTY空間的,
//每消耗一個EMPTY則有一個緩沖區被占用,故而FILLED+=1

消費者

P(FILLED);//首先詢問緩沖區里面是否有數據
P(RW);
READ();
V(RW);
V(EMPTY);

無論是Linux還是Windows,對共享內存的訪問基本都是符合上述規則的,只是具體實現和API調用不同。Windows進程管理和共享內存管理相比於Linux要復雜的多,當然也提供了更為有效的異常處理機制。下面介紹Linux下和Windows具體實現的API。

關於進程創建和管理不再贅述,主要講解共享內存和信號量方面的內容。

共享內存

所謂共享內存,是指不同進程間出於通信或者避免冗余信息拷貝而申請的可以被不同進程共同訪問的內存區域。

Linux

在Linux平台上,相關API是由POSIX提供的。主要API有shmat,shmctl,shmget.

其中shmget用於創建共享內存;shmat用於將共享內存鏈接到當前進程地址空間中來;shmdt用於將共享內存從當前進程分離;shmctl用於控制共享內存,可以銷毀共享內存。

1. shmget

int shmget(key_t key, size_t size, int shmflg);
  • key_t key 第一個參數是用於共享內存命名,不同進程通過key進行共享內存識別。需要指出的是,shmget並不只是用於創建共享內存。這里返回的描述符有可能是已經創建好的命名為key的共享內存的描述符。具體shmget的作用以及共享內存的權限由shmflg指定。
  • size_t size 指定需要的共享內存空間大小
  • int shmflg 權限標志。如果想要在key標識的共享內存不存在時創建的話可以使用IPC_CREAT進行指示,如果還要指定該共享內存的權限的話(該權限含義與Linux下普通文件的含義相同),可以與權限做或運算。如要創建新的共享內存且指定共享內存的權限為0644(表示該進程創建的共享內存對於該用戶創建的其他內存而言具有rw權限,同用戶組以及其他用戶權限均只有r權限),則可以指定shmflgIPC_CREAT | 0644
  • 執行成功則返回一個與key相關的共享內存描述符(正整數),用戶需要記錄該描述符以后續使用。調用失敗返回-1

2. shmat

第一次創建完共享內存時,它還不能被任何進程訪問,shmat函數的作用就是用來啟動對該共享內存的訪問,並把共享內存連接到當前進程的地址空間。它的原型如下:

void *shmat(int shm_id, const void *shm_addr, int shmflg);
  • shm_id就是shmget函數返回的描述符
  • const void *shm_addr指定共享內存鏈接到當前進程中的地址位置,通常為空,讓系統自己選擇
  • int shmflg是一組標志位,常為0
  • 調用成功則返回一個指向共享內存首地址,調用失敗則返回值為(void*)-1

3. shmdt

該函數用於將共享內存從當前進程中分離。注意,將共享內存分離並不是刪除它,只是使該共享內存對當前進程不再可用。它的原型如下:

int shmdt(const void* shmaddr);
  • const void *shaddr要分離的共享內存首地址,即是shmat函數的返回值

  • 調用成功時返回0,失敗時返回-1

4. shmctl

控制共享內存,它的原型如下:

int shmctl(int shm_id, int command, struct shmid_ds *buf);
  • int shm_idshmget函數返回的共享內存描述符

  • int command 要采取的操作

  • struct shmid_ds *buf 是一個結構體指針,指向共享內存的shmid_ds結構至少包括以下成員:

    struct shmid_ds
    {
        uid_t shm_perm.uid;
        uid_t shm_perm.gid;
        mode_t shm_perm.mode;
    };
    

Windows

1. CreateFileMapping

通過這個API函數 將創建一個內存映射文件的內核對象,用於映射文件到內存。與虛擬內存一樣,內存映射文件可以用來保留一個地址空間的區域,並將物理存儲器提交給該區域。它們之間的差別是,物理存儲器來自一個已經位於磁盤上的文件,而不是系統的頁文件。

HANDLE CreateFileMapping(
  HANDLE hFile,              // handle to file to map
  LPSECURITY_ATTRIBUTES lpFileMappingAttributes,
                             // optional security attributes
  DWORD flProtect,           // protection for mapping object
  DWORD dwMaximumSizeHigh,   // high-order 32 bits of object size
  DWORD dwMaximumSizeLow,    // low-order 32 bits of object size
  LPCTSTR lpName             // name of file-mapping object
);
  • hFile:用於標識你想要映射到進程地址空間中的文件句柄。該句柄可以通過調用CreateFile函數返回。這里,我們並不需要一個實際的文件,所以,就不需要調用 CreateFile 創建一個文件, hFile 這個參數可以填寫 INVALID_HANDLE_VALUE

  • lpFileMappingAttributes:參數是指向文件映射內核對象的 SECURITY_ATTRIBUTES結構的指針,通常傳遞的值是 NULL

  • flProtect:對內存映射文件的安全設置

    • PAGE_READONLY 以只讀方式打開映射;
    • PAGE_READWRITE 以可讀、可寫方式打開映射;
    • PAGE_WRITECOPY 為寫操作留下備份
  • dwMaximumSizeHigh:文件映射的最大長度的高32位。

  • dwMaximumSizeLow:文件映射的最大長度的低32位。如這個參數和dwMaximumSizeHigh都是零,就用磁盤文件的實際長度。

  • lpName:指定文件映射對象的名字,別的進程就可以用這個名字去調用 OpenFileMapping 來打開這個 FileMapping 對象。

  • 如果創建成功,返回創建的內存映射文件的句柄,如果已經存在,則也返回其句柄,但是調用 GetLastError()返回的錯誤碼是:183(ERROR_ALREADY_EXISTS),如果創建失敗,則返回NULL

2. MapViewOfFile

如果調用CreateFileMapping成功,則調用MapViewOfFile函數,將內存映射文件映射到進程的虛擬地址中

LPVOID MapViewOfFile(
  HANDLE hFileMappingObject,  // file-mapping object to map into 
                              // address space
  DWORD dwDesiredAccess,      // access mode
  DWORD dwFileOffsetHigh,     // high-order 32 bits of file offset
  DWORD dwFileOffsetLow,      // low-order 32 bits of file offset
  DWORD dwNumberOfBytesToMap  // number of bytes to map
);
  • hFileMappingObjectCreateFileMapping()返回的文件映像對象句柄。
  • dwDesiredAccess: 映射對象的文件數據的訪問方式,而且同樣要與CreateFileMapping()函數所設置的保護屬性相匹配。
  • dwFileOffsetHigh: 表示文件映射起始偏移的高32位.
  • dwFileOffsetLow: 表示文件映射起始偏移的低32位.
  • dwNumberOfBytesToMap :文件中要映射的字節數。為0表示映射整個文件映射對象。

3. OpenFileMapping

在數據接收進程中,首先調用OpenFileMapping()函數打開一個命名的文件映射內核對象,得到相應的文件映射內核對象句柄hFileMapping;如果打開成功,則調用MapViewOfFile()函數映射對象的一個視圖,將文件映射內核對象hFileMapping映射到當前應用程序的進程地址,進行讀取操作。(當然,這里如果用CreateFileMapping也是可以獲取對應的句柄)

HANDLE OpenFileMapping(
  DWORD dwDesiredAccess,  // access mode
  BOOL bInheritHandle,    // inherit flag
  LPCTSTR lpName          // pointer to name of file-mapping object
);
  • dwDesiredAccess:同MapViewOfFile函數
  • dwDesiredAccess參數
  • bInheritHandle :如這個函數返回的句柄能由當前進程啟動的新進程繼承,則這個參數為TRUE
  • lpName :指定要打開的文件映射對象名稱。

4. 清理內核對象

//取消本進程地址空間的映射;   
UnmapViewOfFile(pLocalMem);  
      
pLocalMem=NULL;   
//關閉文件映射內核文件  
CloseHandle(hFileMapping);

信號量

為了防止出現因多個程序同時訪問一個共享資源而引發的一系列問題,我們需要一種方法,它可以通過生成並使用令牌來授權,在任一時刻只能有一個執行線程訪問代碼的臨界區域。臨界區域是指執行數據更新的代碼需要獨占式地執行。而信號量就可以提供這樣的一種訪問機制,讓一個臨界區同一時間只有一個線程在訪問它,也就是說信號量是用來調協進程對共享資源的訪問的。

信號量是一個特殊的變量,程序對其訪問都是原子操作,且只允許對它進行等待(即P(信號變量))和發送(即V(信號變量))信息操作。最簡單的信號量是只能取0和1的變量,這也是信號量最常見的一種形式,叫做二進制信號量。而可以取多個正整數的信號量被稱為通用信號量。

Linux

1. semget

創建一個新信號量或取得一個已有信號量

int semget(key_t key, int num_sems, int sem_flags);
  • key_t key 信號量命名,不相關的進程可以通過它訪問一個信號量,它代表程序可能要使用的某個資源,程序對所有信號量的訪問都是間接的,程序先通過調用semget函數並提供一個鍵,再由系統生成一個相應的信號標識符(semget函數的返回值),只有semget函數才直接使用信號量鍵,所有其他的信號量函數使用由semget函數返回的信號量標識符。如果多個程序使用相同的key值,key將負責協調工作。
  • num_sems 需要的信號量數目
  • sem_flags 是一組標志,當想要當信號量不存在時創建一個新的信號量,可以和值IPC_CREAT做按位或操作。設置了IPC_CREAT標志后,即使給出的鍵是一個已有信號量的鍵,也不會產生錯誤。而IPC_CREAT | IPC_EXCL則可以創建一個新的,唯一的信號量,如果信號量已存在,返回一個錯誤。
  • 成功返回一個相應信號標識符(非零),失敗返回-1

2. semop

它的作用是改變信號量的值,原型為:

int semop(int sem_id, struct sembuf *sem_opa, size_t num_sem_ops);
  • int sem_id 是由semget 返回的信號量描述符

  • struct sembuf定義如下

    struct sembuf{
        short sem_num;//信號量組中要修改的信號量下標
        short sem_op;//信號量在一次操作中需要改變的數據,通常是兩個數,一個是-1,即P(等待)操作,
                        //一個是+1,即V(發送信號)操作。
        short sem_flg;//通常為SEM_UNDO,使操作系統跟蹤信號,
                        //並在進程沒有釋放該信號量而終止時,操作系統釋放信號量
    };
    
  • size_t num_sem_ops 要修改的信號量數量

3. semctl

改變信號量的值,原型為:

int semop(int sem_id, struct sembuf *sem_opa, size_t num_sem_ops);
  • int sem_id 信號量組key

  • struct sembuf

    struct sembuf{
        short sem_num;//除非使用一組信號量,否則它為0
        short sem_op;//信號量在一次操作中需要改變的數據,通常是兩個數,一個是-1,即P(等待)操作,
                        //一個是+1,即V(發送信號)操作。
        short sem_flg;//通常為SEM_UNDO,使操作系統跟蹤信號,
                        //並在進程沒有釋放該信號量而終止時,操作系統釋放信號量
    };
    
  • size_t num_sem_ops 要修改的信號量的數量

Windows

1. CreateSemaphore

創建信號量

HANDLE CreateSemaphore(

  LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,

  LONG lInitialCount,

  LONG lMaximumCount,

  LPCTSTR lpName

);
  • lpSemaphoreAtributes 安全控制,一般直接傳入NULL
  • llnitialCount 初始資源數量
  • lMaximumount 最大並發數量
  • lpName 信號量的名稱,傳入NULL表示匿名信號量

2. OpenSemaphore

打開信號量

HANDLE OpenSemaphore(

  DWORD dwDesiredAccess,

  BOOL bInheritHandle,

  LPCTSTR lpName

);
  • dwDesiredAccess 訪問權限對一般傳入SEMAPHORE_ALL_ACCESS
  • blnheritHandlw 信號量句柄繼承性,一般傳入TRUE即可
  • lpName 名稱,不同進程的各線程可以通過名稱來確保它們訪問同一個信號量

3. ReleaseSemaphore

遞增信號量的當前資源計數

BOOL ReleaseSemaphore(

  HANDLE hSemaphore,

  LONG lReleaseCount,  

  LPLONG lpPreviousCount 

);
  • hSemaphore 信號量句柄
  • lReleaseCount 表示增加個數,必須大於0且不超過最大資源數量
  • lpPreviousCount 可以用來傳出先前的資源計數,設為NULL表示不需要傳出

源代碼實現

Linux

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <time.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/wait.h>

#define NUM_OF_RESOURCE 3

//共享內存緩沖區資源數據結構
typedef struct BUFF
{
    int resource[NUM_OF_RESOURCE];
    int from_ptr;
    int end_ptr;
} RESOURCE;
//簡易的先進先出循環隊列

//信號量數據結構
union semun {
    int value;
};

//用於映射信號量編號與對應意義的映射
enum MUTEX
{
    EMPTY,
    FILLED,
    RW_MUX,
    NUM_MUX
};

//信號量集合描述符
int semid;
//共享空間描述符
int shmid;
//共享空間首地址
RESOURCE *shm;

//綁定共享內存空間
void attach_shm()
{
    //將共享內存連接到當前進程的地址空間
    shm = (RESOURCE *)shmat(shmid, NULL, 0);
    if (shm == (RESOURCE *)-1)
    {
        fprintf(stderr, "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    // printf("Memory attached at %X\n", shm);
}

//解綁共享內存空間
void detach_shm()
{
    //把共享內存從當前進程中分離
    if (shmdt((void *)shm) == -1)
    {
        fprintf(stderr, "shmdt failed\n");
        exit(EXIT_FAILURE);
    }
}

//獲取資源
//對對應信號量實施P操作
void P(short unsigned int num)
{
    struct sembuf sb =
        {
            num, -1, 0 //0表示信號量編號,-1表示P操作,SEM_UNDO表示進程退出后,該進程對sem進行的操作將被撤銷
        };
    //修改集合中,一個或多個信號量值
    semop(semid, &sb, 1);
}

//釋放資源
//對對應信號量實施V操作
void V(short unsigned int num)
{
    struct sembuf sb = {
        num, 1, 0 //
    };
    semop(semid, &sb, 1);
}

// 消費者從共享內存區讀取數據
int read_resource()
{
    int result = 0;
    P(FILLED);
    P(RW_MUX);
    result = shm->resource[shm->from_ptr];
    shm->from_ptr++;
    if (shm->from_ptr >= NUM_OF_RESOURCE)
    {
        shm->from_ptr = 0;
    }
    V(RW_MUX);
    V(EMPTY);
    return result;
}

// 生產者向共享內存區寫入數據
int write_resource()
{
    srand((unsigned int)time(NULL));
    int result = rand() % 100 + 11;
    P(EMPTY);
    P(RW_MUX);
    shm->resource[shm->end_ptr] = result;
    shm->end_ptr++;
    if (shm->end_ptr >= NUM_OF_RESOURCE)
    {
        shm->end_ptr = 0;
    }
    V(RW_MUX);
    V(FILLED);
    return result;
}

// 生產者
void product(int seed)
{
    attach_shm();
    for (int i = 0; i < 6; i++)
    {
        srand((unsigned int)time(NULL) + seed);
        sleep(rand() % 2 + 1);
        time_t start_process_time = time(NULL);
        printf("productor #%2d, write data %3d at %s", getpid(), write_resource(), ctime(&start_process_time));
    }
    detach_shm();
}

// 消費者
void consume(int seed)
{
    attach_shm();
    for (int i = 0; i < 4; i++)
    {
        srand((unsigned int)time(NULL) + seed);
        sleep(rand() % 4 + 1);
        time_t start_process_time = time(NULL);
        printf("consumer  #%2d, read  data %3d at %s", getpid(), read_resource(), ctime(&start_process_time));
    }
    detach_shm();
}

int main(int argc, char const *argv[])
{
    //記錄父進程pid
    pid_t ppid = getpid();

    //信號集名字,信號集中信號量的個數,信號量集合的權限
    semid = semget((key_t)1234, NUM_MUX, IPC_CREAT | 0600); //創建信號量
    if (semid == -1)
    {
        perror("semget");
    }

    // 初始化信號量
    semun s;
    s.value = NUM_OF_RESOURCE; //初始時,緩沖區全為空
    semctl(semid, EMPTY, SETVAL, s);
    s.value = 0;//初始時,緩沖區為空,沒有被填充的數據塊
    semctl(semid, FILLED, SETVAL, s);
    s.value = 1;//剛開始時允許進行讀寫操作
    semctl(semid, RW_MUX, SETVAL, s);

    //創建共享內存
    shmid = shmget((key_t)1234, sizeof(RESOURCE), 0666 | IPC_CREAT);
    if (shmid == -1)
    {
        fprintf(stderr, "shmget failed\n");
        exit(EXIT_FAILURE);
    }
    attach_shm();
    //初始化共享內存
    for (int i = 0; i < NUM_OF_RESOURCE; i++)
    {
        shm->resource[i] = 0;
    }
    shm->from_ptr = 0;
    shm->end_ptr = 0;

    //創建六個進程——一個父進程+兩個生產者+三個消費者
    pid_t child_pid[5];
    int i = 0;
    for (i = 0; i < 5; i++)
    {
        child_pid[i] = fork();
        if (child_pid[i] == 0) //子進程
        {
            break;
        }
    }

    if (i < 2) //生產者
    {
        product(i);
    }
    else if (i < 5) //消費者
    {
        consume(i);
    }

    if (getpid() == ppid)//父進程
    {
        // printf("I am parent processe: #%d\n", ppid);
        for (int i = 0; i < 5; i++)//等待子進程結束
        {
            waitpid(child_pid[i], NULL, 0);
        }
        detach_shm();
        // 刪除共享內存
        if (shmctl(shmid, IPC_RMID, 0) == -1)
        {
            fprintf(stderr, "shmctl(IPC_RMID) failed\n");
            exit(EXIT_FAILURE);
        }
    }
    return 0;
}

運行輸出

1575637491394

Windows

#include <iostream>
#include <string>
#include <wchar.h>
#include <windows.h>
#include <tchar.h>
#include <time.h>
#define NUM_OF_CONSUMER 3
#define NUM_OF_PRODUCTOR 2
#define NUM_OF_PROCESS 5
#define NUM_OF_RESOURCE 3
using namespace std;

//共享內存緩沖區數據結構
typedef struct BUF
{
	int resource[NUM_OF_PROCESS];
	int from_ptr;
	int end_ptr;
} RESOURCE;
//簡易的先入先出循環隊列

//子進程handle
HANDLE handle_of_process[NUM_OF_PROCESS];
// 子進程主線程
HANDLE handle_of_thread[NUM_OF_PROCESS];
string shared_mem_name("SharedMemory");
string empty_mux_name("EMPTY_MUX");
string filled_mux_name("FILLED_MUX");
string rw_mux_name("RW_MUX");
string print_mux_name("PRINT_MUX");

//print_mux用於互斥打印,防止出現輸出混亂
HANDLE empty_mux, filled_mux, rw_mux, print_mux;
//共享內存映射
HANDLE h_map = NULL;
//共享內存首地址
RESOURCE *shm;

//創建子進程
void newProcess(int n_clone);
//創建信號量作為鎖
void create_mux();
//在子進程打開信號量
void open_mux();
//關閉信號量
void close_mux();
//綁定共享內存
void attach_shm();
//解綁定共享內存
void detach_shm();
//消費者
void consume(int n_clone);
//生產者
void product(int n_clone);
//生產者向共享內存寫入
int write_resource();
//消費者從共享內存讀出
int read_resource();
//對信號量進行P操作
void P(HANDLE &mutex);
//對信號量進行V操作
void V(HANDLE &mutex);

int main(int argc, TCHAR *argv[])
{

	if (argc == 1) //父進程不傳參,因此argc == 1
	{
		//創建共享內存
		h_map = ::CreateFileMapping(INVALID_HANDLE_VALUE,
									NULL,
									PAGE_READWRITE,
									0,
									sizeof(RESOURCE),
									shared_mem_name.c_str());
		if (h_map == NULL)
		{
			perror("CreateFileMapping");
			return 0;
		}
		shm = (RESOURCE *)MapViewOfFile(
			h_map,
			FILE_MAP_ALL_ACCESS,
			0,
			0,
			0);
		ZeroMemory(shm, sizeof(RESOURCE));

		create_mux();

		for (int i = 0; i < NUM_OF_PROCESS; i++)
		{
			newProcess(i);
		}

		WaitForMultipleObjects(NUM_OF_PROCESS, handle_of_process, true, INFINITE);

		for (int i = 0; i < NUM_OF_PROCESS; i++)
		{
			CloseHandle(handle_of_thread[i]);
			CloseHandle(handle_of_process[i]);
		}
		detach_shm();
		//關閉共享內存區
		CloseHandle(h_map);
		close_mux();
	}
	else //子進程傳入一個參數
	{
		int n_clone = atof(argv[1]);
		if (n_clone < NUM_OF_PRODUCTOR) //生產者
		{
			product(n_clone + 1);
		}
		else if (n_clone < NUM_OF_PROCESS) //消費者
		{
			consume(n_clone + 1);
		}
	}

	return 0;
}

void newProcess(int n_clone)
{
	PROCESS_INFORMATION pi;
	STARTUPINFO si;
	ZeroMemory(&si, sizeof(si));
	si.cb = sizeof(si);
	ZeroMemory(&pi, sizeof(pi));

	TCHAR application[MAX_PATH];
	//獲取當前進程加載程序所在目錄
	GetModuleFileName(NULL, application, sizeof(application));
	TCHAR cmd_line[MAX_PATH];
	sprintf(cmd_line, "\"%s\" %d", application, n_clone);

	// printf("application: %s\n",application);
	// printf("cmd_line: %s\n", cmd_line);

	if (CreateProcess(
			NULL, //lpApplicationName.若為空,則lpCommandLine必須指定可執行程序
			//若路徑中存在空格,必須使用引號框定
			cmd_line, //lpCommandLine
			//若lpApplicationName為空,lpCommandLine長度不超過MAX_PATH
			NULL,  //指向一個SECURITY_ATTRIBUTES結構體,這個結構體決定是否返回的句柄可以被子進程繼承,進程安全性
			NULL,  //	如果lpProcessAttributes參數為空(NULL),那么句柄不能被繼承。<同上>,線程安全性
			false, //	指示新進程是否從調用進程處繼承了句柄。句柄可繼承性
			0,	 //	指定附加的、用來控制優先類和進程的創建的標識符(優先級)
			//	CREATE_NEW_CONSOLE	新控制台打開子進程
			//	CREATE_SUSPENDED	子進程創建后掛起,直到調用ResumeThread函數
			NULL, //	指向一個新進程的環境塊。如果此參數為空,新進程使用調用進程的環境。指向環境字符串
			NULL, //	指定子進程的工作路徑
			&si,  //	決定新進程的主窗體如何顯示的STARTUPINFO結構體
			&pi   //	接收新進程的識別信息的PROCESS_INFORMATION結構體。進程線程以及句柄
			))
	{
		handle_of_process[n_clone] = pi.hProcess;
		handle_of_thread[n_clone] = pi.hThread;
		return;
	}
	else
	{
		printf("CreateProcess failed (%d).\n", GetLastError());
		return;
	}
}

void consume(int n_clone)
{
	DWORD pid = GetCurrentProcessId(); //當前進程id
	open_mux();
	attach_shm();

	for (int i = 0; i < 4; i++)
	{
		srand((unsigned int)time(NULL) + n_clone);
		Sleep((rand() % 4 + 1) * 1000);

		int read_data = read_resource();

		P(print_mux);
		time_t start_process_time = time(NULL);
		printf("consumer  #%6d, read  data %3d at %s", pid, read_data, ctime(&start_process_time));
		V(print_mux);
	}

	detach_shm();
	close_mux();
}

void product(int n_clone)
{
	DWORD pid = GetCurrentProcessId(); //當前進程id
	open_mux();
	attach_shm();

	for (int i = 0; i < 6; i++)
	{
		srand((unsigned int)time(NULL) + n_clone);
		Sleep((rand() % 2 + 1) * 1000);

		int write_data = write_resource();

		P(print_mux);
		time_t start_process_time = time(NULL);
		printf("productor #%6d, write data %3d at %s", pid, write_data, ctime(&start_process_time));
		V(print_mux);
	}

	detach_shm();
	close_mux();
}

void attach_shm()
{
	h_map = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, shared_mem_name.c_str());
	shm = (RESOURCE *)MapViewOfFile(
		h_map,
		FILE_MAP_ALL_ACCESS,
		0,
		0,
		0);
}

void detach_shm()
{
	//解除映射
	UnmapViewOfFile(shm);
}

void create_mux()
{
	empty_mux = CreateSemaphore(NULL, NUM_OF_RESOURCE, NUM_OF_RESOURCE, empty_mux_name.c_str());
	filled_mux = CreateSemaphore(NULL, 0, NUM_OF_RESOURCE, filled_mux_name.c_str());
	rw_mux = CreateSemaphore(NULL, 1, 1, rw_mux_name.c_str());
	print_mux = CreateSemaphore(NULL, 1, 1, print_mux_name.c_str());
}

void open_mux()
{
	empty_mux = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, empty_mux_name.c_str());
	filled_mux = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, filled_mux_name.c_str());
	rw_mux = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, rw_mux_name.c_str());
	print_mux = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, print_mux_name.c_str());
}

void close_mux()
{
	CloseHandle(empty_mux);
	CloseHandle(filled_mux);
	CloseHandle(rw_mux);
	CloseHandle(print_mux);
}

int write_resource()
{
	srand((unsigned int)time(NULL));
	int result = rand() % 100 + 11;
	P(empty_mux);
	P(rw_mux);
	shm->resource[shm->end_ptr] = result;
	shm->end_ptr++;
	if (shm->end_ptr >= NUM_OF_RESOURCE)
	{
		shm->end_ptr = 0;
	}
	V(rw_mux);
	V(filled_mux);
	return result;
}

int read_resource()
{
	int result = 0;
	P(filled_mux);
	P(rw_mux);
	result = shm->resource[shm->from_ptr];
	shm->from_ptr++;
	if (shm->from_ptr >= NUM_OF_RESOURCE)
	{
		shm->from_ptr = 0;
	}

	V(rw_mux);
	V(empty_mux);
	return result;
}

void P(HANDLE &mutex)
{
	WaitForSingleObject(mutex, INFINITE);
}

void V(HANDLE &mutex)
{
	ReleaseSemaphore(mutex, 1, NULL);
}

運行輸出

1575637575292

參考資料

Linux

Windows


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM