前言
本篇文章介紹一下python的兩種分布式框架,一種是rq,另一種是ray。使用分布式可以處理大量的並發且耗時的操作。
一、RQ框架
1.1 rq介紹
RedisQueue是一款輕量級的分布式異步任務隊列調度框架,基於redis數據庫作為broker,生產端將任務job存儲到redis數據庫中,消費端監聽隊列並取出任務執行。
1.2 安裝
pip install rq
1.3 生產端
from rq import Queue
import redis
# 根據需要寫邏輯
def count_read_files(file):
f = open(file)
lines = f.read()
f.close()
return count(lines)
# 連接redis指定庫
conn = redis.Redis(host='127.0.0.1', password='xxxxxx', port=6379, db=1)
q = Queue("file", connection=conn)
file_list = ["a.txt", "b.txt", "c.txt"]
# 生產發送任務到指定的redis隊列中去
for file in file_list:
q.enqueue_call(count_read_files, args(file,))
1.4 消費端
import redis
from multiprocessiong import Pool
from rq import Worker, Queue, Connection
# 連接redis指定的庫
conn = redis.Redis(host='127.0.0.1', password='xxxxxx', port=6379, db=1)
def worker(listen):
with Connection(conn):
worker = Worker(map(Queue, listen))
worker.work()
def run(num, listen):
try:
p = Pool(num)
for i in range(num):
p.apply_async(worker, args=(listen,))
p.close()
p.join()
except Exception as e:
print(e)
if __name__ == "__main__":
listen = ["file", "don", "image"]
cpu_num = 4
run(cpu_num, listen)
1.5 redis集群
# 當redis是集群是,可以這樣獲取
from rq.cli.helpers import get_redis_from_config
settings = {'SENTINEL': {
'INSTANCES': [('redis1.cloud.bz', 6379),
('redis2.cloud.bz', 6379),
('redis3.cloud.bz', 6379)],
'SOCKET_TIMEOUT': None,
'PASSWORD': 'xxxxxx',
'DB': 1,
'MASTER_NAME': 'sentinel-127.0.0.1-6379'
}
}
conn = get_redis_from_config(settings)
二、Ray框架
2.1 介紹
Ray是UC Berkeley RISELab新推出的高性能分布式執行框架,它使用了和傳統分布式計算系統不一樣的架構和對分布式計算的抽象方式,具有比Spark更優異的計算性能
但是目前只要Linux下的編譯版本,Windows暫時無法使用。
2.2 安裝
pip install ray
2.3 示例
import ray
ray.init()
@ray.remote
def who():
return "xiao ming"
# 異步執行函數,返回對象ID
obj_id = who.remote()
name = ray.get(obj_id)
print(name)
"""
在Ray里,通過Python注解@ray.remote定義remote函數。使用此注解聲明的函數都會自帶一個默認的方法remote,通過此方法發起的函數調用都是以提交分布式任務的方式異步執行的,函數的返回值是一個對象id,使用ray.get內置操作可以同步獲取該id對應的對象。
"""
import ray
ray.init()
@ray.remote
def num_one():
return "one"
@ray.remote
def num_two():
return "two"
@ray.remote
def add(one, two):
res = one + two
return res
one_id = num_one.remote()
two_id = num_two.remote()
add_id = add.remote(one_id, two_id)
print(ray.get(add_id))
"""
例子代碼中,對函數num_one、num_two的調用是完全並行執行的,但是對函數add的調用依賴於num_one、num_two函數的返回結果。Ray可以保證函數add需要等待num_one、num_two函數的結果真正計算出來后才會執行。
這樣可以解決在異步中,某一部分依賴另一部分的結果問題。
"""
2.4 Ray集群
ray.init(redis_address="xxx.xxx.x.x")
2.5 常用方法
ray.put()
"""
可以將Python對象存入本地ObjectStore,並且異步返回一個唯一的ObjectID。通過該ID,Ray可以訪問集群中任一個節點上的對象
"""
ray.get()
"""
可以通過ObjectID獲取ObjectStore內的對象並將之轉換為Python對象。對於數組類型的對象,Ray使用共享內存機制減少數據的拷貝成本。而對於其它對象則需要將數據從ObjectStore拷貝到進程的堆內存中。
如果調用ray.get()操作時,對象尚未創建好,則get操作會阻塞,直到對象創建完成后返回。
"""
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ray.wait()
"""
支持批量的任務等待,基於此可以實現一次性獲取多個ObjectID對應的數據
"""
# 啟動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,超時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)