你真的會websocket嗎


Websocket

WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。
WebSocket通信協議於2011年被 IETF定為標准RFC 6455,並被RFC7936所補充規范。
 
WebSocket協議支持(在受控環境中運行不受信任的代碼的)客戶端與(選擇加入該代碼的通信的)遠程主機之間進行全雙工通信。用於此的安全模型是Web瀏覽器常用的基於原始的安全模式。 協議包括一個開放的握手以及隨后的TCP層上的消息幀。 該技術的目標是為基於瀏覽器的、需要和服務器進行雙向通信的(服務器不能依賴於打開多個HTTP連接(例如,使用XMLHttpRequest或<iframe>和長輪詢))應用程序提供一種通信機制。
 
這個協議目前仍是草案,只有最新的一些瀏覽器可以支持它。但是,它的好處是顯而易見的,隨着支持它的瀏覽器越來越多,我們將看到它越來越流行。(和以往的Web開發一樣,必須謹慎地堅持依賴可用的新功能並能在必要時回滾到舊技術的務實策略。)
 

Django用法

在1.9版本之后,Django實現了對Channels的支持,他所使用的是WebSocket通信,解決了實時通信的問題,而且在使用WebSocket進行通信的同時依舊能夠支持HTTP通信。

1.1目錄結構

在此結構中必須有硬性要求,具體如下:

新的目錄如下:
|-- channels_example
|    |--channels_example
|        |-- __init__.py
|        |-- settings.py
|        |-- urls.py
|        |-- wsgi.py
|        |-- routing.py   #必須
|        |-- consumer.py  #必須
|        |-- asgi.py
|    |-- manage.py

1.2配置settings.py文件

1.2.1將其添加到APP列表里

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',
]

1.2.2然后,添加新的參數CHANNEL_LAYERS,如下:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgiref.inmemory.ChannelLayer",
        "ROUTING": "channels_example.routing.channel_routing",
    },
}

需要注意的是 ROUTING 參數,他是用來指定WebSocket表單的位置,當有WebSocket請求訪問時,就會根據這個路徑找到相應表單,調用相應的函數進行處理。
channels_example.routing 就是我們剛才建好的routing,py文件,里面的channel_routing我們下面會進行填充。

1.3填寫路由映射地址

from channels.routing import route
import consumers

channel_routing = [
    route('websocket.connect', consumers.ws_connect),        
    route('websocket.disconnect', consumers.ws_disconnect),        
    # route('websocket.receive', consumers.ws_message),        
    route('websocket.receive', consumers.ws_message_uuid),        
]

1.4路由映射到相對應的函數

from django.http import HttpResponse
from channels.handler import AsgiHandler

#message.reply_channel    一個客戶端通道的對象
#message.reply_channel.send(chunk)  用來唯一返回這個客戶端

#一個管道大概會持續30s


def ws_connect(message):
    auth = True

    if not auth:
        reply = json.dumps({'error': error})
        message.reply_channel.send({'text': reply, 'close': True})
    else:
        reply = "{}"
        message.reply_channel.send({'text': reply})
        print(">>> %s connected" % str(message))


def ws_disconnect(message):
    print("<<< %s disconnected" % str(message))
    # with message_queue.mutex:
    #     message_queue.queue.clear()
    while not message_queue.empty():
        try:
            message_queue.get(False)
        except Empty:
            continue

        message_queue.task_done()


def ws_message_uuid(message):
    task = Task.create(message)

    if task:
        message_queue.put(task)

 tornado用法

1.1Tornado的WebSocket模塊

Tornado在websocket模塊中提供了一個WebSocketHandler類。這個類提供了和已連接的客戶端通信的WebSocket事件和方法的鈎子。當一個新的WebSocket連接打開時,open方法被調用,而on_messageon_close方法分別在連接接收到新的消息和客戶端關閉時被調用。

此外,WebSocketHandler類還提供了write_message方法用於向客戶端發送消息,close方法用於關閉連接。

class EchoHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        self.write_message('connected!')

    def on_message(self, message):
        self.write_message(message)

正如你在我們的EchoHandler實現中所看到的,open方法只是使用WebSocketHandler基類提供的write_message方法向客戶端發送字符串"connected!"。每次處理程序從客戶端接收到一個新的消息時調用on_message方法,我們的實現中將客戶端提供的消息原樣返回給客戶端。這就是全部!讓我們通過一個完整的例子看看實現這個協議是如何簡單的吧。

WebSocketHandler.open()

當一個WebSocket連接建立后被調用。

WebSocketHandler.on_message(message)

當客戶端發送消息message過來時被調用,注意此方法必須被重寫。

WebSocketHandler.on_close()

當WebSocket連接關閉后被調用。

WebSocketHandler.write_message(message, binary=False)

向客戶端發送消息messagea,message可以是字符串或字典(字典會被轉為json字符串)。若binary為False,則message以utf8編碼發送;二進制模式(binary=True)時,可發送任何字節碼。

WebSocketHandler.close()

關閉WebSocket連接。

WebSocketHandler.check_origin(origin)

判斷源origin,對於符合條件(返回判斷結果為True)的請求源origin允許其連接,否則返回403。可以重寫此方法來解決WebSocket的跨域請求(如始終return True)。

1.2實例--工作websocket實際應用

#coding=utf-8

import uuid
import os
from works.actions import work
import hashlib
import json
import Queue
from threading import Thread
import numpy as np
import cv2
import base64
import jwt
import tornado.gen
from handlers.base_handler import BaseWebSocket
from config import MEDIA_ROOT
import time

message_queue = Queue.PriorityQueue()


def work_loop():
    while True:
        task = message_queue.get()

        iuuid = task.uuid
        offset_top = task.offset_top
        image_data = task.image_data
        channel = task.channel
        zoom = task.zoom
        rType = task.rType
        responseType = task.responseType

        print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top))

        filename = str(uuid.uuid1()) + '.jpg'
        filepath = os.path.join(MEDIA_ROOT, filename)

        with open(filepath, 'wb') as f:
            f.write(image_data.decode("base64"))

        if zoom != 1.0:
            im = cv2.imread(filepath)

            if im is None:
                continue

            osize = im.shape[1], im.shape[0]
            size = int(im.shape[1] * zoom), int(im.shape[0] * zoom)
            im = cv2.resize(im, size)
            cv2.imwrite(filepath, im)

        try:
            reply = work(filepath, use_crop=False, result=rType,responseType=responseType)
        except Exception as e:
            print("!!!!!! %s -> %s caused error" % (iuuid, filename))
            print(e)
            cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename))
            os.system(cmd.encode('utf-8'))
            continue


        if responseType == 'url':
            # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename
            rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename
            reply = {'url': rtn_url, 'uuid': iuuid}

        reply['uuid'] = iuuid
        channel.write_message({'text': json.dumps(reply)})
        print '%s end time:' % channel, time.time()


class BrowserWebSocket(BaseWebSocket):

    '''瀏覽器websocket服務器'''


    def open(self):
        '''新的WebSocket連接打開時被調用'''
        # message = {}
        # remote_ip = self.request.remote_ip
        # message['query_string']=self.get_argument('query_string')
        # message['remote_ip']=remote_ip
        # auth, error = verify_auth_token(message)
        auth = True
        error = 'error'

        if not auth:
            reply = json.dumps({'error': error})
            self.write_message({'text': reply, 'close': True})
        else:
            reply = "{}"
            self.write_message({'text': reply})
            print(">>> %s connected" % self.request.remote_ip)


    def on_message(self, message):
        '''連接收到新消息時被調用'''
        print '%s start time:'%self,time.time()
        task = Task.create(message,self)

        if task:
            message_queue.put(task)

    @tornado.gen.coroutine
    def on_messages(self, message):
        '''連接收到新消息時被調用'''
        task = Task.create(message,self)

        if task:
            message_queue.put(task)


    def on_close(self):
        '''客戶端關閉時被調用'''
        print("<<< %s disconnected" % str(self.request.remote_ip))
        # with message_queue.mutex:
        #     message_queue.queue.clear()
        while not message_queue.empty():
            try:
                message_queue.get(False)
            except Queue.Empty:
                continue

            message_queue.task_done()


    def check_origin(self, origin):
        '''允許WebSocket的跨域請求'''

        return True

class Task(object):
    def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args):
        self.uuid = uuid
        self.offset_top = int(float(offset_top))
        self.image_data = image_data
        self.channel = channel
        self.zoom = zoom
        self.rType = rType
        self.responseType = responseType

    @classmethod
    def create(clz, message,sel):
        # data = message.get('text')
        data = message

        try:
            params = json.loads(data[:150])

            image_data = data[150:]
            image_data = image_data.replace(" ", "+")

            params['image_data'] = image_data
            params['channel'] = sel

            # add Type
            if params.get('responseType') is None:
                params['responseType'] = 'url'

            # request type
            if params.get('rType') is None:
                params['rType'] = 'rl'

            task = Task(**params)


        except ValueError as e:
            task = None
            print(">>>message data error!")
            print(e)

        return task

    def __cmp__(self, other):
        return cmp(self.offset_top, other.offset_top)



def verify_auth_token(message):
    '''token 驗證'''

    token = message.get('query_string')
    secret_key = 'aoiakai'

    try:
        payload = jwt.decode(token, secret_key, algorithms=['HS256'])
        if payload.get('ip') != message.get('remote_ip'):
            return False, 'ip mismatch'
    except jwt.ExpiredSignatureError as e:
        print(e)
        return False, 'token expired'
    except Exception as e:
        print(e)
        return False, 'enter correct token'

    return True, ''


work_thread = Thread(target=work_loop)
work_thread.daemon = True
work_thread.start()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM