分布式任務系統gearman的python實戰


 

Gearman是一個用來把工作委派給其他機器、分布式的調用更適合做某項工作的機器、並發的做某項工作在多個調用間做負載均衡、或用來在調用其它語言的函數的系統。Gearman是一個分發任務的程序框架,可以用在各種場合,開源、多語言支持、靈活、快速、可嵌入、可擴展、無消息大小限制、可容錯,與Hadoop相比,Gearman更偏向於任務分發功能。它的任務分布非常簡單,簡單得可以只需要用腳本即可完成。Gearman最初用於LiveJournal的圖片resize功能,由於圖片resize需要消耗大量計算資 源,因此需要調度到后端多台服務器執行,完成任務之后返回前端再呈現到界面。

gearman的任務傳遞模式是一對一的,不能實現一對多,一個client通過job server最后只能夠到達一個worker上。如果需要一對多,需要定義多個worker的function,依次向這些worker進行發送,非常的不方便。這一點就不如ZeroMQ,ZeroMQ支持的模式很多,能夠滿足各種消息隊列需求。他們用在不同的場合,Gearman是分布式任務系統,而ZeroMQ是分布式消息系統,任務只需要做一次就行。

1. Server

1.1 Gearman工作原理

Gearman 服務有很多要素使得它不僅僅是一種提交和共享工作的方式,但是主要的系統只由三個組件組成: gearmand 守護進程(server),用於向 Gearman 服務提交請求的 client ,執行實際工作的 worker。其關系如下圖所示:

Gearmand server執行一個簡單的功能,即從client收集job請求並充當一個注冊器,而worker可以在此提交關於它們支持的job和操作類型的信息,這樣server實際上就充當了Client和Worker的中間角色。Client將job直接丟給server,而server根據worker提交的信息,將這些job分發給worker來做,worker完成后也可返回結果,server將結果傳回client。舉個例子,在一個公司里面,有老板1、老板2、老板3(client),他們的任務就是出去喝酒唱歌拉項目(job),將拉來的項目直接交給公司的主管(server),而主管並不親自來做這些項目,他將這些項目分給收手下的員工(worker)來做,員工做完工作后,將結果交給主管,主管將結果報告給老板們即可。

要使用gearman,首先得安裝server,下載地址:https://launchpad.net/gearmand。當下載安裝完成后,可以啟動gearmand,啟動有很多參數選項,可以man gearmand來查看,主要的 選項有:

 

  • -b, --backlog=BACKLOG       Number of backlog connections for listen. 
  • -d, --daemon                Daemon, detach and run in the background. 
  • -f, --file-descriptors=FDS  Number of file descriptors to allow for the process                             
  • (total connections will be slightly less). Default     is max allowed for user. 
  • -h, --help                  Print this help menu. 
  • -j, --job-retries=RETRIES   Number of attempts to run the job before the job  server removes it. Thisis helpful to ensure a bad  job does not crash all available workers. Default  is no limit. 
  • -l, --log-file=FILE         Log file to write errors and information to. Turning this option on also forces the first  verbose level to be enabled. 
  • -L, --listen=ADDRESS        Address the server should listen on. Default is  INADDR_ANY. 
  • -p, --port=PORT             Port the server should listen on. 
  • -P, --pid-file=FILE         File to write process ID out to. 
  • -r, --protocol=PROTOCOL     Load protocol module. 
  • -R, --round-robin           Assign work in round-robin order per  workerconnection. The default is to assign work in  the order of functions added by the worker. 
  • -q, --queue-type=QUEUE      Persistent queue type to use. 
  • -t, --threads=THREADS       Number of I/O threads to use. Default=0. 
  • -u, --user=USER             Switch to given user after startup. 
  • -v, --verbose               Increase verbosity level by one. 
  • -V, --version               Display the version of gearmand and exit. 
  • -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.   The default is to wakeup all available workers.

 

啟動gearmand:

 

[plain]  view plain  copy
 
  1. sudo gearmand --pid-file=/var/run/gearmand/gearmand.pid --daemon --log-file=/var/log/gearman.log  
若提示沒有/var/log/gearman.log這個文件的話,自己新建一個就可以了。

 

1.2 實例化queue與容錯

Gearman默認是將queue保存在內存中的,這樣能夠保障速速,但是遇到宕機或者server出現故障時,在內存中緩存在queue中的任務將會丟失。Gearman提供了了queue實例化的選項,能夠將queue保存在數據庫中,比如:SQLite3、Drizzle、MySQL、PostgresSQL、Redis(in dev)、MongoDB(in dev).在執行任務前,先將任務存入持久化隊列中,當執行完成后再將該任務從持久化隊列中刪除。要使用db來實例化queue,除了在啟動時加入-q參數和對應的數據庫之外,還需要根據具體的數據庫使用相應的選項,例如使用sqlit3來實例化queue,並指明使用用來存儲queue的文件:

 

[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3 --libsqlite3-db=/tmp/demon/gearman.db --listen=localhost --port=4370  

 

再如使用mysql來實例化queue,選項為:

 

[plain]  view plain  copy
 
  1. <pre name="code" class="plain">/usr/local/gearmand/sbin/gearmand  -d  -u root \  
  2. –queue-type=MySQL \  
  3. –mysql-host=localhost \  
  4. –mysql-port=3306 \  
  5. –mysql-user=gearman \  
  6. –mysql-password=123456 \  
  7. –mysql-db=gearman \  
  8. –mysql-table=gearman_queue  
 
           

還要創建相應的數據庫和表,並創建gearman用戶,分配相應的權限:

 

[sql]  view plain  copy
 
  1. CREATE DATABASE gearman;  
  2. CREATE TABLE `gearman_queue` (  
  3. `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  
  4. `unique_key` varchar(64) NOT NULL,  
  5. `function_name` varchar(255) NOT NULL,  
  6. `when_to_run` int(10) NOT NULL,  
  7. `priority` int(10) NOT NULL,  
  8. `data` longblob NOT NULL,  
  9. PRIMARY KEY (`id`),  
  10. UNIQUE KEY `unique_key_index` (`unique_key`,`function_name`)  
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
  12.   
  13. create USER gearman@localhost identified by ’123456′;  
  14. GRANT ALL on gearman.* to gearman@localhost;  

 

可以在gearman的配置文件中加入相關配置,以免每次啟動都需要寫一堆東西:

 

 

[plain]  view plain  copy
 
  1. # /etc/conf.d/gearmand: config file for /etc/init.d/gearmand  
  2.   
  3. # Persistent queue store  
  4. # The following queue stores are available:  
  5. # drizzle|memcache|mysql|postgre|sqlite|tokyocabinet|none  
  6. # If you do not wish to use persistent queues, leave this option commented out.  
  7. # Note that persistent queue mechanisms are mutally exclusive.  
  8. PERSISTENT="mysql"  
  9.   
  10. # Persistent queue settings for drizzle, mysql and postgre  
  11. #PERSISTENT_SOCKET=""  
  12. PERSISTENT_HOST="localhost"  
  13. PERSISTENT_PORT="3306"  
  14. PERSISTENT_USER="gearman"  
  15. PERSISTENT_PASS="your-pass-word-here"  
  16. PERSISTENT_DB="gearman"  
  17. PERSISTENT_TABLE="gearman_queue"  
  18.   
  19. # Persistent queue settings for sqlite  
  20. #PERSISTENT_FILE=""  
  21.   
  22. # Persistent queue settings for memcache  
  23. #PERSISTENT_SERVERLIST=""  
  24.   
  25. # General settings  
  26. #  
  27. # -j, --job-retries=RETRIES   Number of attempts to run the job before the job  
  28. #                             server removes it. Thisis helpful to ensure a bad  
  29. #                             job does not crash all available workers. Default  
  30. #                             is no limit.  
  31. # -L, --listen=ADDRESS        Address the server should listen on. Default is  
  32. #                             INADDR_ANY.  
  33. # -p, --port=PORT             Port the server should listen on. Default=4730.  
  34. # -r, --protocol=PROTOCOL     Load protocol module.  
  35. # -t, --threads=THREADS       Number of I/O threads to use. Default=0.  
  36. # -v, --verbose               Increase verbosity level by one.  
  37. # -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.  
  38. #                             The default is to wakeup all available workers.  
  39. GEARMAND_PARAMS="-L 127.0.0.1 --verbose=DEBUG"  

這其實並不是一個很好的方案,因為當使用數據庫來實例化queue時,會增加兩個步驟:Client和worker必須連接到server上去讀寫job,並且數據庫在處理的速度上也會大大降低。在大並發任務量的情況下,性能會受到直接影響,你會發現SQLite或者mysql並不能滿足處理大量BLOB的性能要求,job會不斷地積攢而得不到處理,給一個任務猶如石牛如一樣海毫無反應。歸根結底,需要根據自己的應用場景,合理設計一些測試用例和自動化腳本,通過實際的運行狀態進行參數的調整。

job分布式系統一個基本的特點就是要有單點容錯能力(no  single point failure),還不能有單點性能瓶頸(no single point of bottleneck)。即:一個節點壞了不影響整個系統的業務,一個節點的性能不能決定整個系統的性能。那如果server掛了該怎么辦?解決方法是使用多個server:

 

[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3  --listen=localhost --port=4370  
[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3  --listen=localhost --port=4371  

 

每個client連接多個server,並使用負載最低的那個server,當該server掛掉之后,gearman會自動切換到另一個server上,如下圖所示:

 

 

1.3 輪詢調度

當job不斷地增加時,我們可能需要增加worker服務器來增加處理能力,但你可能會發現任務並不是均勻地分布在各個worker服務器上,因為server分配任務給worker的方式默認按照循序分配的,比如你現有worker-A,在server上注冊了5個worker進程,隨着任務的增加,又加了一台worker-B,並向同一個server注冊了5個worker進程。默認情況下,server會按照worker注冊的先后順序進行調度,即:只有給worker-A分配滿任務后才會給worker-B分配任務,即分配方式是wA, wA,wA, wA,wA,wB, wB,wB, wB, wB。為了能夠給worker-A和worker-B均勻地分配任務,server可以采用輪詢的方式給worker服務器分配任務,即分配方式為: wA, wB, wA, wB ...,那么在啟動server時加上選項:-R或者--round-robin

 

1.4 受限喚醒

 

根據gearman協議的設計,Worker 如果發現隊列中沒有任務需要處理,是可以通過發送 PRE_SLEEP 命令給服務器,告知說自己將進入睡眠狀態。在這個狀態下,Worker 不會再去主動抓取任務,只有服務器發送 NOOP 命令喚醒后,才會恢復正常的任務抓取和處理流程。因此 Gearmand 在收到任務時,會去嘗試喚醒足夠的 Worker 來抓取任務;此時如果 Worker 的總數超過可能的任務數,則有可能產生驚群效應。而通過 –worker-wakeup 參數,則可以指定收到任務時,需要喚醒多少個 Worker 進行處理,避免在 Worker 數量非常大時,發送大量不必要的 NOOP 報文,試圖喚醒所有的 Worker。

 

1.6 線程模型

Gearman中有三種線程:

  1. 監聽和管理線程。只有一個(負責接收連接,然后分配給I/O線程來處理,如果有多個I/O線程的話,同時也負責啟動和關閉服務器,采用libevent來管理socket和信號管道)
  2. I/O線程。可以有多個(負責可讀可寫的系統調用和對包初步的解析,將初步解析的包放入各自的異步隊列中,每個I/O線程都有自己的隊列,所以競爭很少,通過-t選項來指定I/O線程數)
  3. 處理線程。只有一個(負責管理各種信息列表和哈希表,比如跟蹤唯一鍵、工作跟蹤句柄、函數、工作隊列等。將處理結果信息包返回給I/O線程,I/O線程將該包挑選出來並向該連接發送數據)
其中第1, 3種線程對全局處理性能沒有直接影響,雖然處理線程有可能成為瓶頸,但他的工作足夠簡單消耗可忽略不計,因此我們的性能調優主要目標是在IO線程的數量。對每個IO線程來說,它都會有一個libevent的實例;所有Gearman的操作會以異步任務方式提交到處理線程,並由IO線程獲取完成實際操作,因此IO線程的數量是與可並行處理任務數成正比。Gearmand 提供 -t 參數調整總IO線程數,需要使用 libevent 1.4 以上版本提供多線程支持。

 

 

進程句柄數

另外一個影響大規模部署的是進程句柄數,Gearman會為每一個注冊的Worker分配一個fd(文件描述符),而這個fd的總數是受用戶限制的,可以使用 ulimit -n 命令查看當前限制
[plain]  view plain  copy
 
  1. flier@debian:~$ ulimit -n  
  2. 1024  
  3. flier@debian:~$ ulimit -HSn 4096 // 設置進程句柄數的最大軟硬限制  
  4. 4096  
也就是說gearman缺省配置下,最多允許同時有小於1024個worker注冊上來,fd用完之后的Worker和Client會出現連接超時或無響應等異常情況。因此,發生類似情況時,我們應首先檢查 /proc/[PID]/fd/ 目錄下的數量,是否已經超過 ulimit -n 的限制,並根據需要進行調整。而全系統的打開文件設置,可以參考 /proc/sys/fs/file-max 文件,並通過 sysctl -w fs.file-max=[NUM] 進行修改。
[plain]  view plain  copy
 
  1. flier@debian:~$ cat /proc/sys/fs/file-max  
  2. 24372  
  3. flier@debian:~# sysctl -w fs.file-max=100000  
  4. 100000  
Gearmand 本身也提供了調整句柄數量限制的功能,啟動時則可以通過 -f或者–file-descriptors 參數指定,但非特權進程不能設置超過soft limit的數額。"The soft limit is the value that the kernel enforces for the corresponding resource. The hard limit acts as a ceiling for the soft limit: an unprivileged process may only set its soft limit to a value in the range from 0 up to the hard limit, and (irreversibly) lower its hard limit. A privileged process (under Linux: one with the
CAP_SYS_RESOURCE capability) may make arbitrary changes to either limit value."


2. Client

對於發送單個job,python-gearman提供了一個簡單的函數:submit_job,可以將job發送到server,其定義如下:
GearmanClient. submit_job (task, data, unique=None, priority=None, background=False, wait_until_complete=True, max_retries=0,poll_timeout=None )
 
下面來看看gearman的一個簡單樣例:
[python]  view plain  copy
 
  1. import gearman  
  2. import time  
  3. from gearman.constants import JOB_UNKNOWN  
  4.   
  5. def check_request_status(job_request):  
  6.     """check the job status"""  
  7.     if job_request.complete:  
  8.         print 'Job %s finished! Result: %s - %s' % (job_request.job.unique, job_request.state, job_request.result)  
  9.     elif job_request.time_out:  
  10.         print 'Job %s timed out!' % job_request.unique  
  11.     elif job_request.state == JOB_UNKNOWN:  
  12.         print "Job %s connection failed!" % job_request.unique  
  13.   
  14. gm_client = gearman.GearmanClient(['localhost:4730','localhost:4731'])  
  15.   
  16. complete_job_request = gm_client.submit_job("reverse", "Hello World!")  
  17. check_request_status(complete_job_request)  
gm_client連接到本地的4730/4731端口的server上,然后用submit_job函數將”reverse“和參數“Hello World!"傳給server,返回一個request,最后用check_request_status()函數檢查這個request的狀態。是不是很簡單?
 

2.1  task與job

task與job是有區別的區別主要在於:
  1. Task是一組job,在下發后會執行並返回結果給調用方
  2. Task內的子任務悔下發給多個work並執行
  3. client下放給server的任務為job,而整個下方並返回結果的過程為task,每個job會在一個work上執行
  4. task是一個動態的概念,而job是一個靜態的概念。這有點類似“進程”和“程序”概念的區別。既然是動態的概念,就有完成(complete)、超時(time_out)、攜帶的job不識別(JOB_UNKNOWN)等狀態

 

2.2 job優先級(priority)

client在發送job的時候,可以設定job的優先級,只需要在submit_job函數中添加選項“priority=gearman.PRIORITY_HIGH”即可創建高優先級task,priority可以有三個選項:PRIORITY_HIGH、PRIORITY_LOW、PRIORITY_NONE(default)
 

2.3 同步與異步(background)

默認情況下,client以同步方式發送job到server,所謂的同步,即client在向server發送完job后,不停地詢問該(組)job執行的情況,直到server返回結果。而異步方式則是client在得知task創建完成之后,不管該task的執行結果。要使client采用異步方式,則在submit_job加入參數“background=True”即可。下面展示了gearman同步/異步的方式時的時序圖。
由上面的同步時序圖可知,client端在job執行的整個過程中,與job server端的鏈接都是保持着的,這也給job完成后job server返回執行結果給client提供了通路。同時,在job執行過程當中,client端還可以發起job status的查詢。當然,這需要worker端的支持的。
由上面的異步時序圖可知,client提交完job,job server成功接收后返回JOB_CREATED響應之后,client就斷開與job server之間的鏈接了。后續無論發生什么事情,client都是不關心的。同樣,job的執行結果client端也沒辦法通過Gearman消息框架 獲得。
 

2.4 阻塞與非阻塞(wait_until_complete)

client創建task時,默認情況下使用的是阻塞模式,所謂的阻塞模式在進程上的表現為:在執行完submit_job后,卡在此處等待server返回結果。而非阻塞模式則是一旦job被server接收,程序可以繼續向下執行,我們可以在后面適當的位置(程序最后或者需要用到返回結果的地方)來檢查並取回這些task的狀態和結果。要使用非阻塞模式,則在submit_job里加入選項“wait_until_complete=False”即可。
 

2.5 送多個job

 

  • GearmanClient.submit_multiple_jobs(jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
Takes a list of jobs_to_submit with dicts of{‘task’: task, ‘data’: data, ‘unique’: unique, ‘priority’: priority}
這里jobs_to_submit是一組job,每個job是上述格式的字典,這里解釋一下unique,unique是設置task的unique key,即在小結2.1中的job_request.job.unique的值,如果不設置的話,會自動分配。
  • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have moved to STATE_PENDING
  • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have completed or failed
  • GearmanClient.submit_multiple_requests(job_requests, wait_until_complete=True, poll_timeout=None)
Take Gearman JobRequests, assign them connections, and request that they be done.
  • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have moved to STATE_PENDING
  • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have completed or failed

 

下面是官網給的一個同步非阻塞方式發送多個job的例子,在該例子的最后,在取得server返回結果之前,用了wait_until_jobs_completed函數來等待task中的所有job返回結果:

 

[python]  view plain  copy
 
  1. import time  
  2. gm_client = gearman.GearmanClient(['localhost:4730'])  
  3.   
  4. list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]  
  5. submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)  
  6.   
  7. # Once we know our jobs are accepted, we can do other stuff and wait for results later in the function  
  8. # Similar to multithreading and doing a join except this is all done in a single process  
  9. time.sleep(1.0)  
  10.   
  11. # Wait at most 5 seconds before timing out incomplete requests  
  12. completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout=5.0)  
  13. for completed_job_request in completed_requests:  
  14.     check_request_status(completed_job_request)  
下面這個例子中,用到了submit_multiple_requests函數對超時的請求再次檢查。

 

 

[python]  view plain  copy
 
  1. import time  
  2. gm_client = gearman.GearmanClient(['localhost:4730'])  
  3.   
  4. list_of_jobs = [dict(task="task_name", data="task binary string"), dict(task="other_task", data="other binary string")]  
  5. failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)  
  6.   
  7. # Let's pretend our assigned requests' Gearman servers all failed  
  8. assert all(request.state == JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"  
  9.   
  10. # Let's pretend our assigned requests' don't fail but some simply timeout  
  11. retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, poll_timeout=1.0)  
  12.   
  13. timed_out_requests = [job_request for job_request in retried_requests if job_request.timed_out]  
  14.   
  15. # For our timed out requests, lets wait a little longer until they're complete  
  16. retried_timed_out_requests = gm_client.submit_multiple_requests(timed_out_requests, wait_until_complete=True, poll_timeout=4.0)  

 

2.6 序列化

默認情況下,gearman的client只能傳輸的data只能是字符串格式的,因此,要傳輸python數據結構,必須使用序列化方法。所幸的是,GearmanClient提供了data_encoder,允許定義序列化和反序列化方法,例如:
[python]  view plain  copy
 
  1. import pickle  
  2.   
  3. class PickleDataEncoder(gearman.DataEncoder):  
  4.     @classmethod  
  5.     def encode(cls, encodable_object):  
  6.         return pickle.dumps(encodable_object)  
  7.  
  8.     @classmethod  
  9.     def decode(cls, decodable_string):  
  10.         return pickle.loads(decodable_string)  
  11.   
  12. class PickleExampleClient(gearman.GearmanClient):  
  13.     data_encoder = PickleDataEncoder  
  14.   
  15. my_python_object = {'hello': 'there'}  
  16.   
  17. gm_client = PickleExampleClient(['localhost:4730'])  
  18. gm_client.submit_job("task_name", my_python_object)  


3 worker

3.1 主要API

worker端同樣提供了豐富的API,主要有:
  • GearmanWorker.set_client_id(client_id):設置自身ID
  • GearmanWorker.register_task(task, callback_function):為task注冊處理函數callback_function,其中callback_function的定義格式為:
    [python]  view plain  copy
     
    1. def function_callback(calling_gearman_worker, current_job):  
    2.     return current_job.data  
  • GearmanWorker.unregister_task(task):注銷worker上定義的函數
  • GearmanWorker.work(poll_timeout=60.0): 無限次循環, 完成發送過來的job.
  • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
  • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None):Send a Gearman JOB_STATUS update for an inflight job
  • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None):Send a Gearman JOB_WARNING update for an inflight job
 

3.2 簡單示例

而worker端其實和client端差不多,也是要連接到server端,不同的是,worker端需要綁定函數來處理具體的job:
[python]  view plain  copy
 
  1. import gearman  
  2.   
  3. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
  4.   
  5. def task_listener_reverse(gearman_worker, gearman_job):  
  6.     print 'Reversing string:' + gearman_job.data  
  7.     return gearman_job.data[::-1]  
  8.   
  9. gm_worker.set_client_id("worker_revers")  
  10. gm_worker.register_task("reverse", task_listener_reverse)  
  11.   
  12. gm_worker.work()  
可以看到,在worker同樣要連接到本地4730端口的server,給了一個job處理函數,反序job傳來的數據並返回,register_task函數將名為”reverse“的job和task_listener_reverse函數注冊在一起,說明該函數用來處理名為”reverse”的job的,最后調用work函數來工作。來,我們看看效果吧,首先啟用client.py文件,此時因為worker還沒啟動,client在此阻塞住,等待task處理。然后運行worker程序,可以看到client和worker的輸出:
 



3.2 返回結果

worker提供了3個API可以在worker函數中發送job的數據、狀態和警告:
  • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
  • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None): Send a Gearman JOB_STATUS update for an inflight job
  • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None): Send a Gearman JOB_WARNING update for an inflight job
下面是來自官網的例子:
[python]  view plain  copy
 
  1. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
  2.   
  3. # See gearman/job.py to see attributes on the GearmanJob  
  4. # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE  
  5. def task_listener_reverse_inflight(gearman_worker, gearman_job):  
  6.     reversed_data = reversed(gearman_job.data)  
  7.     total_chars = len(reversed_data)  
  8.   
  9.     for idx, character in enumerate(reversed_data):  
  10.         gearman_worker.send_job_data(gearman_job, str(character))  
  11.         gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)  
  12.   
  13.     return None  
  14.   
  15. # gm_worker.set_client_id is optional  
  16. gm_worker.register_task('reverse', task_listener_reverse_inflight)  
  17.   
  18. # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity  
  19. gm_worker.work()  
 

3.3 數據序列化

同client一樣,worker端也只能發送字符類型的數據,如果想要發送python里的結構體,必須用序列化將其轉化成字符串。與client一樣,worker也有一個encoder,你同樣可以在里面定義序列化和反序列化的方法,不過值得注意的是,worker端的序列化和反序列化方法必須對應,否則client和worker端的發送的數據都不能被彼此爭取的反序列化。下面演示了使用JSON方法來進行序列化:
[python]  view plain  copy
 
  1. import json # Or similarly styled library  
  2. class JSONDataEncoder(gearman.DataEncoder):  
  3.     @classmethod  
  4.     def encode(cls, encodable_object):  
  5.         return json.dumps(encodable_object)  
  6.  
  7.     @classmethod  
  8.     def decode(cls, decodable_string):  
  9.         return json.loads(decodable_string)  
  10.   
  11. class DBRollbackJSONWorker(gearman.GearmanWorker):  
  12.     data_encoder = JSONDataEncoder  
  13.   
  14.     def after_poll(self, any_activity):  
  15.         # After every select loop, let's rollback our DB connections just to be safe  
  16.         continue_working = True  
  17.         # self.db_connections.rollback()  
  18.         return continue_working  

worker端提供了rollback函數,每次輪詢完查看socket是否活躍或者timeout時就會調用這個函數:
GearmanWorker. after_poll ( any_activity )

Polling callback to notify any outside listeners whats going on with the GearmanWorker.

Return True to continue polling, False to exit the work loop


4 admin_client

前面講了Client和Worker,對於server也提供了一些API,可以對其進行監控和設置,比如:設置queue大小、關閉連接、查看狀態、查看worker等,用於操作的對象時GearmanAdminClient,其定義如下:

class gearman.admin_client.GearmanAdminClient(host_list=None,poll_timeout=10.0)
所提供的API有:
  • GearmanAdminClient.send_maxqueue(task, max_size): Sends a request to change the maximum queue size for a given task
  • GearmanAdminClient.send_shutdown(graceful=True): Sends a request to shutdown the connected gearman server
  • GearmanAdminClient.get_status():Retrieves a list of all registered tasks and reports how many items/workers are in the queue
  • GearmanAdminClient.get_version(): Retrieves the version number of the Gearman server
  • GearmanAdminClient.get_workers():Retrieves a list of workers and reports what tasks they’re operating on
  • GearmanAdminClient.ping_server(): Sends off a debugging string to execute an application ping on the Gearman server, return the response time
[python]  view plain  copy
 
  1. gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])  
  2.   
  3. # Inspect server state  
  4. status_response = gm_admin_client.get_status()  
  5. version_response = gm_admin_client.get_version()  
  6. workers_response = gm_admin_client.get_workers()  
  7. response_time = gm_admin_client.ping_server()  


5. job對象

5.1 GearmanJob

GearmanJob對象提供了發送到server的job的最基本信息,其定義如下:
class gearman.job.GearmanJob(connection, handle, task, unique, data)

 server信息

當我們得到一個job對象后,想知道與之相連的server信息時,就可以調用如下兩個屬性:
  • GearmanJob.connection: GearmanConnection - Server assignment. Could be None prior to client job submission
  • GearmanJob.handle:string - Job’s server handle. Handles are NOT interchangeable across different gearman servers
 
 job參數
  • GearmanJob.task:string - Job’s task
  • GearmanJob.unique:string - Job’s unique identifier (client assigned)
  • GearmanJob.data:binary - Job’s binary payload
 

5.2  GearmanJobRequest

GearmanJobRequest是job請求的狀態跟蹤器,代表一個job請求,可用於GearmanClient中,其定義如下:
class gearman.job.GearmanJobRequest(gearman_jobinitial_priority=Nonebackground=False,max_attempts=1) 

跟蹤job發送

  • GearmanJobRequest.gearman_job:             GearmanJob - Job that is being tracked by this GearmanJobRequest object
  • GearmanJobRequest.priority:                PRIORITY_NONE [default]、PRIORITY_LOW、PRIORITY_HIGH
  • GearmanJobRequest.background:              boolean - Is this job backgrounded?
  • GearmanJobRequest.connection_attempts:     integer - Number of attempted connection attempts
  • GearmanJobRequest.max_connection_attempts: integer - Maximum number of attempted connection attempts before raising an exception
 

跟蹤job執行過程

  • GearmanJobRequest.result:    binary - Job’s returned binary payload - Populated if and only if JOB_COMPLETE
  • GearmanJobRequest.exception:    binary - Job’s exception binary payload
  • GearmanJobRequest.state:     
  • GearmanJobRequest.timed_out:    boolean - Did the client hit its polling_timeout prior to a job finishing?
  • GearmanJobRequest.complete:     boolean - Does the client need to continue to poll for more updates from this job?
其中GearmanJobRequest.state的返回值可以有:
  • JOB_UNKNOWN - Request state is currently unknown, either unsubmitted or connection failed
  • JOB_PENDING - Request has been submitted, pending handle
  • JOB_CREATED - Request has been accepted
  • JOB_FAILED - Request received an explicit job failure (job done but errored out)
  • JOB_COMPLETE - Request received an explicit job completion (job done with results)
 

跟蹤運行中的job狀態

某些特定的GearmanJob在實際完成之前就可能發回數據。GearmanClient用一些隊列來保存跟蹤這些發回數據的時間和內容等
  • GearmanJobRequest.warning_updates: collections.deque - Job’s warning binary payloads
  • GearmanJobRequest.data_updates:        collections.deque - Job’s data binary payloads
  • GearmanJobRequest.status:                       dictionary - Job’s status
其中,GearmanJobRequest.status返回job的狀態是一個字典,內容有:
  • handle - string - Job handle
  • known - boolean - Is the server aware of this request?
  • running - boolean - Is the request currently being processed by a worker?
  • numerator - integer
  • denominator - integer
  • time_received - integer - Time last updated

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM