由於系統上傳圖片有時候C端沒有接收到消息,需要做一個同步功能。C端加載圖片的時候不用請求遠程圖片庫而是加載本地的圖片,相當於做了個緩存,大大減少了C端加載圖片的時間,提高了用戶體驗。
一、功能作用
mqtt是rabbitmq服務器的一個插件,可以用它發布與訂閱主題。
這個同步功能,其實就是用了rabbitmq的應用場景之一異步處理。
二、流程步驟
1、設置mqtt唯一ID,因客戶端id不能重復,所以選當前時間為唯一ID
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id) # ClientId不能重復,所以使用當前時間
2、設置rabbitmq服務器的用戶名和密碼
client.username_pw_set("dev", "YTc4Mj")
3、訂閱主題
client.subscribe("sync")
4、接收消息
recvmsg = msg.payload.decode("utf-8")
5、處理消息
三、demo源碼
1、mqtt連接rabbitmq服務器
import paho.mqtt.client as mqtt import time import socketclient import logger import demjson import common import syncfile log = logger.Logger("info") HOST = "127.0.0.1" PORT = 1883 def client_loop(): client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id) # ClientId不能重復,所以使用當前時間 client.username_pw_set("dev", "YTc4Mj") # 必須設置,否則會返回「Connected with result code 4」 client.on_connect = on_connect client.on_message = on_message log.info('開始連接mqtt' + HOST + ':' + str(PORT)) client.connect(HOST, PORT, 60) log.info('mqtt連接完成' + HOST + ':' + str(PORT)) client.loop_forever() def on_connect(client, userdata, flags, rc): log.info("Connected with result code " + str(rc)) client.subscribe("projector-remote-control") log.info("訂閱消息 projector-remote-control") client.subscribe("holo-file-sync") log.info("訂閱消息 holo-file-sync") client.subscribe("sync") log.info("訂閱消息 sync") client.subscribe("sync-single-work") log.info("訂閱消息 sync-single-work") def on_message(client, userdata, msg): recvmsg = msg.payload.decode("utf-8") log.info("收到消息" + recvmsg + ",開始執行命令") print(msg.topic + "" + recvmsg) if(msg.topic=='sync-single-work'): common.insert_sql(recvmsg) elif(msg.topic=='sync'): common.insert_sql(recvmsg) elif(msg.topic=='holo-file-sync'): common.insert_sql(recvmsg) elif(msg.topic=='projector-remote-control'): text = demjson.decode(msg) command = text['command'] projectors = text['projectors'] #socketclient.sendSocket(recvmsg) socketclient.handlerMsg(command,projectors) if __name__ == '__main__': syncfile.scheduletask() syncfile.scheduleuncompletedtask() client_loop()
2、消息處理
import logger from threading import Timer import os import requests from io import BytesIO from PIL import Image import common log = logger.Logger("info") def scheduletask(): t = Timer(10,scheduletask) t.start() #print('定時任務已開啟,等待接收參數中...') list = common.select_sql(3) if(list == None): return getimagesize(list[0],list[2]) def scheduleuncompletedtask(): t = Timer(10,scheduleuncompletedtask) t.start() #print('定時任務已開啟,等待接收參數中...') list = common.select_sql(2) if(list == None): return getimagesize(list[0],list[2]) def getimagesize(id,url): savepath = common.readJson()['holoImageUrl'] try: response = requests.get(url) except: common.delete_sql(id) return tmpIm = BytesIO(response.content) im = Image.open(tmpIm) imgpath = url[url.index('.com/')+5:] dirpath = imgpath[:imgpath.rindex('/')].replace('/','\\') filename= imgpath[imgpath.rfind('/')+1:] targetpath = savepath+'\\'+dirpath+'\\' filename1 = filename[:filename.find('!')] if (os.path.exists(targetpath)):pass else:os.makedirs(targetpath) im.save(targetpath+filename1) if(os.path.exists(targetpath + filename)): os.remove(targetpath + filename) os.rename(targetpath + filename1, targetpath + filename) #localfilesize = os.path.getsize(targetpath+filename) #remotefilesize = dict(response.headers).get('Content-Length', 0) # if(localfilesize == remotefilesize): if(os.path.exists(targetpath+filename) and os.path.getsize(targetpath+filename) != 0): common.delete_sql(id) log.info("增量圖片下載完成...") else: common.update_sql(id) log.info("增量圖片部分下載 ... ") if __name__ == '__main__': scheduletask()
3、公共模塊
import json import sqlite3 import re from threading import Timer import logger log = logger.Logger("info") def readJson(): with open('config.json', 'rt') as jsonFile: val = jsonFile.read() Config = json.loads(val) return Config def create_table(): conn = sqlite3.connect('imagesnyc.xs') curs=conn.cursor() # 獲取游標 try: create_tb_cmd=''' CREATE TABLE IF NOT EXISTS image_sync(id INTEGER PRIMARY KEY AUTOINCREMENT,imgsize INTEGER,url TEXT,status INTEGER) ''' #主要就是上面的語句 curs.execute(create_tb_cmd) conn.commit() except: log.info('Create table failed') return False finally: return conn def insert_sql(url): conn = create_table() curs=conn.cursor() # 獲取游標 imgurls = re.split(',',url) for imageurl in imgurls: curs.execute("INSERT INTO image_sync(imgsize,url,status) VALUES('{}','{}','{}');".format(0,imageurl,3))#添加記錄 conn.commit() log.info("插入完成") def select_sql(status): conn = create_table() curs = conn.cursor() # 獲取游標 curs.execute("select * from image_sync where status ='%s' order by id ASC LIMIT 1"%status)#查詢記錄 list = curs.fetchone() conn.commit() return list def update_sql(id): conn = create_table() curs=conn.cursor() # 獲取游標 try: curs.execute("update image_sync set status = 2 where id = '%s'"%id)#更新記錄 conn.commit() except: log.info('根據主鍵id更新失敗') finally: print('continue') def delete_sql(id): conn = create_table() curs=conn.cursor() # 獲取游標 curs.execute("delete from image_sync where id = '%s'"%id)#刪除記錄 conn.commit() #log.info('成功刪除已下載完成的記錄')
四、技術難點
1、同步一個作品,接收到的作品字符串中可能包含多個圖片地址,則需要分割字符串然后存儲到內存數據庫或者sqllite免安裝數據庫,如果同時下載多張圖片會造成線程阻塞,所以用了python的定時器功能,設置好圖片預計需要下載的時間。
2、由於接受到的消息字符串地址是預覽圖片格式,最后有!的,Image不能保存該圖片格式的地址,所以先截取最后面一個!,然后保存,保存成功后再改變為預覽地址,C端便可成功加載。