nodejs fork 子進程創建任務以及簡單的prometheus 監控


以下是一個簡單的基於nodejs 的fork 子進程創建子任務,同時使用prometheus 暴露一些簡單的metrics
使用express 框架

環境準備

  • 項目結構
 
├── Dockerfile
├── README.md
├── app.js
├── docker-compose.yaml
├── grafana
│   └── metrics.json
├── metrics.js
├── package.json
├── prometheus.yml
├── send_mail.js
├── taskstatus.js
├── utils.js
└── yarn.lock
  • 結構說明
    send_mail.js 為簡單的fork 子進程的代碼
    taskstatus.js 用於存儲子進程任務的狀態
    utils.js 為一個簡單的工具模塊,主要對於已經完成的子任務的清理
    metrics.js 為prometheus metrics 定義
    app.js express rest 接口操作以及任務處理的核心代碼
    docker-compose.yaml docker-compose 全家桶運行
    Dockerfile nodejs 應用容器化
    grafana/metrics.json 為應用grafana 對於 prometheus 的dashboard 配置
  • 代碼說明
    app.js 核心處理
 
const { fork } = require('child_process')
const express = require("express")
const util = require("./utils")
const uuid = require("uuid/v4")
const { child_process_status_all, child_process_status_pending, child_process_status_ok } = require("./taskstatus")
const { child_process_status_all_counter, child_process_status_pending_gauge, child_process_status_ok_counter, initMetrics, initGcStats, process_ok_clean_timer__status, up } = require("./metrics")
// Express application on which the REST routes in this file are registered.
const app = express();
// PID of this (parent) process; handed to util.stopProcess by the cleanup paths.
const main_process_id = process.pid;
// Whether the periodic cleanup timer is enabled; toggled by /enable_timer and /disable_timer.
let interval = false;
/**
 * Prometheus scrape route: GET /metrics is answered by initMetrics.
 */
function handleMetricsRequest(request, response) {
  initMetrics(request, response);
}
app.get('/metrics', handleMetricsRequest)
/**
 * GET /disable_timer — turn the periodic cleanup timer off, mirror that in the
 * timer status gauge (0) and report the new state to the caller.
 */
app.get("/disable_timer", function disableCleanupTimer(req, res) {
  // `interval` only ever holds a boolean, so clearing it unconditionally
  // has the same effect as clearing it only when it is set.
  interval = false;
  process_ok_clean_timer__status.set(0)
  res.send({ timer_statuss: false })
})
/**
 * GET /enable_timer — turn the periodic cleanup timer on, mirror that in the
 * timer status gauge (1) and report the new state to the caller.
 */
app.get("/enable_timer", function enableCleanupTimer(req, res) {
  // `interval` only ever holds a boolean, so setting it unconditionally
  // has the same effect as setting it only when it is false.
  interval = true;
  process_ok_clean_timer__status.set(1)
  res.send({ timer_statuss: true })
})
/**
 * GET /endpoint — fork one worker (send_mail.js), hand it a task and track its state.
 *
 * The worker is recorded as "pending" in the task-status maps and in the
 * Prometheus metrics. When it reports back over IPC it is moved to "ok".
 * If the worker emits an error or exits before reporting, its pending
 * bookkeeping is released as well, so the pending gauge and map cannot drift.
 * The HTTP response is sent immediately; it does not wait for the worker.
 */
app.get('/endpoint', (req, res) => {
  // fork another process
  const myprocess = fork('./send_mail.js');
  const pid = myprocess.pid;
  child_process_status_pending[pid] = {
    status: "pending"
  }
  child_process_status_all[pid] = {
    status: "pending"
  }
  child_process_status_all_counter.inc(1)
  child_process_status_pending_gauge.inc(1)
  console.log(`fork process pid:${pid}`)

  // Leave the "pending" state at most once per worker, whichever of
  // message / error / exit arrives first. Returns true only for that first call.
  let settled = false;
  const settle = (status) => {
    if (settled) {
      return false;
    }
    settled = true;
    delete child_process_status_pending[pid]
    child_process_status_pending_gauge.dec(1)
    child_process_status_all[pid] = { status }
    return true;
  }

  // listen for messages from forked process
  myprocess.on('message', (message) => {
    console.log(`Number of mails sent ${message.counter}`);
    // Count the worker as finished only once, even if it reports several times.
    if (settle("ok")) {
      child_process_status_ok[pid] = {
        status: "ok"
      }
      child_process_status_ok_counter.inc(1)
    }
  });
  // Without an 'error' listener a failed spawn/send would be thrown as an
  // unhandled 'error' event in the parent.
  myprocess.on('error', (err) => {
    console.error(`child process ${pid} error: ${err.message}`)
    settle("error")
  });
  // A worker that exits without having reported must not stay "pending" forever.
  myprocess.on('exit', (code, signal) => {
    if (settle("exit")) {
      console.error(`child process ${pid} exited before reporting (code=${code}, signal=${signal})`)
    }
  });

  const mails = uuid();
  // send list of e-mails to forked process
  myprocess.send({ mails });
  return res.json({ status: true, sent: true });
});
/**
 * GET /stop — kill the child processes whose task has finished.
 *
 * Responds with `{ timer_clean_status: "ok" }` on success. When stopProcess
 * reports an error the request is answered with HTTP 500 instead of being
 * left without a response.
 */
app.get("/stop", (req, res) => {
  util.stopProcess(main_process_id, (err, data) => {
    if (err == null) {
      res.send({ timer_clean_status: "ok" })
      return
    }
    console.error(`stop process failed: ${err.message || err}`)
    res.status(500).send({ timer_clean_status: "error" })
  })
})
// Start exporting garbage-collection metrics.
initGcStats()

/**
 * Completion callback of a timed cleanup run: logs success, or sets the
 * timer status gauge to 0 when stopProcess reports an error.
 */
function onTimedCleanupDone(err, data) {
  if (err != null) {
    process_ok_clean_timer__status.set(0)
    return
  }
  console.log({ timer_clean_status: "ok" })
}

// Every 10 seconds, clean up finished child processes — but only while the
// cleanup timer is enabled.
setInterval(() => {
  if (!interval) {
    return
  }
  util.stopProcess(main_process_id, onTimedCleanupDone)
}, 10000)
// Report the service as up; set back to 0 if the HTTP server emits an error.
up.set(1)
app.listen(8080, "0.0.0.0", () => {
  console.log(`go to http://localhost:8080/ to generate traffic`)
}).on("error", (err) => {
  // Log the failure (e.g. the port being in use) instead of swallowing it silently.
  console.error(`http server error: ${err.message}`)
  up.set(0)
})

utils.js 定時任務清理模塊

const psTree = require("ps-tree")
const {spawn } = require('child_process')
const {child_process_status_ok} = require("./taskstatus")
const {process_ok_clean_timer__status} = require("./metrics")
/**
 * Kill every child process whose task has finished, plus any `ps` helper
 * processes found in the process tree of the main process.
 *
 * Finished PIDs are taken from (and removed from) `child_process_status_ok`.
 * If the process-tree lookup fails, the timer status gauge is set to 0, the
 * error is passed to `cb` and nothing is killed.
 *
 * @param {number} mainProcessID PID whose process tree is inspected
 * @param {function(Error|null, string=)} cb called with `(null, "ok")` on success or `(err)` on failure
 */
function stopProcess(mainProcessID, cb) {
    psTree(mainProcessID, function (err, children) {
        if (err) {
            process_ok_clean_timer__status.set(0)
            // `children` is not usable on failure; report the error instead of continuing.
            cb(err)
            return
        }
        const pids = []
        for (const key in child_process_status_ok) {
            if (Object.prototype.hasOwnProperty.call(child_process_status_ok, key)) {
                pids.push(key)
                delete child_process_status_ok[key]
            }
        }
        const psPids = (children || [])
            .filter(item => item.COMM == "ps" || item.COMMAND == "ps")
            .map(item => item.PID)
        pids.push(...psPids)
        console.log(`stop child process ids: ${JSON.stringify(pids)}`)
        // `kill -9` without any PID is only a usage error, so skip it when there is nothing to kill.
        if (pids.length > 0) {
            const killer = spawn('kill', ['-9'].concat(pids))
            // An unhandled 'error' event (e.g. the command cannot be started) would crash the parent.
            killer.on('error', function (spawnErr) {
                console.error(`kill failed: ${spawnErr.message}`)
            })
        }
        cb(null, "ok")
    })
}
// Public API of this module: only stopProcess is exported.
module.exports = {
    stopProcess
};
  

metrics.js prometheus metrics 模塊
主要是metrics 定義,以及一個初始化的方法

 
const Prometheus = require("prom-client")
const gcStats = require('prometheus-gc-stats')
module.exports = {
    child_process_status_all_counter:new Prometheus.Counter({
        name: 'child_process_status_all_total',
        help: 'all running process',
        labelNames: ['process_all']
      }),
      child_process_status_pending_gauge:new Prometheus.Gauge({
        name: 'current_child_process_status_pending',
        help: 'current pending process',
        labelNames: ['process_pending']
      }),
      child_process_status_ok_counter:new Prometheus.Counter({
        name: 'child_process_status_ok_total',
        help: 'all ok process',
        labelNames: ['process_ok']
      }),
      process_ok_clean_timer__status:new Prometheus.Gauge({
        name: 'process_ok_clean_timer_status',
        help: 'process_ok_clean_timer_status',
        labelNames: ['process_timer']
      }),
      up:new Prometheus.Gauge({
        name: 'up',
        help: 'metrics_status',
        labelNames: ['metrics_status']
      }),
      initGcStats: function(){
        const startGcStats = gcStats(Prometheus.register)
        startGcStats()
      },
      initMetrics:function(req,res){
        res.set('Content-Type', Prometheus.register.contentType)
        res.end(Prometheus.register.metrics())
      }
}

taskstatus.js 狀態存儲(很簡單,就是一個json 對象)

module.exports = {
    child_process_status_all:{},
    child_process_status_pending:{},
    child_process_status_ok:{}
}

send_mail.js 子進程任務處理

/**
 * Placeholder for the mail-sending task executed in the child process.
 * No mail is sent yet, so the reported count is always 0.
 *
 * @param {*} mails task payload (currently unused)
 * @returns {Promise<number>} number of mails sent
 */
async function sendMultipleMails(mails) {
    // The real sending logic would go here and produce the count.
    const sentCount = 0;
    return sentCount;
}
 /**
  * Receive a task message from the parent process, run the mail task and
  * report the number of mails sent back to the parent after a random delay.
  */
 process.on('message', async (message) => {
   console.log("get messageId",message)
   // The parent sends its payload as `{ mails }`, so that is the field to read.
   const numberOfMailsSend = await sendMultipleMails(message.mails);
   // Random delay below 10 seconds; Math.floor truncates the float directly
   // instead of round-tripping it through a string as parseInt would.
   const delayMs = Math.floor(Math.random() * 10000)
   setTimeout(() => {
     // send response to master process
     process.send({ counter: numberOfMailsSend });
   }, delayMs)
 });

Dockerfile 為了方便使用docker 運行

# Node.js 12.14.0 on Alpine 3.9.
FROM node:12.14.0-alpine3.9
WORKDIR /app
# Install dependencies before copying the sources, so this layer is reused
# from cache when only application code changes.
COPY package.json yarn.lock /app/
RUN yarn
# Application sources.
COPY app.js metrics.js utils.js taskstatus.js send_mail.js /app/
#CMD [ "yarn","app"]
EXPOSE 8080
# Starts the service with `yarn app` (presumably the "app" script in package.json — confirm).
ENTRYPOINT [ "yarn","app" ]

docker-compose 文件主要是包含prometheus 以及grafana還有nodejs 應用

# Runs the Node.js application together with Grafana and Prometheus.
version: "3"
services:
  # Node.js application, built from the Dockerfile in this directory.
  app:
    build: ./
    image: dalongrong/node-process
    ports:
      - "8080:8080"
  # Grafana UI.
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
  # Prometheus server, configured through the local prometheus.yml.
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - "./prometheus.yml:/etc/prometheus/prometheus.yml"

啟動&&效果

  • 啟動
docker-compose up -d
  • 簡單壓測
ab -n 200 -c 20 http://localhost:8080/endpoint
  • grafana 效果

 

 

說明

以上就是一個簡單的基於fork 以及prometheus 的nodejs 子任務創建

參考資料

https://github.com/rongfengliang/node-process-fork-learning
https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM