Php5.6.15-fpm的運行機制源碼剖析


源碼版本:Php5.6.15

源碼目錄:sapi/fpm/fpm

說明:源碼的主要功能在上面直接注解

=============>>start<<================================

主進程信號初始化,依據收到的信號類型,進行處理

/*
 * Set up signal handling for the master process.
 *
 * Creates the socket pair `sp`, puts both ends through fd_set_blocked(fd, 0),
 * marks both ends close-on-exec, and installs sig_handler for SIGTERM, SIGINT,
 * SIGUSR1, SIGUSR2, SIGCHLD and SIGQUIT.
 *
 * Returns 0 on success; on the first failing step it logs a ZLOG_SYSERROR
 * message and returns -1.
 */
int fpm_signals_init_main() /* {{{ */
{
	struct sigaction act; /* handler description shared by every sigaction() call below */

	/* create the socket pair; sig_handler writes one byte per signal into sp[1] */
	if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {
		zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");
		return -1;
	}

	/* NOTE(review): fd_set_blocked(fd, 0) presumably switches the descriptor to
	 * non-blocking mode (the original annotation says so) - helper not shown here */
	if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {
		zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");
		return -1;
	}

	/* FD_CLOEXEC: both descriptors are closed automatically across exec*() */
	if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {
		zlog(ZLOG_SYSERROR, "failed to init signals: fcntl(F_SETFD, FD_CLOEXEC)");
		return -1;
	}

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_handler;
	/* full mask: every signal is blocked while sig_handler is running */
	sigfillset(&act.sa_mask);

	/* install the same handler for each signal the master reacts to */
	if (0 > sigaction(SIGTERM, &act, 0) ||
	    0 > sigaction(SIGINT, &act, 0) ||
	    0 > sigaction(SIGUSR1, &act, 0) ||
	    0 > sigaction(SIGUSR2, &act, 0) ||
	    0 > sigaction(SIGCHLD, &act, 0) ||
	    0 > sigaction(SIGQUIT, &act, 0)) {

		zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");
		return -1;
	}
	return 0;
}
/* }}} */

信號處理函數,根據相應的信號,把相對應的字符寫入sp[1]
static void sig_handler(int signo) /* {{{ */
{
static const char sig_chars[NSIG + 1] = {
[SIGTERM] = 'T',
[SIGINT] = 'I',
[SIGUSR1] = '1',
[SIGUSR2] = '2',
[SIGQUIT] = 'Q',
[SIGCHLD] = 'C'
};
char s;
int saved_errno;

if (fpm_globals.parent_pid != getpid()) {
/* prevent a signal race condition when child process
have not set up it's own signal handler yet */
return;
}

saved_errno = errno;
s = sig_chars[signo];
write(sp[1], &s, sizeof(s));
errno = saved_errno;
}
/* }}} */


子進程信號初始化,依據收到的信號類型,進行處理

int fpm_signals_init_child() /* {{{ */
{
struct sigaction act, act_dfl;

memset(&act, 0, sizeof(act));
memset(&act_dfl, 0, sizeof(act_dfl));

act.sa_handler = &sig_soft_quit; //調用信號處理函數,只處理SIGQUIT信號
act.sa_flags |= SA_RESTART;

act_dfl.sa_handler = SIG_DFL; //默認信號處理方法

//關閉socket對
close(sp[0]);
close(sp[1]);

if (0 > sigaction(SIGTERM, &act_dfl, 0) ||
0 > sigaction(SIGINT, &act_dfl, 0) ||
0 > sigaction(SIGUSR1, &act_dfl, 0) ||
0 > sigaction(SIGUSR2, &act_dfl, 0) ||
0 > sigaction(SIGCHLD, &act_dfl, 0) ||
0 > sigaction(SIGQUIT, &act, 0)) {

zlog(ZLOG_SYSERROR, "failed to init child signals: sigaction()");
return -1;
}
return 0;
}
/* }}} */

/*
 * SIGQUIT handler for child processes.
 *
 * Closes descriptor 0, immediately opens a new AF_UNIX stream socket (logging
 * a warning if that fails), then calls fpm_php_soft_quit(). errno is preserved
 * across the handler.
 */
static void sig_soft_quit(int signo) /* {{{ */
{
	int errno_backup = errno;
	int replacement_fd;

	/* closing fastcgi listening socket will force fcgi_accept() exit immediately */
	close(0);

	/* NOTE(review): presumably this re-occupies descriptor 0 so it is not
	 * handed out to something else - confirm */
	replacement_fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (replacement_fd < 0) {
		zlog(ZLOG_WARNING, "failed to create a new socket");
	}

	fpm_php_soft_quit();
	errno = errno_backup;
}
/* }}} */

 

/*
 * Event loop of the FPM master process.
 *
 * Registers a FPM_EV_READ event on the descriptor returned by
 * fpm_signals_get_fd() with fpm_got_signal as its callback, starts the
 * heartbeat timers, and then loops forever: work out how long to wait for the
 * earliest entry in fpm_event_queue_timer, call module->wait(), and fire every
 * timer whose deadline has passed.
 *
 * err: when non-zero, the idle-server-maintenance heartbeat, the start-up log
 *      lines and the systemd heartbeat are skipped.
 *
 * Returns only when the calling process is not fpm_globals.parent_pid, or when
 * module->wait() returns -2.
 */
void fpm_event_loop(int err) /* {{{ */
{
static struct fpm_event_s signal_fd_event;

/* sanity check */
if (fpm_globals.parent_pid != getpid()) {
return;
}

// register a read event on the signal descriptor; fpm_got_signal is its callback
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
fpm_event_add(&signal_fd_event, 0);

/* add timers */
if (fpm_globals.heartbeat > 0) {
fpm_pctl_heartbeat(NULL, 0, NULL);
}

if (!err) {
fpm_pctl_perform_idle_server_maintenance_heartbeat(NULL, 0, NULL);

zlog(ZLOG_DEBUG, "%zu bytes have been reserved in SHM", fpm_shm_get_size_allocated());
zlog(ZLOG_NOTICE, "ready to handle connections");

#ifdef HAVE_SYSTEMD
fpm_systemd_heartbeat(NULL, 0, NULL);
#endif
}

while (1) {
struct fpm_event_queue_s *q, *q2;
struct timeval ms;  /* earliest deadline found in the timer queue; stays cleared if the queue is empty */
struct timeval tmp; /* time remaining until that deadline */
struct timeval now;
unsigned long int timeout; /* wait time handed to module->wait(), in milliseconds */
int ret;

/* sanity check */
if (fpm_globals.parent_pid != getpid()) {
return;
}

fpm_clock_get(&now);
timerclear(&ms);

/* search in the timeout queue for the next timer to trigger */
q = fpm_event_queue_timer;
while (q) {
if (!timerisset(&ms)) {
ms = q->ev->timeout;
} else {
if (timercmp(&q->ev->timeout, &ms, <)) {
ms = q->ev->timeout;
}
}
q = q->next;
}

/* 1s timeout if none has been set */
/* the same 1000 ms is also used when the earliest deadline is already due */
if (!timerisset(&ms) || timercmp(&ms, &now, <) || timercmp(&ms, &now, ==)) {
timeout = 1000;
} else {
timersub(&ms, &now, &tmp);
/* convert to milliseconds and add 1 so the wait does not end before the deadline */
timeout = (tmp.tv_sec * 1000) + (tmp.tv_usec / 1000) + 1;
}

ret = module->wait(fpm_event_queue_fd, timeout);

/* is a child, nothing to do here */
if (ret == -2) {
return;
}

if (ret > 0) {
zlog(ZLOG_DEBUG, "event module triggered %d events", ret);
}

/* trigger timers */
q = fpm_event_queue_timer;
while (q) {
/* the clock is re-read for every queue entry */
fpm_clock_get(&now);
if (q->ev) {
/* fire when now >= deadline */
if (timercmp(&now, &q->ev->timeout, >) || timercmp(&now, &q->ev->timeout, ==)) {
fpm_event_fire(q->ev);
/* sanity check */
if (fpm_globals.parent_pid != getpid()) {
return;
}
if (q->ev->flags & FPM_EV_PERSIST) {
/* persistent timer: re-arm it based on the current time */
fpm_event_set_timeout(q->ev, now);
} else { /* delete the event */
/* one-shot timer: unlink the node from the doubly linked queue */
q2 = q;
if (q->prev) {
q->prev->next = q->next;
}
if (q->next) {
q->next->prev = q->prev;
}
if (q == fpm_event_queue_timer) {
fpm_event_queue_timer = q->next;
if (fpm_event_queue_timer) {
fpm_event_queue_timer->prev = NULL;
}
}
/* advance before freeing the node, then skip the advance at the loop tail */
q = q->next;
free(q2);
continue;
}
}
}
q = q->next;
}
}
}
/* }}} */

 

/*
 * Read-event callback for the descriptor carried in ev->fd.
 *
 * Reads the descriptor one byte at a time until read() returns no data or an
 * error, and dispatches on every byte read:
 *   'C' (SIGCHLD)                -> fpm_children_bury()
 *   'I' (SIGINT), 'T' (SIGTERM)  -> request FPM_PCTL_STATE_TERMINATING
 *   'Q' (SIGQUIT)                -> request FPM_PCTL_STATE_FINISHING
 *   '1' (SIGUSR1)                -> re-open the error log and the access log
 *   '2' (SIGUSR2)                -> request FPM_PCTL_STATE_RELOADING
 * Any other byte is ignored. `which` and `arg` are not used.
 */
static void fpm_got_signal(struct fpm_event_s *ev, short which, void *arg) /* {{{ */
{
	char tag;
	int nread, log_status;
	int fd = ev->fd;

	for (;;) {
		/* fetch the next tag byte, retrying reads interrupted by a signal */
		do {
			nread = read(fd, &tag, 1);
		} while (nread == -1 && errno == EINTR);

		if (nread <= 0) {
			/* EAGAIN / EWOULDBLOCK are not logged; anything else is a real error */
			if (nread < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
				zlog(ZLOG_SYSERROR, "unable to read from the signal pipe");
			}
			return;
		}

		switch (tag) {
			case 'C': /* SIGCHLD: a child exited or stopped */
				zlog(ZLOG_DEBUG, "received SIGCHLD");
				fpm_children_bury();
				break;

			case 'I': /* SIGINT: terminate */
				zlog(ZLOG_DEBUG, "received SIGINT");
				zlog(ZLOG_NOTICE, "Terminating ...");
				fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
				break;

			case 'T': /* SIGTERM: terminate, same action as SIGINT */
				zlog(ZLOG_DEBUG, "received SIGTERM");
				zlog(ZLOG_NOTICE, "Terminating ...");
				fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
				break;

			case 'Q': /* SIGQUIT: finish */
				zlog(ZLOG_DEBUG, "received SIGQUIT");
				zlog(ZLOG_NOTICE, "Finishing ...");
				fpm_pctl(FPM_PCTL_STATE_FINISHING, FPM_PCTL_ACTION_SET);
				break;

			case '1': /* SIGUSR1: re-open the log files */
				zlog(ZLOG_DEBUG, "received SIGUSR1");
				if (fpm_stdio_open_error_log(1) != 0) {
					zlog(ZLOG_ERROR, "unable to re-opened error log file");
				} else {
					zlog(ZLOG_NOTICE, "error log file re-opened");
				}

				log_status = fpm_log_open(1);
				if (log_status == -1) {
					zlog(ZLOG_ERROR, "unable to re-opened access log file");
				} else if (log_status == 0) {
					zlog(ZLOG_NOTICE, "access log file re-opened");
				}
				/* any other value: no access log are set */
				break;

			case '2': /* SIGUSR2: reload */
				zlog(ZLOG_DEBUG, "received SIGUSR2");
				zlog(ZLOG_NOTICE, "Reloading in progress ...");
				fpm_pctl(FPM_PCTL_STATE_RELOADING, FPM_PCTL_ACTION_SET);
				break;

			default:
				break;
		}

		/* stop draining once this process is flagged as a child */
		if (fpm_globals.is_child) {
			return;
		}
	}
}
/* }}} */

下面對各種信號的處理做一下分析

子進程退出或者暫停時,主進程會收到SIGCHLD信號,最終會調用fpm_children_bury函數,位於fpm_children.c中

/*
 * Reap every child whose state changed and react to it.
 *
 * Loops on waitpid(-1, ..., WNOHANG | WUNTRACED) until it no longer returns a
 * positive pid. For each reported child it builds a description of how the
 * child ended, logs it, releases the child's bookkeeping, counts
 * SIGSEGV/SIGBUS deaths towards the emergency-restart threshold and, unless
 * the child was deliberately idle-killed, starts a replacement with
 * fpm_children_make(). Stopped (traced) children are handed to child->tracer
 * and skipped.
 */
void fpm_children_bury() /* {{{ */
{
int status;
pid_t pid;
struct fpm_child_s *child;
// keep looping while waitpid() reports a child; WNOHANG makes it return instead of blocking
while ( (pid = waitpid(-1, &status, WNOHANG | WUNTRACED)) > 0) {
char buf[128]; /* human-readable reason: "with code N" or "on signal N (...)" */
int severity = ZLOG_NOTICE;
int restart_child = 1;

child = fpm_child_find(pid); // look up the bookkeeping entry for this pid (may be NULL, see checks below)

// the child called exit / returned from main
if (WIFEXITED(status)) {

snprintf(buf, sizeof(buf), "with code %d", WEXITSTATUS(status));

/* if it's been killed because of dynamic process management
* don't restart it automatically
*/
if (child && child->idle_kill) {
restart_child = 0;
}

if (WEXITSTATUS(status) != FPM_EXIT_OK) {
severity = ZLOG_WARNING;
}
// the child was terminated by a signal
} else if (WIFSIGNALED(status)) {
const char *signame = fpm_signal_names[WTERMSIG(status)];
const char *have_core = WCOREDUMP(status) ? " - core dumped" : "";

if (signame == NULL) {
signame = "";
}

snprintf(buf, sizeof(buf), "on signal %d (%s%s)", WTERMSIG(status), signame, have_core);

/* if it's been killed because of dynamic process management
* don't restart it automatically
*/
if (child && child->idle_kill && WTERMSIG(status) == SIGQUIT) {
restart_child = 0;
}

if (WTERMSIG(status) != SIGQUIT) { /* possible request loss */
severity = ZLOG_WARNING;
}
// the child is stopped (reported because of WUNTRACED), not dead
} else if (WIFSTOPPED(status)) {

zlog(ZLOG_NOTICE, "child %d stopped for tracing", (int) pid);

if (child && child->tracer) {
// hand the stopped child to its tracer callback
child->tracer(child);
}

continue;
}

if (child) {
struct fpm_worker_pool_s *wp = child->wp; /* saved now; still used after fpm_child_close(child, ...) below */
struct timeval tv1, tv2; /* tv1 = now, tv2 = time elapsed since child->started */

fpm_child_unlink(child);

fpm_scoreboard_proc_free(wp->scoreboard, child->scoreboard_i);

fpm_clock_get(&tv1);

timersub(&tv1, &child->started, &tv2);

if (restart_child) {
if (!fpm_pctl_can_spawn_children()) {
severity = ZLOG_DEBUG;
}
zlog(severity, "[pool %s] child %d exited %s after %ld.%06d seconds from start", child->wp->config->name, (int) pid, buf, tv2.tv_sec, (int) tv2.tv_usec);
} else {
zlog(ZLOG_DEBUG, "[pool %s] child %d has been killed by the process management after %ld.%06d seconds from start", child->wp->config->name, (int) pid, tv2.tv_sec, (int) tv2.tv_usec);
}

fpm_child_close(child, 1 /* in event_loop */);

fpm_pctl_child_exited(); // tell process control that a child is gone (NOTE(review): callee not shown here - confirm what it does)
// record SIGSEGV/SIGBUS deaths in the last_faults ring; request a reload when all recorded faults fall within the configured interval
if (last_faults && (WTERMSIG(status) == SIGSEGV || WTERMSIG(status) == SIGBUS)) {
time_t now = tv1.tv_sec;
int restart_condition = 1;
int i;

last_faults[fault++] = now;

/* wrap the write index: last_faults is used as a ring of emergency_restart_threshold slots */
if (fault == fpm_global_config.emergency_restart_threshold) {
fault = 0;
}

/* any slot older than emergency_restart_interval cancels the emergency reload */
for (i = 0; i < fpm_global_config.emergency_restart_threshold; i++) {
if (now - last_faults[i] > fpm_global_config.emergency_restart_interval) {
restart_condition = 0;
break;
}
}

if (restart_condition) {

zlog(ZLOG_WARNING, "failed processes threshold (%d in %d sec) is reached, initiating reload", fpm_global_config.emergency_restart_threshold, fpm_global_config.emergency_restart_interval);

fpm_pctl(FPM_PCTL_STATE_RELOADING, FPM_PCTL_ACTION_SET);
}
}

if (restart_child) {
// the child was not idle-killed on purpose: ask for a replacement in the same pool
fpm_children_make(wp, 1 /* in event loop */, 1, 0);

/* NOTE(review): presumably fpm_children_make() forks and also returns in the new child, which must leave this loop - confirm */
if (fpm_globals.is_child) {
break;
}
}
} else {
zlog(ZLOG_ALERT, "oops, unknown child (%d) exited %s. Please open a bug report (https://bugs.php.net).", pid, buf);
}
}
}
/* }}} */

PHP用的最多的是reload操作,對應於SIGUSR2信號,對應處理函數fpm_pctl

/*
 * Process-control entry point.
 *
 * action == FPM_PCTL_ACTION_SET:
 *     try to move fpm_state to new_state, then continue with
 *     fpm_pctl_action_next(). A request for the current state is ignored.
 *     Allowed overrides: 'reloading' may be replaced by 'finishing' or
 *     'terminating', 'finishing' only by 'terminating', and 'terminating' by
 *     nothing; any other current state may be replaced by anything.
 * action == FPM_PCTL_ACTION_TIMEOUT:
 *     call fpm_pctl_action_next() (new_state is not looked at).
 * action == FPM_PCTL_ACTION_LAST_CHILD_EXITED:
 *     call fpm_pctl_action_last().
 * Any other action does nothing.
 */
void fpm_pctl(int new_state, int action) /* {{{ */
{
	int may_switch;

	if (action == FPM_PCTL_ACTION_LAST_CHILD_EXITED) {
		fpm_pctl_action_last();
		return;
	}

	if (action == FPM_PCTL_ACTION_SET) {
		if (new_state == fpm_state) {
			/* already in progress - just ignore duplicate signal */
			return;
		}

		/* check which states can be overridden */
		if (fpm_state == FPM_PCTL_STATE_RELOADING) {
			/* 'reloading' can be overridden by 'finishing' or 'terminating' */
			may_switch = (new_state == FPM_PCTL_STATE_FINISHING
					|| new_state == FPM_PCTL_STATE_TERMINATING);
		} else if (fpm_state == FPM_PCTL_STATE_FINISHING) {
			/* 'finishing' can be overridden by 'terminating' only */
			may_switch = (new_state == FPM_PCTL_STATE_TERMINATING);
		} else if (fpm_state == FPM_PCTL_STATE_TERMINATING) {
			/* nothing can override 'terminating' state */
			may_switch = 0;
		} else {
			/* 'normal' (and any other state) can be overridden by any state */
			may_switch = 1;
		}

		if (!may_switch) {
			zlog(ZLOG_DEBUG, "not switching to '%s' state, because already in '%s' state",
				fpm_state_names[new_state], fpm_state_names[fpm_state]);
			return;
		}

		fpm_signal_sent = 0;
		fpm_state = new_state;

		zlog(ZLOG_DEBUG, "switching to '%s' state", fpm_state_names[fpm_state]);
		/* a successful state change continues exactly like a timeout */
	} else if (action != FPM_PCTL_ACTION_TIMEOUT) {
		return;
	}

	fpm_pctl_action_next();
}
/* }}} */


/**
 * Send the next signal in the shutdown sequence to all children.
 *
 * 1. If fpm_globals.running_children is zero, fpm_pctl_action_last() is
 *    called first.
 * 2. First round (fpm_signal_sent == 0): SIGTERM when terminating, SIGQUIT
 *    otherwise, with fpm_global_config.process_control_timeout as timeout.
 * 3. Later rounds escalate with a timeout of 1: SIGQUIT -> SIGTERM, anything
 *    else -> SIGKILL.
 * 4. The chosen signal goes to fpm_pctl_kill_all(), is remembered in
 *    fpm_signal_sent, and the timeout is passed to fpm_pctl_timeout_set().
 *    NOTE(review): that presumably arms a timer which re-enters
 *    fpm_pctl(FPM_PCTL_STATE_UNSPECIFIED, FPM_PCTL_ACTION_TIMEOUT) so the
 *    next round runs if children are still alive - callee not shown here.
 **/
static void fpm_pctl_action_next() /* {{{ */
{
	int sig, timeout;

	if (0 == fpm_globals.running_children) {
		fpm_pctl_action_last();
	}

	if (fpm_signal_sent != 0) {
		/* a signal was already sent: escalate and retry quickly */
		sig = (fpm_signal_sent == SIGQUIT) ? SIGTERM : SIGKILL;
		timeout = 1;
	} else {
		/* first attempt: pick the signal that matches the requested state */
		sig = (fpm_state == FPM_PCTL_STATE_TERMINATING) ? SIGTERM : SIGQUIT;
		timeout = fpm_global_config.process_control_timeout;
	}

	fpm_pctl_kill_all(sig);
	fpm_signal_sent = sig;
	fpm_pctl_timeout_set(timeout);
}
/* }}} */


/*
 * Event callback that forwards to
 * fpm_pctl(FPM_PCTL_STATE_UNSPECIFIED, FPM_PCTL_ACTION_TIMEOUT).
 * ev, which and arg are not used.
 * NOTE(review): presumably registered as the timer callback by
 * fpm_pctl_timeout_set() - the registration is not shown here.
 */
static void fpm_pctl_action(struct fpm_event_s *ev, short which, void *arg) /* {{{ */
{
fpm_pctl(FPM_PCTL_STATE_UNSPECIFIED, FPM_PCTL_ACTION_TIMEOUT);
}
/* }}} */


如果子進程正常退出,主進程會收到SIGCHLD信號,信號處理函數向sp[1]寫入字符C,進行新一輪的循環,執行 fpm_children_bury-->fpm_pctl_child_exited
當處理完所有子進程時,調用fpm_pctl_action_last,在其中執行reload操作

/*
 * Final step of a state change, picked by the current fpm_state:
 * 'reloading' calls fpm_pctl_exec(); 'finishing' and 'terminating' call
 * fpm_pctl_exit(). Any other state does nothing.
 */
static void fpm_pctl_action_last() /* {{{ */
{
	if (fpm_state == FPM_PCTL_STATE_RELOADING) {
		/* reload */
		fpm_pctl_exec();
	} else if (fpm_state == FPM_PCTL_STATE_FINISHING
			|| fpm_state == FPM_PCTL_STATE_TERMINATING) {
		fpm_pctl_exit();
	}
}

/*
 * Re-execute the program with its saved arguments (the reload path).
 *
 * Logs the command line about to be run (saved_argv[0] plus optional
 * arguments 1..10), runs the FPM_CLEANUP_PARENT_EXEC cleanups and calls
 * execvp(saved_argv[0], saved_argv). execvp() returns only on failure; in
 * that case the error is logged and the process exits with FPM_EXIT_SOFTWARE.
 *
 * NOTE(review): optional_arg(n) is defined elsewhere; each use presumably
 * expands to the three string arguments consumed by one "%s%s%s" group.
 */
static void fpm_pctl_exec() /* {{{ */
{

	zlog(ZLOG_NOTICE, "reloading: execvp(\"%s\", {\"%s\""
		"%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s"
		"%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s" "%s%s%s"
		"})",
		saved_argv[0], saved_argv[0],
		optional_arg(1),
		optional_arg(2),
		optional_arg(3),
		optional_arg(4),
		optional_arg(5),
		optional_arg(6),
		optional_arg(7),
		optional_arg(8),
		optional_arg(9),
		optional_arg(10)
	);

	fpm_cleanups_run(FPM_CLEANUP_PARENT_EXEC);
	/* on success execvp() replaces this process image and never returns */
	execvp(saved_argv[0], saved_argv);
	zlog(ZLOG_SYSERROR, "failed to reload: execvp() failed");
	exit(FPM_EXIT_SOFTWARE);
}
/* }}} */

以上為源碼追蹤,最后總結一下reload邏輯過程,其信號處理邏輯不再贅述。

1.fpm主進程收到SIGUSR2信號,回調函數向sp[1] 寫入2這個字符;
2.fpm fpm_event_loop 監聽到數據寫入(IO復用)調用回調函數fpm_got_signal讀取信號標識符2;
3.執行 fpm_pctl--> fpm_pctl_action_next,向每個子進程發信號並注冊一個定時器;
4.如果子進程退出超時,主進程會重發信號;正常退出則主進程收到SIGCHLD信號,信號處理函數向sp[1]寫入對應的標識符C;
5.重復2,取出C信號標識符執行fpm_children_bury-->fpm_pctl_child_exited;
6.所有子進程都結束后,進入fpm_pctl_action_last執行reload操作,主要是調用execvp重新執行php-fpm程序(execvp不會創建新進程,而是在當前進程內替換代碼空間、數據空間和堆棧);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM