Python之mqtt接收異步消息


    由於系統上傳圖片有時候C端沒有接收到消息,需要做一個同步功能。C端加載圖片的時候不用請求遠程圖片庫而是加載本地的圖片,相當於做了個緩存,大大減少了C端加載圖片的時間,提高了用戶體驗。

一、功能作用

   mqtt是rabbitmq服務器的一個插件,可以用它發布與訂閱主題。

   這個同步功能,其實就是用了rabbitmq的應用場景之一異步處理。

二、流程步驟

  1、設置mqtt唯一ID,因客戶端id不能重復,所以選當前時間為唯一ID

client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client = mqtt.Client(client_id)  # ClientId不能重復,所以使用當前時間

  2、設置rabbitmq服務器的用戶名和密碼

client.username_pw_set("dev", "YTc4Mj")

  3、訂閱主題

client.subscribe("sync")

  4、接收消息

recvmsg = msg.payload.decode("utf-8")

  5、處理消息

三、demo源碼

1、mqtt連接rabbitmq服務器

import paho.mqtt.client as mqtt
import time
import socketclient
import logger
import demjson
import common
import syncfile

log = logger.Logger("info")

HOST = "127.0.0.1"
PORT = 1883

def client_loop():
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id)  # ClientId不能重復,所以使用當前時間
    client.username_pw_set("dev", "YTc4Mj")  # 必須設置,否則會返回「Connected with result code 4」
    client.on_connect = on_connect
    client.on_message = on_message
    log.info('開始連接mqtt' + HOST + ':' + str(PORT))
    client.connect(HOST, PORT, 60)
    log.info('mqtt連接完成' + HOST + ':' + str(PORT))
    client.loop_forever()

def on_connect(client, userdata, flags, rc):
    log.info("Connected with result code " + str(rc))
    client.subscribe("projector-remote-control")
    log.info("訂閱消息 projector-remote-control")
    client.subscribe("holo-file-sync")
    log.info("訂閱消息 holo-file-sync")
    client.subscribe("sync")
    log.info("訂閱消息 sync")
    client.subscribe("sync-single-work")
    log.info("訂閱消息 sync-single-work")

def on_message(client, userdata, msg):
    recvmsg = msg.payload.decode("utf-8")
    log.info("收到消息" + recvmsg + ",開始執行命令")
    print(msg.topic + "" + recvmsg)

    if(msg.topic=='sync-single-work'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='sync'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='holo-file-sync'):
      common.insert_sql(recvmsg)
    elif(msg.topic=='projector-remote-control'):
      text = demjson.decode(msg)
      command = text['command']
      projectors = text['projectors']
      #socketclient.sendSocket(recvmsg)
      socketclient.handlerMsg(command,projectors)

if __name__ == '__main__':
    syncfile.scheduletask()
    syncfile.scheduleuncompletedtask()
    client_loop() 

2、消息處理

import logger
from threading import Timer
import os
import requests
from io import BytesIO
from PIL import Image 
import common

log = logger.Logger("info")

def scheduletask():
    t = Timer(10,scheduletask)
    t.start()
    #print('定時任務已開啟,等待接收參數中...')
    list = common.select_sql(3)
    if(list == None):
        return
    getimagesize(list[0],list[2])
def scheduleuncompletedtask():
    t = Timer(10,scheduleuncompletedtask)
    t.start()
    #print('定時任務已開啟,等待接收參數中...')
    list = common.select_sql(2)
    if(list == None):
        return
    getimagesize(list[0],list[2])

def getimagesize(id,url):
    savepath = common.readJson()['holoImageUrl']
    try:
       response = requests.get(url)
    except:
       common.delete_sql(id)
       return
    tmpIm = BytesIO(response.content)
    im = Image.open(tmpIm)
    imgpath = url[url.index('.com/')+5:]
    dirpath = imgpath[:imgpath.rindex('/')].replace('/','\\')
    filename= imgpath[imgpath.rfind('/')+1:]
    targetpath = savepath+'\\'+dirpath+'\\'
    filename1 = filename[:filename.find('!')]
    if (os.path.exists(targetpath)):pass
    else:os.makedirs(targetpath)
    im.save(targetpath+filename1)
    if(os.path.exists(targetpath + filename)):
        os.remove(targetpath + filename)
    os.rename(targetpath + filename1, targetpath + filename)
    #localfilesize = os.path.getsize(targetpath+filename)
    #remotefilesize = dict(response.headers).get('Content-Length', 0)
    # if(localfilesize == remotefilesize):  
    if(os.path.exists(targetpath+filename) and os.path.getsize(targetpath+filename) != 0): 
        common.delete_sql(id)
        log.info("增量圖片下載完成...")
    else:
        common.update_sql(id)
        log.info("增量圖片部分下載 ... ")

if __name__ == '__main__':
    scheduletask()

3、公共模塊

import json
import sqlite3
import re
from threading import Timer
import logger

log = logger.Logger("info")

def readJson():
    with open('config.json', 'rt') as jsonFile:
        val = jsonFile.read()
        Config = json.loads(val)
        return Config

def create_table():
    conn = sqlite3.connect('imagesnyc.xs')
    curs=conn.cursor() # 獲取游標
    try:
        create_tb_cmd='''
        CREATE TABLE IF NOT EXISTS image_sync(id INTEGER PRIMARY KEY AUTOINCREMENT,imgsize INTEGER,url TEXT,status INTEGER)
        '''
        #主要就是上面的語句
        curs.execute(create_tb_cmd)
        conn.commit()
    except:
        log.info('Create table failed')
        return False
    finally:
        return conn

def insert_sql(url):
    conn = create_table()
    curs=conn.cursor() # 獲取游標
    imgurls = re.split(',',url)
    for imageurl in imgurls:
       curs.execute("INSERT INTO image_sync(imgsize,url,status) VALUES('{}','{}','{}');".format(0,imageurl,3))#添加記錄
    conn.commit()
    log.info("插入完成")
             

def select_sql(status):
    conn = create_table()
    curs = conn.cursor() # 獲取游標
    curs.execute("select * from image_sync where status ='%s' order by id ASC LIMIT 1"%status)#查詢記錄
    list = curs.fetchone()
    conn.commit()
    return list

def update_sql(id):
    conn = create_table()
    curs=conn.cursor() # 獲取游標
    try:
      curs.execute("update image_sync set status = 2 where id = '%s'"%id)#更新記錄
      conn.commit()
    except:
        log.info('根據主鍵id更新失敗')
    finally:
        print('continue')

def delete_sql(id):
    conn = create_table()
    curs=conn.cursor() # 獲取游標
    curs.execute("delete from image_sync where id = '%s'"%id)#刪除記錄
    conn.commit()
    #log.info('成功刪除已下載完成的記錄')

 四、技術難點

    1、同步一個作品,接收到的作品字符串中可能包含多個圖片地址,則需要分割字符串然后存儲到內存數據庫或者sqllite免安裝數據庫,如果同時下載多張圖片會造成線程阻塞,所以用了python的定時器功能,設置好圖片預計需要下載的時間。

    2、由於接受到的消息字符串地址是預覽圖片格式,最后有!的,Image不能保存該圖片格式的地址,所以先截取最后面一個!,然后保存,保存成功后再改變為預覽地址,C端便可成功加載。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM