twsited(5)--不同模塊用rabbitmq傳遞消息


  上一章,我們講到,用redis共享數據,以及用redis中的隊列來實現一個簡單的消息傳遞。其實在真實的過程中,不應該用redis來傳遞,最好用專業的消息隊列,我們python中,用到最廣泛的就是rabbitmq,雖然它是用erlang開發的,但真的非常好用,經過無數次驗證。如果大家不會安裝rabbitmq,請看我這篇文章,http://www.cnblogs.com/yueerwanwan0204/p/5319474.html   這篇文章講解了怎么安裝rabbitmq以及簡單的使用它。

  我們把上一章的圖再稍微修改一下,

  其實在真實的項目中,也這樣,一般來說,利用redis在不同模塊之間共享數據,利用rabbitmq來進行消息傳遞。我們這個項目只做到從web到flask,再到rabbitmq,傳遞給tcpserver,再下放給具體的tcpclient客戶端;其實還可以反向傳遞,即從tcp的client到tcp服務器,再到rabbitmq,到前端tcp或者前端http,但是這個前端tcp或者http要基於循環模式的,flask肯定不行。我們從下一章開始講tornado,用tornado來接受,並且做一個websocket,就可以下放下去。

  好了,說了這么多,我們來看一下代碼,首先,tcpserver這塊,我們之前用redis的隊列做消息隊列,現在修改一下,修改的大概代碼如下:

import pika
from pika.adapters import twisted_connection

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


class RabbitMQ(object):
    _connection = None
    _channel_receive_from_http = None

    @staticmethod
    @defer.inlineCallbacks
    def init_mq(ip_address, port):
        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        parameters = pika.ConnectionParameters(credentials=credentials)
        cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
        RabbitMQ._connection = yield cc.connectTCP(ip_address, port)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def set_channel_receive_from_back(user_factory):
        """
        設置rabbitmq消息接受隊列的channel,並且做好循環任務
        """
        RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel()
        yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp')
        queue_object, consumer_tag = yield 
RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True)
        l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory)
        l.start(0.5)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def read_from_mq(queue_object, chat_factory):
        """
        讀取接受到的消息隊列消息,並且處理
        """
        ch, method, properties, body = yield queue_object.get()

        if body:
            log.msg('Accept data from http successful!')
            chat_factory.process_data_from_mq(body)
            defer.returnValue(1)
        defer.returnValue(0)

  首先,大家要注意一下,由於twisted是異步的,所以不能采用原先阻塞的函數,連接或者接受或者發送消息,所有跟rabbitmq的連接,發送,接受,都要異步化,即都要返回defer對象。因為連接rabbitmq的本質,其實就是socket的網絡行為,任何網絡行為都有可能被阻塞,一旦阻塞,異步的效率會極其低下。(以后我們寫tornado也是這樣,一定要返回future對象)。

  我看到網上還有很多博客,在接受rabbitmq的消息的時候,居然開了另外一個進程或者線程,有時候這么做,程序運行起來沒問題,但涉及到異步的時候,還是會影響效率。都已經用異步的代碼了,就不應該大量使用多進程或者多線程。多進程或者多線程,會讓cpu調度頻繁切換,大量並發的時候,嚴重影響效率。

  詳細看上面的代碼,簡單的解釋一下,

  init_mq就是初始化消息隊列,先加入用戶名,密碼,返回一個類似與token的東西,然后用twisted客戶端來連接rabbitmq,其實就是socket行為,返回一個connection。

  set_channel_receive_from_back設置channel,其實就是定義一個管道,我從這個管道接受東西。接受並讀取的過程其實就是寫一個循環任務,這個循環任務每0.5秒執行一次,你也可以寫小一點,0.1秒執行一次,具體的看你需要設置。

  read_from_mq就是真正的讀取並處理的函數,我這邊在read_from_mq中,加了一個參數,就是這個工廠對象,因為接受的時候,一個工廠,就產生一個接受函數。然后讀取到消息以后,把消息傳遞到這個工廠對象的處理方法中,整個環節就完整了。

  RabbitMQ的3個方法全是靜態方法,所以我沒有生成RabbitMQ對象,直接使用這個類本身就可以了。所以在運行的時候,又加了如下代碼。

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)

reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT)

reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf)

reactor.listenTCP(8124, cf)
reactor.run()

  看見我加的代碼沒有,一個init_mq,一個set_channel_receive_from_back。一個初始化消息隊列,初始化好以后,再設置channel,並且開始接受消息。

  整個tcpserver這塊就算完成了,下面是整個tcpserver的代碼

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task, defer, protocol
import struct
import json
from twisted.python import log
import sys
import time
import txredisapi as redis
import pika
from pika.adapters import twisted_connection
log.startLogging(sys.stdout)

REDIS_HOST = 'localhost'
REDIS_PORT = 6380
REDIS_DB = 4
REDIS_PASSWORD = 'dahai123'

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123')


@defer.inlineCallbacks
def check_token(phone_number, token):
    token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token')
    if token != token_in_redis:
        defer.returnValue(False)
    else:
        defer.returnValue(True)


class RabbitMQ(object):
    _connection = None
    _channel_receive_from_http = None

    @staticmethod
    @defer.inlineCallbacks
    def init_mq(ip_address, port):
        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        parameters = pika.ConnectionParameters(credentials=credentials)
        cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
        RabbitMQ._connection = yield cc.connectTCP(ip_address, port)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def set_channel_receive_from_back(user_factory):
        """
        設置rabbitmq消息接受隊列的channel,並且做好循環任務
        """
        RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel()
        yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp')
        queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True)
        l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory)
        l.start(0.5)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def read_from_mq(queue_object, chat_factory):
        """
        讀取接受到的消息隊列消息,並且處理
        """
        ch, method, properties, body = yield queue_object.get()

        if body:
            log.msg('Accept data from http successful!')
            chat_factory.process_data_from_mq(body)
            defer.returnValue(1)
        defer.returnValue(0)


class Chat(Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:斷線" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.factory.users:
            del self.factory.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據以后的操作
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                return

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)

            if self.state == "DATA":
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                return

    def handle_heartbeat(self, content):
        """
        處理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    @defer.inlineCallbacks
    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        token = content.get('token')

        result = yield check_token(phone_number, token)

        if not result:
            send_content = json.dumps({'code': 0})
            self.send_content(send_content, 101, [phone_number])
            length = 12 + len(send_content)
            version = self.version
            command_id = 101
            header = [length, version, command_id]
            header_pack = struct.pack('!3I', *header)
            self.transport.write(header_pack + send_content)
            return

        if phone_number in self.factory.users:
            log.msg("電話號碼<%s>存在老的連接." % phone_number.encode('utf-8'))
            self.factory.users[phone_number].connectionLost("")
            self.factory.users.pop(phone_number)

        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.factory.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.factory.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.factory.users.keys():
                self.factory.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8'))
                value.transport.abortConnection()

    @defer.inlineCallbacks
    def receive_from_mq(self):
        data = yield redis_store.rpop('front_tcp')
        if data:
            log.msg("接受到來自消息隊列的消息:", data)
            self.process_data_from_mq(data)

    def process_data_from_mq(self, data):
        loads_data = json.loads(data)
        command_id = loads_data.get('command_id')
        phone_numbers = loads_data.get('chat_to')
        chat_from = loads_data.get('chat_from')
        chat_content = loads_data.get('chat_content')

        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(content, command_id, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = 1100
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8'))

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)

reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT)

reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf)

reactor.listenTCP(8124, cf)
reactor.run()
View Code

  

  下面是web方面的代碼,web也是,之前用redis很簡單的做,現在換到rabbitmq,由於這個例子很簡單,所以我就在request過程中初始化rabbitmq了,整個代碼就非常簡單了,就是一個發送函數而已。

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session
import json
from . import web
import pika

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


@web.teardown_request
def handle_teardown_request(exception):
    db_session.remove()


@web.route('/send-command', methods=['GET', 'POST'])
def send_command():
    if request.method == 'GET':
        users = User.query.all()
        return render_template('web/send-command.html', users=users)
    else:
        data = request.get_json()
        command_id = data.get('command_id')
        chat_from = '13764408552'
        chat_to = data.get('chat_to')
        chat_content = data.get('content')

        if not chat_to or not chat_content or not command_id:
            return jsonify({'code': 0, 'message': '信息不完整'})

        send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        # current_app.redis.lpush('front_tcp', send_data)

        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials, port=RABBITMQ_PORT))
        channel = connection.channel()
        channel.queue_declare(queue='front_tcp')

        channel.basic_publish(exchange='',
                              routing_key='front_tcp',
                              body=send_data)

        print "send json_data to front_tcp, the data is ", send_data

        connection.close()

        return jsonify({'code': 1, 'message': '發送成功'})

  所有代碼更換完成,看一下具體效果吧

  web上先發送一個消息。

  隨便啟動一個客戶端,看看接受吧。

  看見沒有,整個過程就全部打通了。

  總結:整個twisted就講到這了,大家可以看到,twisted我也不是特別熟悉,所以我一共就用了5章把它講完。從下一章開始,我開始講tornado,利用tornado做tcpserver,tcpclient,websocket服務器,因為tornado的源碼比較好讀,所以我重點也會放在tornado上。最近我在看reactjs,屆時我會用稍微好看一點的圖形界面,來做websocket頁面,tornado這個庫真正做到small strong smart,我一直喜歡小而精的庫。總之,我重點會放在tornado上,希望大家到時候會喜歡。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM