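Gearman is a system for farming work out to other machines: dispatching calls to machines better suited to a given job, running work in parallel, load-balancing across many calls, or calling functions written in other languages. It is an application framework for distributing tasks that fits many scenarios: it is open source, multi-language, flexible, fast, embeddable, and scalable, has no message size limit, and is fault tolerant. Compared with Hadoop, Gearman leans toward plain task distribution. Its task distribution is simple enough to be driven by scripts alone. Gearman was first used for image resizing at LiveJournal: because resizing images consumes a lot of computing resources, the work had to be dispatched to multiple back-end servers, and the result was returned to the front end for display once the task finished.
Gearman's task delivery model is one-to-one; one-to-many is not supported. A job sent by a client through the job server ends up on exactly one worker. To get one-to-many behavior you have to define a function for each worker and send to these workers one by one, which is quite inconvenient. In this respect Gearman falls short of ZeroMQ, which supports many patterns and can meet all kinds of message-queue needs. The two are meant for different settings: Gearman is a distributed task system, in which a task only needs to be done once, while ZeroMQ is a distributed messaging system.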
1. Server
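1.1 How Gearman works
A Gearman service has many elements that make it more than just a way to submit and share work, but the core system consists of only three components: the gearmand daemon (server), the client that submits requests to the Gearman service, and the worker that performs the actual work. Their relationship is shown in the figure below:
The gearmand server performs a simple function: it collects job requests from clients and acts as a registry where workers submit information about the jobs and operation types they support. The server thus sits between client and worker. The client hands a job straight to the server; the server, using the information the workers registered, distributes jobs to workers; a worker may return a result when it finishes, and the server passes that result back to the client. As an analogy, picture a company with boss 1, boss 2, and boss 3 (clients). Their task is to go out drinking and singing to win projects (jobs), which they hand directly to the company's manager (server). The manager does not do the projects personally but assigns them to the employees (workers). When the employees finish, they hand the results to the manager, who reports them to the bosses.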
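The client/server/worker flow described above can be sketched in a few lines of plain Python. This is only an in-memory toy, not gearmand and not the python-gearman API: the class ToyJobServer and its methods register, submit, and run_pending are names invented for this illustration.

```python
from collections import defaultdict, deque


class ToyJobServer(object):
    """In-memory stand-in for gearmand: a registry of workers plus job queues."""

    def __init__(self):
        self.workers = defaultdict(list)   # function name -> registered callables
        self.queues = defaultdict(deque)   # function name -> pending payloads

    def register(self, function_name, handler):
        # A worker announces which function it can perform.
        self.workers[function_name].append(handler)

    def submit(self, function_name, payload):
        # A client hands the job to the server, never to a worker directly.
        self.queues[function_name].append(payload)

    def run_pending(self, function_name):
        # The server passes each queued job to one registered worker and
        # collects the results that would be relayed back to the client.
        results = []
        handlers = self.workers[function_name]
        if not handlers:
            return results          # nobody can do this job yet; keep it queued
        while self.queues[function_name]:
            payload = self.queues[function_name].popleft()
            results.append(handlers[0](payload))
        return results


server = ToyJobServer()
server.register("reverse", lambda data: data[::-1])   # the "employee"
server.submit("reverse", "Hello World!")              # the "boss" hands in a project
print(server.run_pending("reverse"))
```

Each job reaches exactly one handler, which matches the one-to-one delivery model; a real server also has to deal with networking, sleeping workers, and failures, none of which is modelled here.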
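To use Gearman, first install the server; it can be downloaded from https://launchpad.net/gearmand. Once installed, gearmand can be started. It accepts many startup options, which can be viewed with man gearmand; the main ones are: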
- -b, --backlog=BACKLOG Number of backlog connections for listen.
- -d, --daemon Daemon, detach and run in the background.
- -f, --file-descriptors=FDS Number of file descriptors to allow for the process (total connections will be slightly less). Default is max allowed for user.
- -h, --help Print this help menu.
- -j, --job-retries=RETRIES Number of attempts to run the job before the job server removes it. This is helpful to ensure a bad job does not crash all available workers. Default is no limit.
- -l, --log-file=FILE Log file to write errors and information to. Turning this option on also forces the first verbose level to be enabled.
- -L, --listen=ADDRESS Address the server should listen on. Default is INADDR_ANY.
- -p, --port=PORT Port the server should listen on.
- -P, --pid-file=FILE File to write process ID out to.
- -r, --protocol=PROTOCOL Load protocol module.
- -R, --round-robin Assign work in round-robin order per worker connection. The default is to assign work in the order of functions added by the worker.
- -q, --queue-type=QUEUE Persistent queue type to use.
- -t, --threads=THREADS Number of I/O threads to use. Default=0.
- -u, --user=USER Switch to given user after startup.
- -v, --verbose Increase verbosity level by one.
- -V, --version Display the version of gearmand and exit.
- -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received. The default is to wakeup all available workers.
Start gearmand:
- sudo gearmand --pid-file=/var/run/gearmand/gearmand.pid --daemon --log-file=/var/log/gearman.log
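1.2 Persistent queues and fault tolerance
By default Gearman keeps its queue in memory. This keeps it fast, but if the machine goes down or the server fails, the tasks held in the in-memory queue are lost. Gearman offers an option to persist the queue in a database such as SQLite3, Drizzle, MySQL, PostgreSQL, Redis (in dev), or MongoDB (in dev). A task is written to the persistent queue before it is executed and deleted from the persistent queue once it completes. To persist the queue in a database, besides passing -q with the matching database type at startup, you also need the options specific to that database. For example, to persist the queue with SQLite3 and specify the file used to store it: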
- gearmand -d -q libsqlite3 --libsqlite3-db=/tmp/demon/gearman.db --listen=localhost --port=4370
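The "store before running, delete after completion" lifecycle of a persistent queue can be sketched with Python's built-in sqlite3 module. This is an illustration of the idea only: the table layout and the helper names open_queue, enqueue, mark_done, and pending are assumptions made for this example and are not gearmand's actual schema or code.

```python
import sqlite3


def open_queue(path=":memory:"):
    """Create a hypothetical queue table (not gearmand's real schema)."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS queue ("
        "unique_key TEXT PRIMARY KEY, function_name TEXT, data BLOB)"
    )
    return conn


def enqueue(conn, unique_key, function_name, data):
    # Step 1: store the job durably before any worker sees it.
    conn.execute("INSERT INTO queue VALUES (?, ?, ?)", (unique_key, function_name, data))
    conn.commit()


def mark_done(conn, unique_key):
    # Step 2: remove the job only after it has completed.
    conn.execute("DELETE FROM queue WHERE unique_key = ?", (unique_key,))
    conn.commit()


def pending(conn):
    # Whatever is still here after a crash would have to be run again.
    rows = conn.execute("SELECT unique_key FROM queue ORDER BY unique_key")
    return [row[0] for row in rows]


conn = open_queue()
enqueue(conn, "job-1", "reverse", b"Hello")
enqueue(conn, "job-2", "reverse", b"World")
mark_done(conn, "job-1")          # job-1 finished; job-2 is still in flight
print(pending(conn))
```

Every job costs at least one write and one delete against the database, which is where the extra latency of a persistent queue comes from.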
Likewise, to persist the queue with MySQL, the options are:
- /usr/local/gearmand/sbin/gearmand -d -u root \
- --queue-type=MySQL \
- --mysql-host=localhost \
- --mysql-port=3306 \
- --mysql-user=gearman \
- --mysql-password=123456 \
- --mysql-db=gearman \
- --mysql-table=gearman_queue
You also need to create the corresponding database and table, create the gearman user, and grant it the necessary privileges:
- CREATE DATABASE gearman;
- USE gearman;
- CREATE TABLE `gearman_queue` (
- `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
- `unique_key` varchar(64) NOT NULL,
- `function_name` varchar(255) NOT NULL,
- `when_to_run` int(10) NOT NULL,
- `priority` int(10) NOT NULL,
- `data` longblob NOT NULL,
- PRIMARY KEY (`id`),
- UNIQUE KEY `unique_key_index` (`unique_key`,`function_name`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- CREATE USER gearman@localhost IDENTIFIED BY '123456';
- GRANT ALL on gearman.* to gearman@localhost;
The relevant settings can be put in gearman's configuration file so you do not have to type a long command line on every start:
- # /etc/conf.d/gearmand: config file for /etc/init.d/gearmand
- # Persistent queue store
- # The following queue stores are available:
- # drizzle|memcache|mysql|postgre|sqlite|tokyocabinet|none
- # If you do not wish to use persistent queues, leave this option commented out.
- # Note that persistent queue mechanisms are mutually exclusive.
- PERSISTENT="mysql"
- # Persistent queue settings for drizzle, mysql and postgre
- #PERSISTENT_SOCKET=""
- PERSISTENT_HOST="localhost"
- PERSISTENT_PORT="3306"
- PERSISTENT_USER="gearman"
- PERSISTENT_PASS="your-pass-word-here"
- PERSISTENT_DB="gearman"
- PERSISTENT_TABLE="gearman_queue"
- # Persistent queue settings for sqlite
- #PERSISTENT_FILE=""
- # Persistent queue settings for memcache
- #PERSISTENT_SERVERLIST=""
- # General settings
- #
- # -j, --job-retries=RETRIES Number of attempts to run the job before the job
- # server removes it. This is helpful to ensure a bad
- # job does not crash all available workers. Default
- # is no limit.
- # -L, --listen=ADDRESS Address the server should listen on. Default is
- # INADDR_ANY.
- # -p, --port=PORT Port the server should listen on. Default=4730.
- # -r, --protocol=PROTOCOL Load protocol module.
- # -t, --threads=THREADS Number of I/O threads to use. Default=0.
- # -v, --verbose Increase verbosity level by one.
- # -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.
- # The default is to wakeup all available workers.
- GEARMAND_PARAMS="-L 127.0.0.1 --verbose=DEBUG"
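This is actually not a great solution, because persisting the queue in a database adds two steps: the client and worker must connect to the server to read and write jobs, and the database considerably slows processing down. Under a heavy concurrent load, performance suffers directly: you will find that SQLite or MySQL cannot keep up with handling large numbers of BLOBs, jobs pile up unprocessed, and a submitted task sinks like a stone in the sea, with no response at all. In the end, you need to design test cases and automation scripts suited to your own application scenario and tune the parameters according to how the system actually behaves.
A basic property of a distributed job system is that it has no single point of failure and no single point of bottleneck. That is, the failure of one node must not affect the business of the whole system, and the performance of one node must not determine the performance of the whole system. So what if the server goes down? The solution is to run multiple servers: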
- gearmand -d -q libsqlite3 --listen=localhost --port=4370
- gearmand -d -q libsqlite3 --listen=localhost --port=4371
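Each client connects to multiple servers and uses the one with the lowest load. When that server goes down, Gearman automatically switches to another server, as shown in the figure below: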
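The "prefer the least-loaded server, fall back when it is down" behavior can be sketched as follows. This is a simulation only: ToyServer, ServerDown, submit_with_failover, and the numeric load value are hypothetical names introduced for this example, and the real client library decides on its own how to pick and retry servers.

```python
class ServerDown(Exception):
    """Raised by the toy server when it cannot be reached."""


class ToyServer(object):
    def __init__(self, address, load, alive=True):
        self.address = address
        self.load = load          # hypothetical load metric, e.g. queued jobs
        self.alive = alive

    def submit(self, payload):
        if not self.alive:
            raise ServerDown(self.address)
        return "%s handled %s" % (self.address, payload)


def submit_with_failover(servers, payload):
    # Try the least-loaded server first, then fall back to the others.
    for server in sorted(servers, key=lambda s: s.load):
        try:
            return server.submit(payload)
        except ServerDown:
            continue
    raise ServerDown("no job server reachable")


servers = [ToyServer("localhost:4370", load=2), ToyServer("localhost:4371", load=5)]
first = submit_with_failover(servers, "job-a")
servers[0].alive = False                      # simulate a crash of the first server
second = submit_with_failover(servers, "job-b")
print(first)
print(second)
```

Note that each toy server has its own state; with real in-memory queues, jobs already queued on a crashed server are not moved to the survivor.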

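1.3 Round-robin scheduling
As the number of jobs keeps growing, you may need to add worker machines to increase capacity, but you may find that tasks are not spread evenly across them, because by default the server assigns tasks to workers in order. Suppose you have worker-A, which has registered 5 worker processes with the server; as the load grows you add a second machine, worker-B, which registers another 5 worker processes with the same server. By default the server schedules in the order in which workers registered: only once worker-A is fully loaded does worker-B receive any tasks, so the assignment order is wA, wA, wA, wA, wA, wB, wB, wB, wB, wB. To spread tasks evenly over worker-A and worker-B, the server can assign them in round-robin fashion, that is, wA, wB, wA, wB ...; to enable this, start the server with the -R or --round-robin option.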
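The two assignment orders described above can be reproduced with a short simulation. It models only the behavior as this article describes it, not gearmand's scheduler; the function names registration_order and round_robin_order are made up for the example.

```python
from itertools import chain, cycle, islice


def registration_order(worker_slots, job_count):
    """Fill each host's worker processes completely before moving to the next host."""
    slots = chain.from_iterable([host] * n for host, n in worker_slots)
    return list(islice(slots, job_count))


def round_robin_order(worker_slots, job_count):
    """Alternate between hosts, as described for -R / --round-robin."""
    hosts = [host for host, _ in worker_slots]
    return list(islice(cycle(hosts), job_count))


worker_slots = [("wA", 5), ("wB", 5)]   # (host, registered worker processes)
print(registration_order(worker_slots, 6))
print(round_robin_order(worker_slots, 6))
```

With six jobs, the first order keeps worker-B almost idle while worker-A is saturated, whereas the second splits them three and three.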
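1.4 Limited wakeup
By the design of the Gearman protocol, a worker that finds no task waiting in the queue can send the PRE_SLEEP command to the server to announce that it is going to sleep. In this state the worker no longer actively grabs tasks; it resumes normal grabbing and processing only after the server wakes it with a NOOP command. So when gearmand receives a task, it tries to wake enough workers to grab it; if the total number of workers exceeds the number of possible tasks, this can cause a thundering-herd effect. The --worker-wakeup option lets you specify how many workers to wake when a task arrives, which avoids sending a large number of unnecessary NOOP packets in an attempt to wake every worker when the worker count is very large.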
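A back-of-the-envelope model of the effect of --worker-wakeup, counting how many NOOP packets a single incoming job triggers. The function noop_packets_sent is a hypothetical helper written for this illustration; it does not inspect a real server.

```python
def noop_packets_sent(sleeping_workers, worker_wakeup=None):
    """Return how many NOOP packets one incoming job triggers.

    worker_wakeup=None models the default (wake every sleeping worker);
    a positive integer models the --worker-wakeup limit.
    """
    if worker_wakeup is None:
        return sleeping_workers
    return min(sleeping_workers, worker_wakeup)


# 1000 sleeping workers, one new job:
print(noop_packets_sent(1000))                    # default: everyone is woken
print(noop_packets_sent(1000, worker_wakeup=2))   # limited wakeup
```

Only one of the woken workers can grab the job, so every additional wakeup is wasted traffic; the limit trades a little redundancy for far fewer packets.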
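1.6 Threading model
Gearman has three kinds of threads:
- Listening and management thread. There is only one. It accepts connections and hands them to the I/O threads for processing (if there is more than one I/O thread). It is also responsible for starting up and shutting down the server, and uses libevent to manage the sockets and signal pipes.
- I/O threads. There can be several. They perform the read and write system calls and the initial parsing of packets, and put the pre-parsed packets into their own asynchronous queues. Each I/O thread has its own queue, so there is little contention. The number of I/O threads is set with the -t option.
- Processing thread. There is only one. It manages the various lists and hash tables, such as unique-key tracking, job handles, functions, and job queues. It returns result packets to the I/O threads, which pick them up and send the data over the corresponding connection.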
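The division of labor between the three kinds of threads can be sketched with Python's threading and queue modules. This is a rough analogy, not gearmand's implementation: the text packets, the job_table dictionary, and the function names are invented for the example, and the two phases run one after the other to keep the output deterministic.

```python
import queue
import threading

NUM_IO_THREADS = 2  # plays the role of the -t option


def io_thread(raw_packets, parsed_queue):
    # Pre-parse this thread's share of packets into its own queue
    # (one queue per I/O thread, so there is little contention).
    for raw in raw_packets:
        parsed_queue.put(tuple(raw.strip().split(" ", 1)))


def processing_thread(parsed_queues, job_table):
    # The single processing thread owns the shared bookkeeping (here: job_table).
    for parsed_queue in parsed_queues:
        while not parsed_queue.empty():
            command, argument = parsed_queue.get()
            job_table.setdefault(command, []).append(argument)


# Listener/management role: spread incoming packets over the I/O threads.
incoming = ["SUBMIT_JOB reverse\n", "SUBMIT_JOB resize\n", "GRAB_JOB worker-1\n"]
shares = [incoming[i::NUM_IO_THREADS] for i in range(NUM_IO_THREADS)]
parsed_queues = [queue.Queue() for _ in range(NUM_IO_THREADS)]

io_threads = [threading.Thread(target=io_thread, args=(share, q))
              for share, q in zip(shares, parsed_queues)]
for t in io_threads:
    t.start()
for t in io_threads:
    t.join()

job_table = {}
proc = threading.Thread(target=processing_thread, args=(parsed_queues, job_table))
proc.start()
proc.join()
print(job_table)
```

Because only one thread ever writes to job_table, no lock is needed around it, which is the point of funnelling all bookkeeping through a single processing thread.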
Process file descriptor limits
- flier@debian:~$ ulimit -n
- 1024
- flier@debian:~$ ulimit -HSn 4096 # set both the soft and the hard limit on open file descriptors
- 4096
- flier@debian:~$ cat /proc/sys/fs/file-max
- 24372
- flier@debian:~# sysctl -w fs.file-max=100000
- 100000
As the getrlimit(2) man page puts it: "A privileged process (under Linux: one with the CAP_SYS_RESOURCE capability) may make arbitrary changes to either limit value."
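The same soft and hard limits can be read and adjusted from inside a process. The sketch below uses Python's standard resource module (Unix only); the helper name raise_nofile_limit and the target of 4096 are choices made for this example. It only ever raises the soft limit, and never beyond the hard limit, so it needs no special privileges.

```python
import resource


def raise_nofile_limit(wanted):
    """Raise the soft open-file limit toward `wanted`, capped by the hard limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # An unprivileged process may not exceed the hard limit.
    target = wanted if hard == resource.RLIM_INFINITY else min(wanted, hard)
    if soft != resource.RLIM_INFINITY and soft < target:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


print("before: soft=%s hard=%s" % resource.getrlimit(resource.RLIMIT_NOFILE))
print("after:  soft=%s hard=%s" % raise_nofile_limit(4096))
```

A limit changed this way applies to the current process and its children only, much like ulimit in a shell; the system-wide fs.file-max value is a separate setting.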
2. Client
- import gearman
- from gearman.constants import JOB_UNKNOWN
- def check_request_status(job_request):
-     """Report whether a job request completed, timed out, or lost its connection."""
-     if job_request.complete:
-         print('Job %s finished! Result: %s - %s' % (job_request.job.unique, job_request.state, job_request.result))
-     elif job_request.timed_out:
-         print('Job %s timed out!' % job_request.job.unique)
-     elif job_request.state == JOB_UNKNOWN:
-         print('Job %s connection failed!' % job_request.job.unique)
- # Connect to two job servers and submit one job to the "reverse" task
- gm_client = gearman.GearmanClient(['localhost:4730', 'localhost:4731'])
- complete_job_request = gm_client.submit_job("reverse", "Hello World!")
- check_request_status(complete_job_request)
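2.1 Task and job
- A task is a group of jobs; once dispatched, it is executed and its result is returned to the caller.
- The sub-tasks within a task are dispatched to multiple workers and executed.
- What the client hands to the server is a job, while the whole process of dispatching it and returning the result is a task; each job runs on one worker.
- A task is a dynamic concept and a job a static one, somewhat like the difference between a "process" and a "program". Being dynamic, a task has states such as complete, timed out (timed_out), and job not recognized (JOB_UNKNOWN).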
2.2 Job priority (priority)
2.3 Synchronous and asynchronous (background)
2.4 Blocking and non-blocking (wait_until_complete)
2.5 Submitting multiple jobs
- GearmanClient.submit_multiple_jobs(jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
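Here jobs_to_submit is a list of jobs, each of which is a dictionary in the format described above. A note on unique: it sets the unique key of the task, that is, the value read as job_request.job.unique in the client example at the start of section 2. If it is not set, one is assigned automatically.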
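To make the shape of jobs_to_submit concrete, the sketch below builds a list of job dictionaries and fills in a unique key wherever the caller left it out. The helper with_unique_keys and the use of uuid are assumptions for illustration; the library generates its own keys internally and may do so differently.

```python
import uuid


def with_unique_keys(jobs):
    """Return copies of the job dictionaries, each guaranteed to carry a 'unique' key."""
    prepared = []
    for job in jobs:
        job = dict(job)                              # leave the caller's dict untouched
        job.setdefault("unique", uuid.uuid4().hex)   # illustrative default, not the library's
        prepared.append(job)
    return prepared


jobs_to_submit = with_unique_keys([
    dict(task="reverse", data="Hello World!"),
    dict(task="reverse", data="second payload", unique="my-own-key"),
])
for job in jobs_to_submit:
    print(job["task"], job["unique"])
```

Setting unique yourself is mainly useful when you need to recognize a request later from a key you already know.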
- GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
- GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
- GearmanClient.submit_multiple_requests(job_requests, wait_until_complete=True, poll_timeout=None)
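Below is an example from the official site that submits multiple jobs in a synchronous, non-blocking way. At the end of the example, before reading the results returned by the server, it calls wait_until_jobs_completed to wait for every job in the task to return its result: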
- import time
- import gearman
- gm_client = gearman.GearmanClient(['localhost:4730'])
- list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]
- submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)
- # Once we know our jobs are accepted, we can do other stuff and wait for results later in the function
- # Similar to multithreading and doing a join except this is all done in a single process
- time.sleep(1.0)
- # Wait at most 5 seconds before timing out incomplete requests
- completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout=5.0)
- for completed_job_request in completed_requests:
-     check_request_status(completed_job_request)
- # Second example: retrying requests whose connection failed or that timed out
- import gearman
- from gearman.constants import JOB_UNKNOWN
- gm_client = gearman.GearmanClient(['localhost:4730'])
- list_of_jobs = [dict(task="task_name", data="task binary string"), dict(task="other_task", data="other binary string")]
- failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)
- # Let's pretend our assigned requests' Gearman servers all failed
- assert all(request.state == JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"
- # Let's pretend our assigned requests don't fail but some simply time out
- retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, poll_timeout=1.0)
- timed_out_requests = [job_request for job_request in retried_connection_failed_requests if job_request.timed_out]
- # For our timed out requests, let's wait a little longer until they're complete
- retried_timed_out_requests = gm_client.submit_multiple_requests(timed_out_requests, wait_until_complete=True, poll_timeout=4.0)
2.6 Serialization
- import pickle
- import gearman
- class PickleDataEncoder(gearman.DataEncoder):
-     @classmethod
-     def encode(cls, encodable_object):
-         return pickle.dumps(encodable_object)
-     @classmethod
-     def decode(cls, decodable_string):
-         return pickle.loads(decodable_string)
- class PickleExampleClient(gearman.GearmanClient):
-     # Use pickle to turn Python objects into the job's binary payload
-     data_encoder = PickleDataEncoder
- my_python_object = {'hello': 'there'}
- gm_client = PickleExampleClient(['localhost:4730'])
- gm_client.submit_job("task_name", my_python_object)
3. Worker
3.1 Main API
- GearmanWorker.set_client_id(client_id): sets the worker's own ID
- GearmanWorker.register_task(task, callback_function): registers callback_function as the handler for task; callback_function is defined in this form:
- def function_callback(calling_gearman_worker, current_job):
-     return current_job.data
- GearmanWorker.unregister_task(task): unregisters a function defined on the worker
- GearmanWorker.work(poll_timeout=60.0): loops forever, completing the jobs sent to it
- GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
- GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None):Send a Gearman JOB_STATUS update for an inflight job
- GearmanWorker.send_job_warning(current_job, data, poll_timeout=None):Send a Gearman JOB_WARNING update for an inflight job
3.2 Simple example
- import gearman
- gm_worker = gearman.GearmanWorker(['localhost:4730'])
- def task_listener_reverse(gearman_worker, gearman_job):
-     # Return the job's payload reversed; the return value becomes the job result
-     print('Reversing string: ' + gearman_job.data)
-     return gearman_job.data[::-1]
- gm_worker.set_client_id("worker_revers")
- gm_worker.register_task("reverse", task_listener_reverse)
- gm_worker.work()


3.2 Returning results
- GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
- GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None): Send a Gearman JOB_STATUS update for an inflight job
- GearmanWorker.send_job_warning(current_job, data, poll_timeout=None): Send a Gearman JOB_WARNING update for an inflight job
- import gearman
- gm_worker = gearman.GearmanWorker(['localhost:4730'])
- # See gearman/job.py to see attributes on the GearmanJob
- # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE
- def task_listener_reverse_inflight(gearman_worker, gearman_job):
-     # Slice instead of reversed(): reversed() returns an iterator, which has no len()
-     reversed_data = gearman_job.data[::-1]
-     total_chars = len(reversed_data)
-     for idx, character in enumerate(reversed_data):
-         gearman_worker.send_job_data(gearman_job, str(character))
-         gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)
-     return None
- # gm_worker.set_client_id is optional
- gm_worker.register_task('reverse', task_listener_reverse_inflight)
- # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
- gm_worker.work()
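What the client ends up seeing from such in-flight updates can be simulated without a server. In the sketch below, ToyJobRequest is a hypothetical stand-in that borrows the data_updates, status, and result field names listed for GearmanJobRequest later in this article; appending to it replaces the network round trip of send_job_data and send_job_status.

```python
import collections


class ToyJobRequest(object):
    """Client-side record, loosely modelled on the fields listed for GearmanJobRequest."""

    def __init__(self):
        self.data_updates = collections.deque()
        self.status = {}
        self.result = None


def reverse_inflight(data, request):
    # Send each character as its own data update, plus a progress status.
    reversed_data = data[::-1]
    total_chars = len(reversed_data)
    for idx, character in enumerate(reversed_data):
        request.data_updates.append(character)
        request.status = {"numerator": idx + 1, "denominator": total_chars}
    return None     # the final result is empty; the payload travelled as updates


request = ToyJobRequest()
request.result = reverse_inflight("abc", request)
print("".join(request.data_updates), request.status)
```

The design choice to note is that the worker's return value is None: a client interested in the reversed string has to assemble it from the data updates rather than read it from result.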
3.3 Data serialization
- import json # Or similarly styled library
- import gearman
- class JSONDataEncoder(gearman.DataEncoder):
-     @classmethod
-     def encode(cls, encodable_object):
-         return json.dumps(encodable_object)
-     @classmethod
-     def decode(cls, decodable_string):
-         return json.loads(decodable_string)
- class DBRollbackJSONWorker(gearman.GearmanWorker):
-     data_encoder = JSONDataEncoder
-     def after_poll(self, any_activity):
-         # After every select loop, let's rollback our DB connections just to be safe
-         continue_working = True
-         # self.db_connections.rollback()
-         return continue_working
after_poll is a polling callback that notifies any outside listeners of what is going on with the GearmanWorker. Return True to continue polling, False to exit the work loop.
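4. admin_client
The sections above covered the client and the worker. There is also an API for the server, which lets you monitor and configure it, for example setting the queue size, shutting it down, and viewing its status and workers. The object used for these operations is GearmanAdminClient, defined as follows: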
- GearmanAdminClient.send_maxqueue(task, max_size): Sends a request to change the maximum queue size for a given task
- GearmanAdminClient.send_shutdown(graceful=True): Sends a request to shutdown the connected gearman server
- GearmanAdminClient.get_status():Retrieves a list of all registered tasks and reports how many items/workers are in the queue
- GearmanAdminClient.get_version(): Retrieves the version number of the Gearman server
- GearmanAdminClient.get_workers():Retrieves a list of workers and reports what tasks they’re operating on
- GearmanAdminClient.ping_server(): Sends off a debugging string to execute an application ping on the Gearman server, return the response time
- gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])
- # Inspect server state
- status_response = gm_admin_client.get_status()
- version_response = gm_admin_client.get_version()
- workers_response = gm_admin_client.get_workers()
- response_time = gm_admin_client.ping_server()
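Under the hood, a status request goes over gearmand's plain-text administrative protocol, whose reply is one tab-separated line per function (name, total jobs, running jobs, available workers) terminated by a line containing a single dot. The parser below is a sketch of that idea, not the library's code; the function name parse_status and the dictionary keys are choices made for this example, so check the actual return value of get_status() before relying on them.

```python
def parse_status(text):
    """Parse the text reply of gearmand's administrative `status` command."""
    rows = []
    for line in text.splitlines():
        if line == ".":           # a lone dot terminates the reply
            break
        name, total, running, workers = line.split("\t")
        rows.append({
            "task": name,
            "queued": int(total),
            "running": int(running),
            "workers": int(workers),
        })
    return rows


# Hand-written sample reply, not captured from a real server
sample = "reverse\t3\t1\t2\nresize\t0\t0\t5\n.\n"
for row in parse_status(sample):
    print(row)
```

A function with many queued jobs and few available workers is the usual sign that more workers are needed.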
5. Job objects
5.1 GearmanJob
The GearmanJob object provides the most basic information about a job sent to the server. It is defined as follows:
class gearman.job.GearmanJob(connection, handle, task, unique, data)
Server information
- GearmanJob.connection: GearmanConnection - Server assignment. Could be None prior to client job submission
- GearmanJob.handle:string - Job’s server handle. Handles are NOT interchangeable across different gearman servers
- GearmanJob.task:string - Job’s task
- GearmanJob.unique:string - Job’s unique identifier (client assigned)
- GearmanJob.data:binary - Job’s binary payload
5.2 GearmanJobRequest
Tracking job submission
- GearmanJobRequest.gearman_job: GearmanJob - Job that is being tracked by this GearmanJobRequest object
- GearmanJobRequest.priority: PRIORITY_NONE [default]、PRIORITY_LOW、PRIORITY_HIGH
- GearmanJobRequest.background: boolean - Is this job backgrounded?
- GearmanJobRequest.connection_attempts: integer - Number of attempted connection attempts
- GearmanJobRequest.max_connection_attempts: integer - Maximum number of attempted connection attempts before raising an exception
Tracking job execution
- GearmanJobRequest.result: binary - Job’s returned binary payload - Populated if and only if JOB_COMPLETE
- GearmanJobRequest.exception: binary - Job’s exception binary payload
- GearmanJobRequest.state: one of the following values
- JOB_UNKNOWN - Request state is currently unknown, either unsubmitted or connection failed
- JOB_PENDING - Request has been submitted, pending handle
- JOB_CREATED - Request has been accepted
- JOB_FAILED - Request received an explicit job failure (job done but errored out)
- JOB_COMPLETE - Request received an explicit job completion (job done with results)
- GearmanJobRequest.timed_out: boolean - Did the client hit its polling_timeout prior to a job finishing?
- GearmanJobRequest.complete: boolean - Does the client need to continue to poll for more updates from this job?
Tracking the status of a running job
- GearmanJobRequest.warning_updates: collections.deque - Job’s warning binary payloads
- GearmanJobRequest.data_updates: collections.deque - Job’s data binary payloads
- GearmanJobRequest.status: dictionary - Job’s status
- handle - string - Job handle
- known - boolean - Is the server aware of this request?
- running - boolean - Is the request currently being processed by a worker?
- numerator - integer
- denominator - integer
- time_received - integer - Time last updated
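The numerator and denominator fields are what a worker reports through send_job_status, so a client can turn the status dictionary into a progress figure. The helper below, progress_percent, is a hypothetical convenience function written for this illustration and operates on a plain dictionary with the keys listed above.

```python
def progress_percent(status):
    """Turn a status dictionary into a completion percentage, or None if unknown."""
    denominator = status.get("denominator") or 0
    if not status.get("known") or denominator <= 0:
        return None     # the server does not know the job, or no progress was reported
    return 100.0 * status.get("numerator", 0) / denominator


example_status = {
    "handle": "H:example:1",   # made-up handle for illustration
    "known": True,
    "running": True,
    "numerator": 3,
    "denominator": 12,
}
print(progress_percent(example_status))
```

Returning None instead of 0 keeps "no information yet" distinguishable from "started but nothing done".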