erlang poolboy 源码剖析


poolboy

poolboy是一个轻量的,通用的,高性能的,高可用的 Erlang Pooling Library.

实现思路

  • 使用 gen_server 来保证多线程可用
  • 使用 link, supversior 来监控管理 worker
  • 使用 mornitor 来监控管理任务
  • Checkout 出错自动 cancel_waiting
  • 工作线程出错, 自动 checkin

进程图

进程图

各进程介绍:

  • poolboy 是 poolboy 的管理进程,可用给每个 poolboy 设定一个名字,不同名字的 poolboy 完全独立
  • poolboy_sup 是一个 supervsior, 监控所有的 poolboy_worker
  • poolboy_worker 是工作进程
  • client_process 是使用 poolboy 的进程

各进程之间的关系:

  1. poolboy 进程中存有 poolboy_sup 的pid
  2. poolboy_sup supervise 所有 poolboy_worker 进程,exit poolboy_sup 时会同时exit 所有 poolboy_worker 进程
  3. poolboy 进程 link 所有 poolboy_worker 进程,在 poolboy_worker 进程 exit 的时候做出相应的处理,同理, poolboy 进程挂掉的时候结束所有 poolboy_worker 进程, 这里用 link 的另一层考虑应该时和下名 mornitor 的进程有所区分
  4. poolboy 进程 mornitor 所有 client_process 进程,在 client_process 进程结束的时候,自动 check_in 对应的 poolboy_worker 进程

State 结构

  • waiting: 等待队列
  • monitors: 监听ets
  • supversior: pool_boy supversior pid
  • size: worker 数量限制
  • workers:工作进程
  • max_overflow: worker overflow 限制
  • overflow: 当前 overflow
  • strategry: lifo 或者 fifo

关键函数

child_spec

  • 启动poolboy gen_server, 可以传进程名字进来

init

  • 创角等待队列 waiting 队列
  • 创角 mornitors ets
  • State 中存储 waiting queue 和 mornitors ets tid
  • 创建 poolboy_sup 和 workers

支持的options

PoolArgs:

  • worker_module: worker 模块名, 启动 pool_boy supversior, 传入worker_module 参数
  • size: 初始化 #state.size, init最后会创建该数量的worker
  • max_overflow: 初始化 #state.max_overflow
  • strategray: 添加worker的策略

new_woker

  • new_worker(Sup):在Sup下创建worker, 将 worker 与 poolboy 链接起来, 监控worker状态
  • new_worker(Sup, FromPid): 不链接 worker, 而是监控 FromPid

checkout

checkout(Pool, Block, Timeout)
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State):

  • 存在worker时:
    • 监控FromPid, 将监控关系{WorkerID, CRef, MRef}插入到mornitor ets.
    • 移除使用的Workers
  • 已经overflow,未超过上限
    • new_worker(Sup, FromPid), 监控FromPid
    • 将监控关系保存到 mornitor ets
    • Overflow + 1
  • Block = false
    {reply, full, State};
  • []
    • 监控FromPid
    • 将{FromPid, CRef, MRef} 存储到 waiting队列

有异常时 cacel_waiting: 必要性, 防止fun(Worker)崩掉,自动checkin:

  • 在 mornitor ets 或者 waiting 队列中删除 CRef对应的项

checkin

gen_server:cast(Pool, {checkin, Worker}).

  • 查找对应的监控关系,解除监控关系
  • handle_checkin
    • 存在等待队列, 转让给等待队列中的任务使用
    • 不存在等待队列且 overflow, 删除worker, overflow -1
    • 否则,将 worker 添加到 State#state.workers中

transcation

transaction(Pool, Fun, Timeout) ->                    
    Worker = poolboy:checkout(Pool, true, Timeout),   
    try                                               
        Fun(Worker)                                   
    after                                             
        ok = poolboy:checkin(Pool, Worker)            
    end.                                              

handle_info

handle_info({'DOWN', MRef, _, _, _}, State)

用户进程崩掉的时候:

  • 删除监控关系, handle_checkin, 自动checkin
  • 或者删除等待队列
 handle_info({'EXIT', Pid, _Reason}, State) 
  • 工作中worker: exit的时候出列流程和 handle_check_in类似,但是要注意新建 worker
  • 空闲worker: 重建新的, 不会自动restart吗?, supvisor策略为 never restart, 自己管理

疑问

  1. poolboy_sup 不负责 poolboy_worker 的重启, 只是负责 poolboy_worker 的exit, 那么最后用 link 来结束所有 poolboy_worker, 有什么不同,有什么区别?
    • 通过supversior结束poolboy_worker, 会执行 poolboy_worker:terminate
    • 通过 linked process 来结束 poolboy_worker, 需要 poolboy_worker 设置 trapexit
  2. checkout 时候没有直接使用 self PID 作为任务的ref,而是重新 make_ref, 注释说是为了解决某些情况下checkout超时不会自动checkin, 需要等 client_process exit 才会被 checkin, 没想通是为什么?


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM