生產端
# Producer: serialize commands as JSON and push them onto a Redis list that
# is used as a work queue.
import json

import redis

# Key of the Redis list that holds the queued commands.
QUEUE = "code"

# Connection settings are hard-coded here; an earlier, commented-out variant
# read the host and db number from a `config` module instead.
redisPool = redis.ConnectionPool(host='localhost', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)


def send_cmd(seaweed):
    """Append one command to the tail of the ``QUEUE`` list.

    Args:
        seaweed: JSON-serializable command object. It is dumped with
            ``ensure_ascii=False`` so non-ASCII text is stored as-is
            instead of as ``\\uXXXX`` escapes.
    """
    client.rpush(QUEUE, json.dumps(seaweed, ensure_ascii=False))


# Labels of the commands enqueued when this file is run as a script.
ll = list(range(100))

if __name__ == "__main__":
    for label in ll:
        send_cmd({"label": label, 'timd': 20160503, 'timm': 20170430})
消費端多進程消費
# Consumer: a pool of worker processes drains the Redis command queue and
# runs a job for every command popped from it.
import json
import multiprocessing

import chardet
import redis

# Key of the Redis list that holds the queued commands.
QUEUE = "code"

# Connection settings are hard-coded here; an earlier, commented-out variant
# read the host and db number from a `config` module instead.
redisPool = redis.ConnectionPool(host='localhost', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)


def func(a, b, c):
    """Placeholder job run for each command: print the first two arguments.

    ``c`` is accepted so the call signature matches the command fields, but
    it is not used.
    """
    print(a, b)


def _decode_cmd(raw):
    """Turn the bytes popped from Redis into ``str``.

    UTF-8 is tried first: the producer in this file pushes ``str`` payloads,
    which redis-py encodes as UTF-8 by default. chardet is only consulted as
    a fallback, because guessing the encoding of a short payload is
    unreliable and can garble valid UTF-8 text.

    Raises:
        UnicodeDecodeError: if the payload is not UTF-8 and chardet cannot
            propose a working encoding.
        LookupError: if chardet proposes a codec Python does not know.
    """
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        guessed = chardet.detect(raw)["encoding"]
        if guessed is None:
            raise
        return raw.decode(guessed)


def worker(pname):
    """Pop commands from ``QUEUE`` until it is empty and run ``func`` on each.

    Args:
        pname: Label of this worker, used as a prefix in log output.

    A command that cannot be decoded, parsed, or processed is reported and
    skipped; the worker then continues with the next command. The worker
    returns when the queue is empty or when Redis itself reports an error.
    """
    client = redis.Redis(connection_pool=redisPool)
    while True:
        try:
            raw = client.lpop(QUEUE)
        except redis.RedisError as ex:
            # Report the failure instead of silently treating it as
            # "queue empty", which is what a bare ``except`` used to do.
            print(pname + ":", "redis error:", ex)
            return
        if raw is None:
            # LPOP returns None once the list is empty: nothing left to do.
            return

        try:
            cmd = format_cmd(_decode_cmd(raw))
        except (ValueError, LookupError) as ex:
            # ValueError covers both UnicodeDecodeError and
            # json.JSONDecodeError. Skip the bad message but keep consuming.
            print(ex)
            print(pname + ":", raw, "計算失敗")
            continue

        try:
            func(cmd["label"], cmd['timd'], cmd['timm'])
        except Exception as ex:
            # Best-effort processing: one failing job must not stop the
            # worker from draining the rest of the queue.
            print(ex)
            print(pname + ":", cmd, "計算失敗")


def format_cmd(cmd):
    """Parse a JSON command string into the corresponding Python object."""
    return json.loads(cmd)


if __name__ == "__main__":
    # Consume the queue with several processes in parallel.
    pro_num = 5
    pool = multiprocessing.Pool(processes=pro_num)
    # Submit one worker task per pool process. The previous
    # ``range(1, pro_num)`` only submitted ``pro_num - 1`` tasks, leaving
    # one process idle.
    for idx in range(1, pro_num + 1):
        pool.apply_async(worker, ("PROC" + str(idx).zfill(3),))
    pool.close()
    pool.join()
