python發送消息到activeMQ后java接收到BinaryMessage的坑


和另一個系統進行對接,使用activemq進行消息通信。對方使用java客戶端監聽一個topic,我們需要發送TextMessage消息,對方接收后進行處理。而我們因為系統架構的原因只能使用python進行推送,也就只能通過stomp協議發送消息。然后就遇到了問題,發送的消息在java消費者端只能解析成BinaryMessage,而發送的時候根本沒有辦法指定消息類型。網上搜了很久沒有找到相同的情況。
根據官方通過python往ActiveMQ發送message的demo編寫如下代碼。

# -*-coding:utf-8-*-
import stomp
import time

queue_name = '/queue/SampleQueue'
topic_name = '/topic/SampleTopic'
listener_name = 'SampleListener'
test_name = "springBootMqQueue"
springBootMqQueue = '/queue/springBootMqQueue'


class SampleListener(object):
    def on_message(self, headers, message):
        print('headers: %s' % headers)
        print('message: %s' % message)


# 推送到隊列queue
def send_to_queue(msg):
    conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
    conn.connect('admin', 'admin', wait=True)
    conn.send(springBootMqQueue, msg)
    conn.disconnect()


##從隊列接收消息
def receive_from_queue():
    conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
    conn.set_listener(listener_name, SampleListener())
    conn.connect('admin', 'admin', wait=True)
    conn.subscribe(springBootMqQueue)
    time.sleep(1)  # secs
    conn.disconnect()


if __name__ == '__main__':
    send_to_queue('{"content":{"flow":{"network":"5","times":"1-1","url":"http://www.baidu.com","way":"5"},"sms":{"direction":"0","text":"短信內容詳情"},"voice":{"connect":"5","key":"掛斷"}},"form":"13901295021","formPort":"com4","interval":"2-2","network":"5","taskId":"1dsf3641212434g","times":"1-3","to":"18611010269","type":"1"}')
    receive_from_queue()

Stomp是一個很簡單的協議,協議中不攜帶TextMessage和BytesMessage相關的信息,而是通過content-length header判斷消息類型的。header中有content-length則說明是BytesMessage,否則是TextMessage。
接下來的問題就簡單了,發送的時候不在header中攜帶content-length就可以了,查看send方法的源碼發現

def __init__(self, transport, auto_content_length=True):
    self.transport = transport
    self.auto_content_length = auto_content_length
    transport.set_listener('protocol-listener', self)
    self.version = '1.0'

def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
    """
    Send a message to a destination.

    :param str destination: the destination of the message (e.g. queue or topic name)
    :param body: the content of the message
    :param str content_type: the content type of the message
    :param dict headers: a map of any additional headers the broker requires
    :param keyword_headers: any additional headers the broker requires
    """
    assert destination is not None, "'destination' is required"
    assert body is not None, "'body' is required"
    headers = utils.merge_headers([headers, keyword_headers])
    headers[HDR_DESTINATION] = destination
    if content_type:
        headers[HDR_CONTENT_TYPE] = content_type
    body = encode(body)
    if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers:
        headers[HDR_CONTENT_LENGTH] = len(body)
    self.send_frame(CMD_SEND, headers, body)

三個條件都為true則會填充content-length,而auto_content_length是在__init__方法中傳入的,默認值為True,所以只需要在創建對象的時候將該值設置為False即可。

# 推送到隊列queue
def send_to_queue(msg):
    conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
    conn.connect('admin', 'admin', wait=True)
    conn.send(springBootMqQueue, msg)
    conn.disconnect()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM