一、epoll簡介
epoll是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。
相對於select方法,主要優點有2個:
1. 支持一個進程打開大數目的socket描述符。
2. IO效率不隨FD數目增加而線性下降。
IO效率的提升的:
select/poll會因為監聽fd的數量而導致效率低下,因為它是輪詢所有fd,有數據就處理,沒數據就跳過,所以fd的數量增加會降低效率;而epoll只處理就緒的fd,它有一個就緒設備的隊列,每次只輪詢該隊列的數據,然后進行處理(就緒隊列的信息正是通過用戶自定義的結構來告知的)。
無論是select,poll還是epoll都需要內核把FD消息通知給用戶空間,如何避免不必要的內存拷貝就很重要,在這點上,epoll是通過內核與用戶空間mmap同一塊內存實現的。
而poll與select的主要區別在於,select需要為讀、寫、異常事件分配創建一個描述符集合,最后輪詢的時候,需要分別輪詢這三個集合。而poll只需要一個集合,在每個描述符對應的結構上分別設置讀、寫、異常事件,最后輪詢的時候,可以同時檢查三種事件。poll與select在處理思想上是同一個層次,當然poll相對於select又優化,而epoll,則是完全不同的機制,有本質上的區別。
EPOLL事件有兩種模型 Level Triggered (LT) 和 Edge Triggered (ET):
LT(level triggered,水平觸發模式)是缺省的工作方式,並且同時支持 block 和 non-block socket。在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯誤可能性要小一點。
ET(edge-triggered,邊緣觸發模式)是高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個文件描述符發送更多的就緒通知,等到下次有新的數據進來的時候才會再次出發就緒事件。如果一直不對這個fd進行I/O操作,導致fd變為未就緒時,內核同樣不會發送更多的通知,因為only once。所以這種方式下,出錯率比較高,需要增加一些檢測程序。
二、epoll的函數
引用頭文件:
#include <sys/epoll.h>
1. 創建epoll fd函數
int epoll_create(int size);
epoll_create()創建一個epoll的事例,通知內核需要監聽size個fd。size指的並不是最大的后備存儲設備,而是衡量內核內部結構大小的一個提示。當創建成功后,會占用一個fd,所以記得在使用完之后調用close(),否則fd可能會被耗盡。
自從Linux2.6.8版本以后,size值其實是沒什么用的,不過要大於0,因為內核可以動態的分配大小,所以不需要size這個提示了。
另,int epoll_create1(int flag); 是在linux 2.6.27中加入的函數。
當flag是0時,表示和epoll_create函數完全一樣,不需要size的提示了。
當flag = EPOLL_CLOEXEC,創建的epfd會設置FD_CLOEXEC
當flag = EPOLL_NONBLOCK,創建的epfd會設置為非阻塞
一般用法都是使用EPOLL_CLOEXEC.
2. epoll事件的注冊函數
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
第一個參數epfd,為epoll_create返回的的epoll fd。
第二個參數op表示操作值。有三個操作類型,
EPOLL_CTL_ADD // 注冊目標fd到epfd中,同時關聯內部event到fd上 EPOLL_CTL_MOD // 修改已經注冊到fd的監聽事件 EPOLL_CTL_DEL // 從epfd中刪除/移除已注冊的fd,event可以被忽略,也可以為NULL
第三個參數fd表示需要監聽的fd。
第四個參數event表示需要監聽的事件,
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */
events參數是一個枚舉的集合,可以用” | “來增加事件類型,枚舉如下:
EPOLLIN //表示對應的文件描述符可以讀(包括對端SOCKET正常關閉); EPOLLOUT //表示對應的文件描述符可以寫; EPOLLPRI //表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來); EPOLLERR //表示對應的文件描述符發生錯誤;epoll_wait會一直等待這個事件,所以一般沒必要設置這個屬性。 EPOLLHUP //表示對應的文件描述符被掛斷;epoll_wait會一直等待這個事件,所以一般沒必要設置這個屬性。 EPOLLET //將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。epoll的默認工作方式是LT EPOLLRDHUP //(since Linux 2.6.17)表示套接字關閉了連接,或者關閉了正寫一半的連接。 EPOLLONESHOT //(since Linux 2.6.2)只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里。
用戶可以用 epoll_data 這個 union 在 epoll_event 里面附帶一些自定義的信息,這個 epoll_data 會隨着 epoll_wait 返回的 epoll_event 一並返回。
epoll_data是給用戶自由使用的,epoll 不關心里面的內容。一般真正使用起來,事實上第一個就足夠了,也就是void *,用來包裝任何自定義的結構體(epoll_data是一個聯合體,只能使用其中一個,fd等是提供給較簡單的應用場景來方便使用的)。
3. epoll等待事件函數
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
第一個參數:表示epoll_wait等待epfd上的事件,由epoll_create 生成的epoll專用的文件描述符。
第二個參數:events指針攜帶有epoll_data_t數據,用於回傳代處理事件的數組。
第三個參數:maxevents當前需要監聽的所有socket句柄數。
第四個參數:timeout表示超時時間(單位:毫秒)。
epoll_pwait(since linux 2.6.19)允許一個應用程序安全的等待,直到fd設備准備就緒,或者捕獲到一個信號量。其中sigmask表示要捕獲的信號量。
返回值:函數如果等待成功,則返回fd的數字;0表示等待fd超時,其他錯誤號請查看errno
三、實例
一個發送壓力工具的Demo,主要是演示相關技術的使用。
在linux下使用多線程、epoll編程、socket編程技術實現,系統設置為'ulimit -n' > 10240,可以穩定達到短連接上萬的連接,長連接5000個。
設計要點如下:
1. 主函數中循環讀取請求數據,新建的3個子線程分別去建立連接,發送請求,接收請求。會初始化2個epoll來分別監聽可以寫數據(發送請求)的事件和可以讀數據(接收請求)的事件。
2. 使用的數據結構為,將socket句柄和相關操作封裝在query類中,該query類帶有前向和后向指針,該query類會在注冊epoll監聽隊列時通過用戶結構體按個指針參數傳進去。有3個query類的雙鏈表分別表示空閑隊列freeList,准備好發送請求隊列readyList,和工作隊列workList,這3個隊列會被多線程操作,故使用共享鎖來排斥性訪問。之所以使用雙鏈表結構是便於處理epoll事件時,直接將query切換隊列,而無需去遍歷。
3. 主線程等待freeList不為空,則取下其中一個結點,讀取請求數據,填充到該結點中。並將結點從freeList隊列轉移到readyList隊列。
4. 建立連接子線程會循環等待發送時間到來,以20ms為間隔,每次都新建‘壓力值*20/1000’個socket連接,建立成功后則將這些結點加入到send_epoll的隊列上。並將結點從readyList隊列轉移到workList隊列。
5. 發送請求子線程會循環等待send_epoll上發生的寫事件,並處理就緒的query類,發送請求。請求發送成功后,將該結點從send_epoll的隊列中刪除,注冊到recv_epoll的隊列上。
6. 接收請求子線程會循環等待recv_epoll上發生的讀事件,並處理就緒的query類,接收請求。接收請求成功后,將結點從recv_epoll的隊列上刪除,並將結點放回到freeList或readyList。
/*FileName: myPressTool.cpp*/ #include <stdlib.h> #include <unistd.h> #include <sys/epoll.h> #include <pthread.h> #include <signal.h> #include "conf.h" #include "query.h" #include "query_list.h" #include "control_send.h" #include "send_req.h" #include "recv_res.h" #include "data.h" //全局變量定義 conf_t g_conf; conf_t* g_pconf = NULL; /*配置*/ CQuery* g_pfree_list = NULL; /*無數據的CQuery隊列*/ CQuery* g_pready_list = NULL; /*有數據准備建立連接的CQuery隊列*/ CQuery* g_pwork_list = NULL; /*發送或接收結果狀態的CQuery隊列*/ int g_send_epoll_fd; int g_recv_epoll_fd; bool g_over = false; size_t g_query_num = 0; int check_res_callback(char* buffer,int len){ // //printf("call call_back funtion %s %d\n",buffer,len); return 0; } int init_main() { g_pconf = &g_conf; if(0 != load_config()){return -1;} /*初始化g_pconf變量*/ /*初始化CQuery隊列*/ g_query_num = g_pconf->velocity * g_pconf->BUFFER_NUM; g_pfree_list = new CQuery[g_query_num]; if( NULL == g_pfree_list ){ return -1; } g_pfree_list[0].set_pre_query(NULL); g_pfree_list[0].set_next_query(&g_pfree_list[1]); for(size_t i=1;i<g_query_num-1;i++){ g_pfree_list[i].set_pre_query(&g_pfree_list[i-1]); g_pfree_list[i].set_next_query(&g_pfree_list[i+1]); } g_pfree_list[g_query_num-1].set_pre_query(&g_pfree_list[g_query_num-2]); g_pfree_list[g_query_num-1].set_next_query(NULL); /* init epoll list */ if(0 > (g_send_epoll_fd = epoll_create(g_query_num))){ /*創建發送epoll,其參數已無意義,只需要一個非負數即可*/ return -2; } if(0 > (g_recv_epoll_fd = epoll_create(g_query_num))){ /*創建接收epoll,其參數已無意義,只需要一個非負數即可*/ return -2; } /*初始化共享鎖*/ init_query_list_lock(); return 0; } int clean_main() { CQuery* tp = g_pwork_list; CQuery* tdel = NULL; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } tp = g_pready_list; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } tp = g_pfree_list; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } /*關閉epoll socket*/ close(g_send_epoll_fd); close(g_recv_epoll_fd); /*銷毀共享鎖*/ destroy_query_list_lock(); return 0; } int main() { pthread_t control_pid; pthread_t send_pid; pthread_t recv_pid; if(0 != init_main()){return -1;} /*初始化全局變量*/ //忽略SIGPIPE信號,該信號默認是使當前程序退出。當目標機器的socket已經關閉連接時,再調用write()發送數據會收到一個RST響應,第二次調用write()發送數據時會先調用SIGPIPE響應函數,然后write返回-1,errno號為EPIPE(32) signal(SIGPIPE, SIG_IGN); //新建3個子線程 if(0 != pthread_create(&control_pid, NULL, control_send_main, NULL)){clean_main();return -2;} if(0 != pthread_create(&send_pid, NULL, send_req_main, NULL)){clean_main();return -2;} if(0 != pthread_create(&recv_pid, NULL, recv_res_main, (void*)check_res_callback)){clean_main();return -2;} //循環讀取數據,數據全部讀完返回1,free隊列用完返回0 while(0 == read_data()){ usleep(DEFAULT_SEND_EACH_TIME); /*等待一個發送間隔后再讀取數據*/ //printf("======WARNNING: free list is NULL!\n"); } //等待使用中的隊列為空再結束 while( NULL != g_pwork_list ){ usleep(100000); } g_over = true; /*通知子線程結束*/ pthread_join(send_pid,NULL); pthread_join(recv_pid,NULL); pthread_join(control_pid,NULL); if(0 != clean_main()){return -3;} /*清理全局變量*/ return 0; }
/*FileName: control_send.h*/ #ifndef __CONTROL_SEND_H__ #define __CONTROL_SEND_H__ #include <sys/time.h> #include <unistd.h> #include <sys/epoll.h> #include "query_list.h" #include "conf.h" //定義每次發送時間,默認為10毫秒 #define DEFAULT_SEND_EACH_TIME 10000 void* control_send_main(void *); void prepare_to_send(); bool wait_send_time(); void init_c_data(); extern int g_send_epoll_fd; extern conf_t* g_pconf; extern bool g_over; typedef struct _control_send_thread_data{ struct timeval send_time; struct timeval current_time; int start_time; int compute_time; size_t current_velocity; //修正壓力, float fix_velocity; }control_send_thread_data; #endif
/*FileName: control_send.cpp*/ #include "control_send.h" control_send_thread_data c_data; void init_c_data(){ gettimeofday(&c_data.send_time,NULL); c_data.start_time = c_data.send_time.tv_sec; c_data.compute_time = c_data.start_time; c_data.current_velocity = g_pconf->velocity; c_data.fix_velocity =0.0; //計算進位 if(c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME > 999999){ c_data.send_time.tv_usec = c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME - 1000000; c_data.send_time.tv_sec++; } else { c_data.send_time.tv_sec = c_data.send_time.tv_sec; c_data.send_time.tv_usec = c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME; } } //循環等待到達發送時間,本函數保證每隔DEFAULT_SEND_EACH_TIME=10ms可以准許發送一次 bool wait_send_time(){ struct timeval tv; while(true){ /*等待當前時間到達發送時間,在prepare_to_send()函數中計算下次的發送時間*/ gettimeofday(&tv,NULL); if((tv.tv_usec > c_data.send_time.tv_usec && tv.tv_sec == c_data.send_time.tv_sec) || (tv.tv_sec - c_data.send_time.tv_sec)*1000000 + tv.tv_usec - c_data.send_time.tv_usec > 0 ){ c_data.current_time.tv_sec =tv.tv_sec; /*准許發送,記錄當前時間*/ c_data.current_time.tv_usec = tv.tv_usec; return true; } usleep(DEFAULT_SEND_EACH_TIME/100); /*等待發送間隔的100分之一*/ } return false; } void* control_send_main(void *){ init_c_data(); while(!g_over){ if(wait_send_time()){ prepare_to_send(); } } return NULL; } void prepare_to_send(){ /*計算下次發送的時間*/ int time_pass = (c_data.current_time.tv_sec - c_data.send_time.tv_sec)*1000000 + (c_data.current_time.tv_usec-c_data.send_time.tv_usec); if(0 > time_pass){ time_pass =0; /*出錯情況,修正后繼續發送*/ } if(c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass > 999999){ /*跨秒情況*/ c_data.send_time.tv_usec = c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass - 1000000; c_data.send_time.tv_sec = c_data.current_time.tv_sec+1; } else { /*未跨秒情況*/ c_data.send_time.tv_sec = c_data.current_time.tv_sec; c_data.send_time.tv_usec = c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass; } /*計算本次需要發送的請求數*/ int velocity = c_data.current_velocity * ( DEFAULT_SEND_EACH_TIME)/1000000 ; c_data.fix_velocity += float(c_data.current_velocity) * DEFAULT_SEND_EACH_TIME /1000000 - velocity; /*使用浮點數計算對壓力值的誤差進行積累,並在超過1后進行修正*/ if(c_data.fix_velocity > 1.0f){ velocity += 1; c_data.fix_velocity -= 1.0f; } size_t try_num=0; size_t keep_alive_num=0; CQuery* work_query; for(int i=0;i<velocity;try_num++){ /*為每個請求建立TCP連接*/ while( NULL == (work_query = get_ready_query()) ){ usleep(DEFAULT_SEND_EACH_TIME/100); //printf("============WARNNING: ready list is NULL!\n"); } /*建立連接*/ if(g_pconf->keepAlive && work_query->is_sock_ok()){ /*長連接,優先使用已有連接*/ //printf("keepAlive use old connection.\n"); struct epoll_event evt; evt.events = EPOLLERR | EPOLLET | EPOLLHUP | EPOLLOUT; evt.data.ptr = work_query; if(0 > epoll_ctl(g_send_epoll_fd,EPOLL_CTL_ADD,work_query->get_socket(),&evt)){ add_free_list(work_query); continue; } keep_alive_num++; } else { /*短連接*/ if( 0 > work_query->make_tcp_connect(g_pconf->ip, g_pconf->port) ){ //printf("make_tcp_connect failed: %d.\n", work_query->check_socket_err()); work_query->close_socket(); add_free_list(work_query); continue; } /*把建立的socket連接添加到send_epoll的監聽隊列上*/ struct epoll_event evt; evt.events = EPOLLERR | EPOLLHUP | EPOLLOUT; evt.data.ptr = work_query; if(0 > epoll_ctl(g_send_epoll_fd,EPOLL_CTL_ADD,work_query->get_socket(),&evt)){ //printf("epoll_ctl failed.\n"); work_query->close_socket(); add_free_list(work_query); continue; } } /*建立連接成功,放入work隊列*/ add_work_list(work_query); i++; } printf("Add to work list, send: %d, keep_alive: %d, try-failed: %d.\n", velocity, keep_alive_num, try_num-velocity); }
/*FileName: send_req.h*/ #ifndef __SEND_REQ_H__ #define __SEND_REQ_H__ #include <stddef.h> #include <stdlib.h> #include <sys/epoll.h> #include "query.h" #include "query_list.h" void* send_req_main(void*); extern int g_send_epoll_fd; extern int g_recv_epoll_fd; extern bool g_over; extern size_t g_query_num; #endif
/*FileName: send_req.cpp*/ #include "send_req.h" void* send_req_main(void*){ CQuery* pQuery = NULL; struct epoll_event ep_evt[g_query_num]; while(!g_over){ int ready_num = epoll_wait(g_send_epoll_fd,ep_evt,g_query_num,TIME_OUT); /*等待事件*/ //printf("send event num: %d.\n", ready_num); size_t fin_num=0, err_num=0; for(int i=0;i<ready_num;i++){ pQuery = (CQuery*)ep_evt[i].data.ptr; epoll_ctl(g_send_epoll_fd,EPOLL_CTL_DEL,pQuery->get_socket(),NULL); /*不再在send_epoll上監聽該socket*/ if(ep_evt[i].events & EPOLLOUT){ /*為寫事件*/ if(!pQuery->is_sock_ok() || 0 > pQuery->send_query()){ /*發送請求數據*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } struct epoll_event evt; evt.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP | EPOLLPRI; evt.data.ptr = pQuery; if(0 > epoll_ctl(g_recv_epoll_fd,EPOLL_CTL_ADD,pQuery->get_socket(),&evt)){ /*注冊在recv_epoll的監聽隊列上*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } fin_num++; } else { /*不是寫事件*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } } //printf("send done. event: %d, finish: %d, error: %d.\n", ready_num, fin_num, err_num); } return NULL; }
/*FileName: recv_res.h*/ #ifndef __RECV_RES_h__ #define __RECV_RES_h__ #include <stddef.h> #include <stdlib.h> #include <sys/epoll.h> #include "conf.h" #include "query.h" #include "query_list.h" #include "data.h" typedef int (*CALL_BACK)(char*, int); void* recv_res_main(void*); extern size_t g_query_num; extern int g_recv_epoll_fd; extern conf_t* g_pconf; extern bool g_over; #endif
/*FileName: recv_res.cpp*/ #include "recv_res.h" void* recv_res_main(void* call_back_funtion){ CQuery* pQuery = NULL; struct epoll_event ep_evt[g_query_num]; while(!g_over){ int ready_num = epoll_wait(g_recv_epoll_fd,ep_evt,g_query_num,TIME_OUT); //printf("recv event num: %d.\n", ready_num); size_t fin_num=0, alive_num=0, err_num=0; for(int i=0;i<ready_num;i++){ /*循環處理每個就緒的事件*/ pQuery =(CQuery*) ep_evt[i].data.ptr; epoll_ctl(g_recv_epoll_fd,EPOLL_CTL_DEL,pQuery->get_socket(),NULL); if(ep_evt[i].events & EPOLLIN){ /*讀事件*/ if( !pQuery->is_sock_ok() || 0 > pQuery->recv_reply()){ /*接收請求數據*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } //調用callBack函數校驗數據 if(NULL != call_back_funtion){ (*((CALL_BACK)call_back_funtion))(pQuery->get_query_buffer(), pQuery->get_query_len()); } /*接收數據完畢*/ if( g_pconf->keepAlive){ /*長連接模式,重用socket連接,重新裝填數據后放入ready隊列*/ if( 0 != _read_data(pQuery) ){ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } del_work_list(pQuery); add_ready_list(pQuery); alive_num++; } else { /*非長連接模式,直接關閉socket后放入free隊列*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); fin_num++; } } else { /*非讀事件*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } } //printf("recv done. event: %d, finish: %d, alive: %d, error: %d.\n", ready_num, fin_num+alive_num, alive_num, err_num); } return NULL; }
/*FileName: conf.h*/ #ifndef __CONF_H__ #define __CONF_H__ #include <string.h> typedef struct _conf_t{ int velocity; /*目標壓力*/ char ip[16]; /*目標主機ip*/ int port; /*目標主機端口*/ bool keepAlive; /*是否保存長連接*/ int BUFFER_NUM; /*申請buffer的基數,buffer數=(BUFFER_NUM*velocity) */ }conf_t; int load_config(); extern conf_t* g_pconf; #endif
/*FileName: conf.cpp*/ #include "conf.h" int load_config() { g_pconf->velocity = 5000; strncpy(g_pconf->ip, "10.26.97.46", sizeof(g_pconf->ip)); g_pconf->port = 8183; g_pconf->keepAlive = true; if(g_pconf->keepAlive)g_pconf->BUFFER_NUM = 2; else g_pconf->BUFFER_NUM = 10; return 0; }
/*FileName: data.h*/ #ifndef __DATA_H__ #define __DATA_H__ #include <string.h> #include "query_list.h" int _read_data( CQuery* pQuery ); int read_data(); #endif
/*FileName: data.cpp*/ #include "data.h" int read_data(){ CQuery* pQuery=NULL; size_t send_num=0; int read_ret=0; while( NULL != (pQuery = get_free_query()) ){ if( 0 != (read_ret=_read_data(pQuery)) ){ add_free_list(pQuery); if( 1 == read_ret ){ /*文件讀完了*/ return 1; } else { /*讀取遇到錯誤,繼續嘗試*/ continue; } } add_ready_list(pQuery); /*添加到ready隊列*/ send_num++; } /* int flen=0,rlen=0,wlen=0; CQuery* tp = g_pfree_list; while(NULL != tp){ tp = tp->get_next_query(); flen++; } tp = g_pready_list; while(NULL != tp){ tp = tp->get_next_query(); rlen++; } tp = g_pwork_list; while(NULL != tp){ tp = tp->get_next_query(); wlen++; } */ //printf("Add to ready list: %d. free: %d, ready: %d, work: %d.\n", send_num, flen, rlen, wlen); return 0; } int _read_data( CQuery* pQuery ){ char* request = new char[MAX_QUERY_LEN+1]; char* fun_type = "GET"; char* url = "/index.html"; char* accept_type = "html/text"; char* ip = "127.0.0.1"; int port = 80; char* connection_type = NULL; if(g_pconf->keepAlive){ connection_type = "Keep-Alive"; }else{ connection_type = "Close"; } snprintf(request, MAX_QUERY_LEN+1, "%s %s HTTP/1.1\r\nAccept: %s\r\nHost: %s:%d\r\nConnection: %s\r\n\r\n", fun_type, url, accept_type, ip, port, connection_type); pQuery->set_query(request, strnlen(request, MAX_QUERY_LEN)); /*讀入數據到pQuery*/ delete []request; return 0; }
/*FileName: query.h*/ #ifndef __QUERY_H__ #define __QUERY_H__ #include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <errno.h> #include <sys/epoll.h> #include <sys/socket.h> #include <fcntl.h> #include <sys/types.h> #include <arpa/inet.h> #include <netinet/tcp.h> //#include<linux/tcp.h> #include "conf.h" /*接收數據的緩沖大小*/ #define MAX_QUERY_LEN 4096 #define TIME_OUT 1000 typedef enum _QUERY_STATE_e{ QUERY_STATE_SEND=1, QUERY_STATE_RECV, QUERY_STATE_IDLE }QUERY_STATE; class CQuery{ public: CQuery() :m_socket_fd(-1) ,m_query_len(-1) { pPre_query=NULL; pNext_query=NULL; m_state = QUERY_STATE_IDLE; m_str_Query[0]='\0'; } ~CQuery(); /*操作socket*/ int make_tcp_connect(const char* pIP,const int port); int send_query(); int recv_reply(); int close_socket(); bool is_sock_ok(); int check_socket_err(); /*get & set操作*/ int set_query_sock(int sock); int get_socket(); int set_query(const char * pBuf,const int buf_len); char* get_query_buffer(); int get_query_len(); int set_pre_query(CQuery* pQuery); CQuery* get_pre_query(); int set_next_query(CQuery* pQuery); CQuery* get_next_query(); private: int m_socket_fd; //socket fd char m_str_Query[MAX_QUERY_LEN+1]; //接收的數據 int m_query_len; // query長度 CQuery* pPre_query; //上一個req CQuery* pNext_query; //下一個req QUERY_STATE m_state; }; extern conf_t* g_pconf; #endif
/*FileName: query.cpp*/ #include "query.h" inline int SetSockNonblock(int sockfd) { int flag = 0; if(0 > (flag = fcntl(sockfd, F_GETFL, 0))){ /*fcntl()針對(文件)描述符提供控制;F_GETFL 取得文件描述詞狀態旗標,此旗標為open()的參數flags。*/ return -1; } if(0 > fcntl(sockfd, F_SETFL, flag | O_NONBLOCK)){ /*F_SETFL 設置文件描述詞狀態旗標,參數arg為新旗標,但只允許O_APPEND、O_NONBLOCK和O_ASYNC位的改變,其他位的改變將不受影響。此處設置為非阻塞方式*/ return -1; } return 0; } CQuery::~CQuery(){ close_socket(); }; //建立socket並對實參給出的pIP:port建立連接,就緒后保存到m_socket_fd待用 int CQuery::make_tcp_connect(const char* pIP,const int port){ if((NULL == pIP) || (0 == port)){ return -99; } if( QUERY_STATE_IDLE != m_state ){ /*如果已經建立了socket,就先關閉它再繼續;socket都是在調用該函數時才關閉,故必須有這個判斷和關閉*/ close_socket(); } if(0 > (m_socket_fd = socket(AF_INET, SOCK_STREAM, 0))){ /*建立socket,參數表示面向網絡的連接,並且是面向連接的基於TCP的應用*/ m_socket_fd = -1; return -10; } int one = 1; if(setsockopt(m_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(int)) < 0) /*設置socket參數:允許套接口和一個已在使用中的地址捆綁*/ { close_socket(); return -2; } one = 1; if(setsockopt(m_socket_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(int)) < 0) /*設置socket參數:禁止Nagle算法。Nagle算法通過將未確認的數據存入緩沖區直到蓄足一個包一起發送的方法,來減少主機發送的零碎小數據包的數目。*/ { close_socket(); return -3; } struct linger m_linger; m_linger.l_onoff=1; m_linger.l_linger=0; if(setsockopt(m_socket_fd,SOL_SOCKET,SO_LINGER,(const char*)&m_linger,sizeof(m_linger)) <0) /*設置socket參數:如關閉(closesocket()調用已執行)時有未發送數據,則逗留。*/ { close_socket(); return -4; } if(0 > SetSockNonblock(m_socket_fd)){ /*設置fd參數:設置為非阻塞模式*/ close_socket(); return -5; } struct sockaddr_in servaddr; /*保存服務器地址,用於connect()函數*/ memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); /*將一個無符號短整型的主機數值轉換為網絡字節順序,即大尾順序(big-endian)。網絡字節順序是TCP/IP中規定好的一種數據表示格式,它與具體的CPU類型、操作系統等無關,從而可以保證數據在不同主機之間傳輸時能夠被正確解釋,網絡字節順序采用big-endian排序方式。*/ if(0 >= inet_pton(AF_INET, pIP, &(servaddr.sin_addr))){ /*Linux下IP地址轉換函數,可以在將IP地址在“點分十進制”和“整數”之間轉換*/ close_socket(); return -6; } int ret_code = connect(m_socket_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)); /*在客戶端的套接字上發送連接請求*/ if((0 > ret_code) && (EINPROGRESS != errno)){ close_socket(); return -20; } m_state=QUERY_STATE_SEND; return 0; } int CQuery::send_query(){ if( QUERY_STATE_SEND != m_state ){ close_socket(); return -1; } if ( !is_sock_ok() )return -2; int send_byte=0,have_send=0; while( have_send < m_query_len ){ /*用一個while循環不斷的寫入數據,但是循環過程中的buf參數和nbytes參數是我們自己來更新的。返回值大於0,表示寫了部分數據或者是全部的數據。返回值小於0,此時出錯了,需要根據錯誤類型進行相應的處理*/ send_byte = write(m_socket_fd, m_str_Query+have_send, m_query_len-have_send); /*將socket當普通文件進行讀寫就可以*/ if(send_byte <= 0){ if(errno== EINTR || EAGAIN == errno ){ /*EINTR 此調用被信號所中斷;EAGAIN 當使用不可阻斷I/O 時(O_NONBLOCK),若無數據可讀取則返回此值。*/ continue; } else{ close_socket(); return -2; } } have_send += send_byte; } m_state=QUERY_STATE_RECV; /*發送完畢,設置標記*/ return have_send; } int CQuery::recv_reply(){ if( QUERY_STATE_RECV != m_state ){ close_socket(); return -1; } if( !is_sock_ok() )return -2; int read_byte=0,have_read=0; while( (read_byte = read(m_socket_fd, m_str_Query+have_read, MAX_QUERY_LEN-have_read)) != 0){ if(read_byte < 0){ if(EINTR == errno){ read_byte = 0; continue; }else if(EAGAIN == errno){ /*數據未就緒,應該再嘗試讀;但是對於長連接,讀完數據后會一直是這個狀態。*/ if(g_pconf->keepAlive){read_byte=0;break;} read_byte = 0; continue; }else{ close_socket(); return -2; } } have_read += read_byte; if( have_read >= MAX_QUERY_LEN ) { break; } } if( have_read >= MAX_QUERY_LEN ) /*數據超過最大Query長度,則舍棄后面的數據*/ { char bad_buff[MAX_QUERY_LEN]; while( (read_byte = read(m_socket_fd, bad_buff, MAX_QUERY_LEN)) != 0 ){ if(read_byte < 0){ if(EINTR == errno ){ read_byte = 0; continue; }else{ close_socket(); return -2; } } } } m_state=QUERY_STATE_SEND; /*接收完畢,設計標記為可發送*/ return have_read; } int CQuery::close_socket(){ if(0 <= m_socket_fd){ while(close(m_socket_fd) && (EINTR == errno)); m_socket_fd = -1; } m_state = QUERY_STATE_IDLE; return 0; } bool CQuery::is_sock_ok(){ return (m_socket_fd > 0); } int CQuery::check_socket_err() { int error; int len = sizeof(error); if(getsockopt(m_socket_fd,SOL_SOCKET,SO_ERROR,&error,(socklen_t*)&len) < 0){ return -1; } else return error; } int CQuery::set_query_sock(int sock){ if(0 < m_socket_fd && 0 < sock){ close_socket(); } m_socket_fd = sock; } int CQuery::get_socket(){ return m_socket_fd; } int CQuery::set_query(const char* pBuf,const int buf_len){ memcpy(m_str_Query,pBuf,buf_len); m_query_len = buf_len; m_str_Query[buf_len] ='\0'; return 0; } char* CQuery::get_query_buffer(){ return m_str_Query; } int CQuery::get_query_len(){ return m_query_len; } int CQuery::set_pre_query(CQuery* pQuery){ pPre_query = pQuery; return 0; } CQuery* CQuery::get_pre_query(){ return pPre_query; } int CQuery::set_next_query(CQuery* pQuery){ pNext_query = pQuery; return 0; } CQuery* CQuery::get_next_query(){ return pNext_query; }
1 /*FileName: query_list.h*/ 2 #ifndef __QUERY_LIST_H__ 3 #define __QUERY_LIST_H__ 4 5 #include <pthread.h> 6 #include <stdio.h> 7 #include "query.h" 8 9 int init_query_list_lock(); 10 int destroy_query_list_lock(); 11 12 CQuery* get_free_query(); 13 int add_free_list(CQuery* pQuery); 14 CQuery* get_ready_query(); 15 int add_ready_list(CQuery* pQuery); 16 int add_work_list(CQuery* pQuery); 17 int del_work_list(CQuery* pQuery); 18 19 extern CQuery* g_pfree_list; 20 extern CQuery* g_pready_list; 21 extern CQuery* g_pwork_list; 22 23 #endif
/*FileName: query_list.cpp*/ #include "query_list.h" pthread_mutex_t free_list_mutex; pthread_mutex_t ready_list_mutex; pthread_mutex_t work_list_mutex; int init_query_list_lock(){ pthread_mutex_init(&free_list_mutex,NULL); pthread_mutex_init(&ready_list_mutex,NULL); pthread_mutex_init(&work_list_mutex,NULL); return 0; } int destroy_query_list_lock(){ pthread_mutex_destroy(&free_list_mutex); pthread_mutex_destroy(&ready_list_mutex); pthread_mutex_destroy(&work_list_mutex); return 0; } CQuery* get_free_query(){ CQuery* pQuery = NULL; pthread_mutex_lock(&free_list_mutex); if(NULL == g_pfree_list){ pthread_mutex_unlock(&free_list_mutex); return NULL; } pQuery = g_pfree_list; g_pfree_list = g_pfree_list->get_next_query(); if( NULL != g_pfree_list )g_pfree_list->set_pre_query(NULL); pthread_mutex_unlock(&free_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return pQuery; } int add_free_list(CQuery* pQuery){ pthread_mutex_lock(&free_list_mutex); if(NULL == g_pfree_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pfree_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pfree_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pfree_list = pQuery; } pthread_mutex_unlock(&free_list_mutex); return 0; } CQuery* get_ready_query(){ CQuery* pQuery = NULL; pthread_mutex_lock(&ready_list_mutex); if(NULL == g_pready_list){ pthread_mutex_unlock(&ready_list_mutex); return NULL; } pQuery = g_pready_list; g_pready_list = g_pready_list->get_next_query(); if( NULL != g_pready_list )g_pready_list->set_pre_query(NULL); pthread_mutex_unlock(&ready_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return pQuery; } int add_ready_list(CQuery* pQuery){ pthread_mutex_lock(&ready_list_mutex); if(NULL == g_pready_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pready_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pready_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pready_list = pQuery; } pthread_mutex_unlock(&ready_list_mutex); return 0; } int add_work_list(CQuery* pQuery){ pthread_mutex_lock(&work_list_mutex); if(NULL == g_pwork_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pwork_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pwork_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pwork_list = pQuery; } pthread_mutex_unlock(&work_list_mutex); return 0; } int del_work_list(CQuery* pQuery){ pthread_mutex_lock(&work_list_mutex); if( NULL != pQuery->get_pre_query() ){ pQuery->get_pre_query()->set_next_query(pQuery->get_next_query()); } else { g_pwork_list = pQuery->get_next_query(); } if( NULL != pQuery->get_next_query() ){ pQuery->get_next_query()->set_pre_query(pQuery->get_pre_query()); } pthread_mutex_unlock(&work_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return 0; }
#FileName: Makefile #Compile cmd: make CC = gcc CPP = g++ CXX = gcc EXECUTABLE = myPressTool all : $(EXECUTABLE) rm -f *.o clean : rm -f *.o $(EXECUTABLE) $(EXECUTABLE) : myPressTool.o conf.o control_send.o data.o query.o query_list.o recv_res.o send_req.o $(CPP) -o $@ $^ -lm -lpthread %.o : %.cpp $(CPP) -c $< -o $@
附錄A. 文件描述符fd
內核(kernel)利用文件描述符(file descriptor)來訪問文件。文件描述符是非負整數。打開現存文件或新建文件時,內核會返回一個文件描述符。讀寫文件也需要使用文件描述符來指定待讀寫的文件。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。
文件描述符的范圍是0 ~ OPEN_MAX ,可以在如下位置查看:/usr/include/linux/limits.h:#define OPEN_MAX 256
有三個特殊的文件描述符,每個進程在創建時,都默認打開三個文件描述符:標准輸入(standard input)的文件描述符是 0,標准輸出(standard output)是 1,標准錯誤(standard error)是 2。POSIX 定義了 STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO 來代替 0、1、2。這三個符號常量的定義位於頭文件 unistd.h。
//FileName: file_descriptor.cpp //Compile: g++ -o file_descriptor.out file_descriptor.cpp //Run: ./file_descriptor.out #include <stdio.h> #include <fcntl.h> int main(void) { int fd; fd = open("/tmp/tmp.txt", O_RDONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); /*只讀打開,若不存在則創建,創建的文件屬性為擁有者讀寫執行,組只讀,其他只讀*/ //打開或創建的文件會獲取一個fd,且排在STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO之后,應該為3 printf("fd:%d\n", fd); return 0; }
內核使用三種數據結構表示打開的文件:
(1)文件描述符表:
用戶區的一部分,除非通過使用文件描述符的函數,否則程序無法對其進行訪問。對進程中每個打開的文件,該表都包含一個文件描述符條目,fd值即該文件在該表中的下標索引。與每個文件描述符相關聯的是:
a)文件描述符標志(close_on_exec).
b))指向系統文件表的某個條目的指針。
(2)系統文件表:
為系統中所有的進程共享。對每個活動的open, 它都包含一個條目。每個系統文件表的條目都包含文件偏移量、以及。每個文件表項包含:
a)訪問模式(讀、寫、or 讀-寫)
b)(在內存索引節點表中)當前文件的偏移量
c)指向它的文件描述符表的條目計數
(3)內存索引節點表:
對系統中的每個活動的文件(被某個進程打開了),內存中索引節點表都包含一個條目。幾個系統文件表條目可能對應於同一個內存索引節點表的條目(不同進程打開同一個文件)。
若2個進程同時打開一個文件做讀操作,每個進程都有自己相對於文件的偏移量,而且讀入整個文件是獨立於另一個進程的;如果2個進程打開同一個文件做寫操作,寫操作是相互獨立的,每個進程都可以重寫另一個進程寫入的內容。
如果一個進程在open()以后又執行了close()函數,操作系統會刪除文件描述符表的對應條目(回收fd),和系統文件表的對應條目(若指向它的描述符表唯一,若不唯一則對該條目的計數減一,不刪除該條目,也不對內存索引節點表條目中的計數進行更改),並對內存索引節點表條目中的計數減1,如果自減以后變為0,說明沒有其他進程鏈接此文件,將索引節點表條目也刪除。
通過fork()創建子進程時,子進程繼承父進程的文件描述符表,即子進程完全復制父進程的文件描述符表。
對於父進程在fork()之前打開的文件來說,子進程都會繼承,與父進程共享相同的文件偏移量並相互影響(例如STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO ),即對應文件描述符表的條目均指向相同的系統文件表條目。
在fork()之后父進程(或子進程)打開的文件,不與子進程(或父進程)共享文件偏移量並相互獨立無影響。即對應文件描述符表的條目指向新增的系統文件表條目,並不相同。
附錄B. 多進程
多進程的實例程序如下:
//FileName: multi_pid_example.cpp //Compile: g++ -o multi_pid_example.out multi_pid_example.cpp //Run: ./multi_pid_example.out #include <unistd.h> #include <sys/types.h> #include <stdio.h> #include <stdlib.h> #include <sys/wait.h> //linux進程使用的內存空間分為代碼段、堆棧段和數據段: //代碼段用來存放程序執行代碼,也有可能包含一些只讀的常數變量,例如字符串常量等;只讀可執行。 //堆棧段:Stack,存放局部變量和臨時變量如復雜表達式的中間變量;向下增長。 //數據段:其空間自下而上又分為:初始化數據區域:已賦值的全局變量和靜態變量;BSS:未初始化全局變量和靜態變量並在運行前由系統進行清零操作;堆;Heap,例如malloc動態申請的內存;堆向上增長,棧向下增長,兩者相對,之間有空洞區域。 //可執行代碼(linux下為ELF格式)中存儲了.text,.data,.bss三個段,其中.bss段只在Section header table中描述端的起始偏移位置和長度,並不分配實際的段數據,也不占用空間。 void print_exit() { printf("the exit pid:%d\n",getpid() ); /*打印進程pid*/ } int main () { pid_t pid; atexit( print_exit ); /*注冊該進程退出時的回調函數*/ //fork后,子進程會復制父進程的task_struct結構,並為子進程的堆棧分配物理頁。子進程和父進程使用相同的代碼段;子進程復制父進程的堆棧段和數據段。 //寫時復制:一般CPU都是以"頁"為單位來分配內存空間的,每一個頁都是實際物理內存的一個映像,象INTEL的CPU,其一頁在通常情況下是4086字節大小,而無論是數據段還是堆棧段都是由許多"頁"構成的,fork函數復制這兩個段,只是"邏輯"上的,並非"物理"上的,也就是說,實際執行fork時,物理空間上兩個進程的數據段和堆棧段都還是共享着的,當有一個進程寫了某個數據時,這時兩個進程之間的數據才有了區別,系統就將有區別的"頁"從物理上也分開。 //子進程一旦開始運行,子進程和父進程之間就已經不再共享任何數據了。它們再要交互信息時,只有通過進程間通信來實現。 pid=fork(); /*產生子進程*/ //fork產生子進程的表現就是它會返回2次 //一次返回0,順序執行下面的代碼。這是子進程。 //一次返回子進程的pid,也順序執行下面的代碼,這是父進程。 int i; if (pid < 0) printf("error in fork!"); else if (pid == 0) /* 子進程程序 */ for ( i = 1; i <10; i ++ ) { printf("This is child process\n"); sleep(1); } else { /* 父進程程序,此時pid值為子進程pid*/ for ( i = 1; i <5; i ++ ) { printf("This is process process\n"); sleep(1); } pid_t pr = wait(NULL); /*阻塞自己,等待一個(注意是一個,不是全部)已經變成僵屍的子進程,wait就會收集這個子進程的信息,並把它徹底銷毀后返回;輸入可以為&int類型來獲取子進程的返回值;返回為子進程的pid*/ printf("I catched a child process with pid of %d\n", pr); } return 0; }
exec函數的使用實例:
//FileName: exec_example.cpp //Compile: g++ -o exec_example.out exec_example.cpp //Run: ./exec_example.out #include <stdio.h> #include <stdlib.h> #include <sys/wait.h> #include <unistd.h> #include <sys/types.h> #include <string.h> #include <errno.h> char command[256]; int main() { int rtn; /*子進程的返回數值*/ while(1) { /* 從終端讀取要執行的命令 */ printf( ">" ); fgets( command, 256, stdin ); command[strlen(command)-1] = 0; if ( fork() == 0 ) { /* 子進程執行此命令 */ //在Linux中,exec函數族不止一個,它們分別是:execl,execlp,execle,execv,execve和execvp //一個進程一旦調用exec類函數,它本身就"死亡"了,系統把代碼段替換成新的程序的代碼,廢棄原有的數據段和堆棧段,並為新程序分配新的數據段與堆棧段,唯一留下的,就是進程號 execlp( command, command ); /* 如果exec函數返回,表明沒有正常執行命令,打印錯誤信息*/ perror( command ); exit( errno ); } else { /* 父進程, 等待子進程結束,並打印子進程的返回值 */ wait ( &rtn ); /*阻塞自己,等待有個子進程成為僵屍進程,清理后返回並將子進程的返回值放到rtn變量中*/ printf( " child process return %d\n", rtn ); } } }
進程之間通信的主要方式有:
1. 管道
#include <unistd.h> int pipe(int filedis[2]); /*建立無名管道。參數filedis返回兩個文件描述符:filedes[0]為管道里的讀取端,filedes[1]為管道里的寫入端。*/ //當管道中的數據被讀取后,管道為空。一個隨后的read()調用將默認的被阻塞,等待某些數據寫入。
#include<sys/types.h> #include<sys/stat.h> int mkfifo(const char * pathname,mode_t mode); /*建立實名管道。會依參數pathname建立特殊的FIFO文件,該FIFO文件其他進程都可以用讀寫一般文件的方式存取*/
2. 消息隊列
消息隊列是一種正逐漸被淘汰的通信方式,我們可以用流管道或者套接口的方式來取代它
3. 共享內存
#include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> int shmget(key_t key, size_t size, int shmflg); /*創建一個共享內存對象。成功會返回共享內存的標識符。key為0(IPC_PRIVATE)會建立新共享內存對象;
否則使用來源於ftok返回的IPC鍵值*/ void *shmat(int shmid, const void *shmaddr, int shmflg); /*連接共享內存標識符為shmid的共享內存,連接成功后把共享內存區對象映射到調用進程的地址空間,
隨后可像本地空間一樣訪問。成功會返回附加好的共享內存地址*/ int shmdt(const void *shmaddr); /*斷開與共享內存附加點的地址,shmaddr為連接的共享內存的起始地址*/ int shmctl(int shmid, int cmd, struct shmid_ds *buf); /*完成對共享內存的控制*/ key_t ftok( const char * fname, int id ); /*系統建立IPC通訊 (消息隊列、信號量和共享內存) 時必須指定一個ID值。通常情況下,該id值通過ftok函數得到。
fname就是你指定的文件名,如果要確保key_t值不變,要么確保ftok的文件不被刪除,要么不用ftok,指定一個固定的key_t值*/
4. 信號量
信號量是用來協調不同進程間的數據對象的,信號量是一個計數器,它用來記錄對某個資源(如共享內存)的存取狀況。
一般說來,為了獲得共享資源,進程需要執行下列操作:
(1) 測試控制該資源的信號量。
(2) 若此信號量的值為正,則允許進行使用該資源。進程將進號量減1。
(3) 若此信號量為0,則該資源目前不可用,進程進入睡眠狀態,直至信號量值大於0,進程被喚醒,轉入步驟(1)。
(4) 當進程不再使用一個信號量控制的資源時,信號量值加1。如果此時有進程正在睡眠等待此信號量,則喚醒此進程。
維護信號量狀態的是Linux內核操作系統而不是用戶進程。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget(key_t key, int nsems, int semflg); /*創建信號量集標識,或者獲取與某個鍵關聯的信號量集標識*/ int semop(int semid,struct sembuf *sops,size_t nsops); /*改變一個或一組信號量的值。該函數是PV操作*/ int semtimedop(int semid, struct sembuf *sops, unsigned nsops, struct timespec *timeout); /*當semtimedop()調用致使進程進入睡眠時,
睡眠時間不能超過timeout參數指定的值*/ int semctl(int semid,int semnum,int cmd, union semun arg); /*用來對信號量進行控制操作,不同的操作是通過cmd參數來實現的*/
5. 套接字
套接字(socket)編程是實現Linux系統和其他大多數操作系統中進程間通信的主要方式之一。除了在異地的計算機進程間以外,套接口同樣適用於本地同一台計算機內部的進程間通信。
當兩個進程在本機上進行Socket通訊時,由於可以使用localhost環回地址,數據不用經過物理網卡,操作系統內核還可以進行某些優化。
面向無連接的套接字通信流程:
服務器:socket(), bind(), recvfrom(), sendto();
客戶端:socket(), bind(), sendto(), recvfrom();
面向連接的套接字通信流程:
服務器:socket(), bind(), listen(), accept(), read(), write();
客戶端:socket(), connect(), write(), read();
#include <sys/types.h> #include <sys/socket.h> //套接字這套函數同樣的功能對應系統函數和庫函數2套函數: //庫函數是語言本身的一部分,而系統函數是內核提供給應用程序的接口,屬於系統的一部分。 int socket(int domain, int type, int protocol); /*創建套接字。實際上"建立一個Socket"意味着為一個Socket數據結構分配存儲空間。 domain:指明所使用的協議族,通常為PF_INET,表示互聯網協議族(TCP/IP協議族); type:指定socket的類型: SOCK_STREAM(流式,面向連接例如TCP服務應用) 或SOCK_DGRAM(數據報式,無連接例如UDP服務應用),Socket接口還定義了原始Socket(SOCK_RAW),允許程序使用低層協議; protocol:通常賦值"0"。 Socket()調用返回一個整型socket描述符,你可以在后面的調用使用它。 */ int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); /*設置某個套接字選項的值。 sockfd:標識一個套接口的描述字。 level:選項定義的層次;支持SOL_SOCKET、IPPROTO_TCP、IPPROTO_IP和IPPROTO_IPV6。 optname:需設置的選項。 optval:指針,指向存放選項值的緩沖區。 optlen:optval緩沖區長度。 若無錯誤發生,setsockopt()返回0 */ int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen); /*獲取某個套接字選項的值*/ ssize_t send(int sockfd, const void *buf, size_t len, int flags); /*發送消息到另一個套接字。返回值為實際發送的字符個數*/ ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); /*發送消息到另一
個套接字。返回值為實際發送的字符個數*/ ssize_t recv(int sockfd, void *buf, size_t len, int flags); /*從一個套接字接收消息。返回值為實際接收的字符個數*/ ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); /*從一個套接字接收消息。返回值為
實際接收的字符個數*/ int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen); /*將套接字與IP地址/端口號綁定*/ int listen(int sockfd, int backlog); /*設置套接字接口的監聽狀態*/ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); /*接受客戶端的連接請求,返回值為用於數據傳輸的文件描述符*/ char* sIp = "127.0.0.1"; int iPort = 80; struct sockaddr_in servaddr; /*保存服務器地址,用於connect()函數*/ memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(iPort); /*將一個無符號短整型的主機數值轉換為網絡字節順序,即大尾順序(big-endian)。網絡字節順序是TCP/IP中規定好
的一種數據表示格式,它與具體的CPU類型、操作系統等無關,從而可以保證數據在不同主機之間傳輸時能夠被正確解釋,網絡字節順序采用big-endian排序方式。*/ inet_pton(AF_INET, sIp, &(servaddr.sin_addr)) /*Linux下IP地址轉換函數,可以在將IP地址在“點分十進制”和“整數”之間轉換*/ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); /*在客戶端的套接字上發送連接請求*/
Linux的進程和Win32的進程/線程比較
WIN32里同一個進程里各個線程之間是共享數據段的。這是與Linux的進程最大的不同。
在WIN32下,使用CreateThread函數創建線程,與Linux下創建進程不同,WIN32線程不是從創建處開始運行的,而是由CreateThread指定一個函數,線程就從那個函數處開始運行。
全局變量是子線程與父線程共享的,這就是與Linux進程最大的不同之處。
在Linux要實現類似WIN32的線程並不難,只要fork以后,讓子進程調用ThreadProc函數,並且為全局變量開設共享數據區就行了,但在WIN32下就無法實現類似fork的功能了。所以現在WIN32下的C語言編譯器所提供的庫函數雖然已經能兼容大多數Linux/UNIX的庫函數,但卻仍無法實現fork。
附錄C. 多線程
多線程和共享鎖的實例程序如下:
//FileName: multi_thread_example.cpp //Compile: g++ -lpthread -o multi_thread_example.out multi_thread_example.cpp //Run: ./multi_thread_example.out #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <pthread.h> void *function(void *arg); //定義共享鎖 pthread_mutex_t mutex; int main(int argc, char *argv[]) { int rc1,rc2; char *str1="aaaaaaaaaaaaaaaaa"; char *str2="bbbbbbbbbbbbbbbbb"; pthread_t thread1,thread2; //初始化共享鎖 pthread_mutex_init(&mutex,NULL); //創建線程1,創建后立刻運行 if(rc1 = pthread_create(&thread1,NULL,function,str1)) { fprintf(stdout,"thread 1 create failed: %d\n",rc1); } //創建線程2,創建后立刻運行 if(rc2=pthread_create(&thread2,NULL,function,str2)) { fprintf(stdout,"thread 2 create failed: %d\n",rc2); } //等待線程1和線程2運行結束;如果不加pthread_join函數,主線程會直接運行結束,從而導致子線程未運行完就也被結束。 pthread_join(thread1,NULL); pthread_join(thread2,NULL); return 0; } //線程函數 void *function(void *arg) { char *m; m = (char *)arg; //使用共享鎖隔離關鍵區域;以下區域打印字符,如果不隔離的話,2個線程會交替打印出類似ababab...的字符串 pthread_mutex_lock(&mutex); while(*m != '\0') { printf("%c",*m); fflush(stdout); m++; sleep(1); /*模擬長時間處理情況*/ } printf("\n"); //釋放共享鎖,以讓其他線程可以繼續執行該段代碼 pthread_mutex_unlock(&mutex); }
多線程編程的注意點:
1. 線程安全:
概念比較直觀。一般說來,一個函數被稱為線程安全的,當且僅當被多個並發線程反復調用時,它會一直產生正確的結果。
要確保函數線程安全,主要需要考慮的是線程之間的共享變量:
屬於同一進程的不同線程會共享進程內存空間中的全局區和堆,而私有的線程空間則主要包括棧和寄存器。
因此,對於同一進程的不同線程來說,每個線程的局部變量都是私有的,而全局變量、局部靜態變量、分配於堆的變量都是共享的。在對這些共享變量進行訪問時,如果要保證線程安全,則必須通過加鎖的方式。
2. 可重入:
概念基本沒有比較正式的完整解釋,但是它比線程安全要求更嚴格。
根據經驗,所謂“重入”,常見的情況是,程序執行到某個函數foo()時,收到信號,於是暫停目前正在執行的函數,轉到信號處理函數,而這個信號處理函數的執行過程中,又恰恰也會進入到剛剛執行的函數foo(),這樣便發生了所謂的重入。此時如果foo()能夠正確的運行,而且處理完成后,之前暫停的foo()也能夠正確運行,則說明它是可重入的。
要確保函數可重入,需滿足一下幾個條件:
1、不在函數內部使用靜態或全局數據
2、不返回靜態或全局數據,所有數據都由函數的調用者提供。
3、使用本地數據,或者通過制作全局數據的本地拷貝來保護全局數據。
4、不調用不可重入函數。
多進程和多線程編程的比較:
用多進程時每個進程都有自己的地址空間,線程則共享地址空間。
線程機制支持並發程序設計技術,在多處理器上能真正保證並行處理。
切換線程context的時候,windows比linux快一倍多。
Linux下不管是多線程編程還是多進程編程,最終都是用do_fork實現的多進程編程,只是進程創建時的參數不同,從而導致有不同的共享環境。linux把所有的線程都當作進程實現,Linux線程在核內是以輕量級進程的形式存在。
實際應用中基本上都是“進程+線程”的結合方式
轉載自:http://blog.csdn.net/zhiyuan411/article/details/18055595