Websocket
Django用法
在1.9版本之后,Django實現了對Channels的支持,他所使用的是WebSocket通信,解決了實時通信的問題,而且在使用WebSocket進行通信的同時依舊能夠支持HTTP通信。
1.1目錄結構
在此結構中必須有硬性要求,具體如下:
新的目錄如下: |-- channels_example | |--channels_example | |-- __init__.py | |-- settings.py | |-- urls.py | |-- wsgi.py | |-- routing.py #必須 | |-- consumer.py #必須 | |-- asgi.py | |-- manage.py
1.2配置settings.py文件
1.2.1將其添加到APP列表里
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'channels', ]
1.2.2然后,添加新的參數CHANNEL_LAYERS,如下:
CHANNEL_LAYERS = { "default": { "BACKEND": "asgiref.inmemory.ChannelLayer", "ROUTING": "channels_example.routing.channel_routing", }, }
需要注意的是 ROUTING 參數,他是用來指定WebSocket表單的位置,當有WebSocket請求訪問時,就會根據這個路徑找到相應表單,調用相應的函數進行處理。
channels_example.routing 就是我們剛才建好的routing,py文件,里面的channel_routing我們下面會進行填充。
1.3填寫路由映射地址
from channels.routing import route import consumers channel_routing = [ route('websocket.connect', consumers.ws_connect), route('websocket.disconnect', consumers.ws_disconnect), # route('websocket.receive', consumers.ws_message), route('websocket.receive', consumers.ws_message_uuid), ]
1.4路由映射到相對應的函數
from django.http import HttpResponse from channels.handler import AsgiHandler #message.reply_channel 一個客戶端通道的對象 #message.reply_channel.send(chunk) 用來唯一返回這個客戶端 #一個管道大概會持續30s def ws_connect(message): auth = True if not auth: reply = json.dumps({'error': error}) message.reply_channel.send({'text': reply, 'close': True}) else: reply = "{}" message.reply_channel.send({'text': reply}) print(">>> %s connected" % str(message)) def ws_disconnect(message): print("<<< %s disconnected" % str(message)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Empty: continue message_queue.task_done() def ws_message_uuid(message): task = Task.create(message) if task: message_queue.put(task)
tornado用法
1.1Tornado的WebSocket模塊
Tornado在websocket模塊中提供了一個WebSocketHandler類。這個類提供了和已連接的客戶端通信的WebSocket事件和方法的鈎子。當一個新的WebSocket連接打開時,open方法被調用,而on_message和on_close方法分別在連接接收到新的消息和客戶端關閉時被調用。
此外,WebSocketHandler類還提供了write_message方法用於向客戶端發送消息,close方法用於關閉連接。
class EchoHandler(tornado.websocket.WebSocketHandler): def open(self): self.write_message('connected!') def on_message(self, message): self.write_message(message)
正如你在我們的EchoHandler實現中所看到的,open方法只是使用WebSocketHandler基類提供的write_message方法向客戶端發送字符串"connected!"。每次處理程序從客戶端接收到一個新的消息時調用on_message方法,我們的實現中將客戶端提供的消息原樣返回給客戶端。這就是全部!讓我們通過一個完整的例子看看實現這個協議是如何簡單的吧。
WebSocketHandler.open()
當一個WebSocket連接建立后被調用。
WebSocketHandler.on_message(message)
當客戶端發送消息message過來時被調用,注意此方法必須被重寫。
WebSocketHandler.on_close()
當WebSocket連接關閉后被調用。
WebSocketHandler.write_message(message, binary=False)
向客戶端發送消息messagea,message可以是字符串或字典(字典會被轉為json字符串)。若binary為False,則message以utf8編碼發送;二進制模式(binary=True)時,可發送任何字節碼。
WebSocketHandler.close()
關閉WebSocket連接。
WebSocketHandler.check_origin(origin)
判斷源origin,對於符合條件(返回判斷結果為True)的請求源origin允許其連接,否則返回403。可以重寫此方法來解決WebSocket的跨域請求(如始終return True)。
1.2實例--工作websocket實際應用
#coding=utf-8 import uuid import os from works.actions import work import hashlib import json import Queue from threading import Thread import numpy as np import cv2 import base64 import jwt import tornado.gen from handlers.base_handler import BaseWebSocket from config import MEDIA_ROOT import time message_queue = Queue.PriorityQueue() def work_loop(): while True: task = message_queue.get() iuuid = task.uuid offset_top = task.offset_top image_data = task.image_data channel = task.channel zoom = task.zoom rType = task.rType responseType = task.responseType print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top)) filename = str(uuid.uuid1()) + '.jpg' filepath = os.path.join(MEDIA_ROOT, filename) with open(filepath, 'wb') as f: f.write(image_data.decode("base64")) if zoom != 1.0: im = cv2.imread(filepath) if im is None: continue osize = im.shape[1], im.shape[0] size = int(im.shape[1] * zoom), int(im.shape[0] * zoom) im = cv2.resize(im, size) cv2.imwrite(filepath, im) try: reply = work(filepath, use_crop=False, result=rType,responseType=responseType) except Exception as e: print("!!!!!! %s -> %s caused error" % (iuuid, filename)) print(e) cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename)) os.system(cmd.encode('utf-8')) continue if responseType == 'url': # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename reply = {'url': rtn_url, 'uuid': iuuid} reply['uuid'] = iuuid channel.write_message({'text': json.dumps(reply)}) print '%s end time:' % channel, time.time() class BrowserWebSocket(BaseWebSocket): '''瀏覽器websocket服務器''' def open(self): '''新的WebSocket連接打開時被調用''' # message = {} # remote_ip = self.request.remote_ip # message['query_string']=self.get_argument('query_string') # message['remote_ip']=remote_ip # auth, error = verify_auth_token(message) auth = True error = 'error' if not auth: reply = json.dumps({'error': error}) self.write_message({'text': reply, 'close': True}) else: reply = "{}" self.write_message({'text': reply}) print(">>> %s connected" % self.request.remote_ip) def on_message(self, message): '''連接收到新消息時被調用''' print '%s start time:'%self,time.time() task = Task.create(message,self) if task: message_queue.put(task) @tornado.gen.coroutine def on_messages(self, message): '''連接收到新消息時被調用''' task = Task.create(message,self) if task: message_queue.put(task) def on_close(self): '''客戶端關閉時被調用''' print("<<< %s disconnected" % str(self.request.remote_ip)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Queue.Empty: continue message_queue.task_done() def check_origin(self, origin): '''允許WebSocket的跨域請求''' return True class Task(object): def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args): self.uuid = uuid self.offset_top = int(float(offset_top)) self.image_data = image_data self.channel = channel self.zoom = zoom self.rType = rType self.responseType = responseType @classmethod def create(clz, message,sel): # data = message.get('text') data = message try: params = json.loads(data[:150]) image_data = data[150:] image_data = image_data.replace(" ", "+") params['image_data'] = image_data params['channel'] = sel # add Type if params.get('responseType') is None: params['responseType'] = 'url' # request type if params.get('rType') is None: params['rType'] = 'rl' task = Task(**params) except ValueError as e: task = None print(">>>message data error!") print(e) return task def __cmp__(self, other): return cmp(self.offset_top, other.offset_top) def verify_auth_token(message): '''token 驗證''' token = message.get('query_string') secret_key = 'aoiakai' try: payload = jwt.decode(token, secret_key, algorithms=['HS256']) if payload.get('ip') != message.get('remote_ip'): return False, 'ip mismatch' except jwt.ExpiredSignatureError as e: print(e) return False, 'token expired' except Exception as e: print(e) return False, 'enter correct token' return True, '' work_thread = Thread(target=work_loop) work_thread.daemon = True work_thread.start()