STOMP協議詳解


http://blog.csdn.net/chszs/article/details/46592777

http://nikipore.github.io/stompest/

https://stomp.github.io/stomp-specification-1.1.html#Heart-beating

https://nikipore.github.io/stompest/protocol.html

 

the sender MUST send new data over the network connection at least every <n> milliseconds   配了heartbeat,你就必須主動發數據,否則會被斷開

 

 

SUBSCRIBE id Header

 

An id header MUST be included in the frame to uniquely identify the subscription within the STOMP connection session. Since a single connection can have multiple open subscriptions with a server, the id header allows the client and server to relate subsequent ACKNACK or UNSUBSCRIBEframes to the original subscription.

 

StompConfig(uri, amq_config['USER'], amq_config['PASSWORD'], version=StompSpec.VERSION_1_2)

1.2版本的subscribe消息需要id頭

 

 

一、STOMP協議介紹

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協議,它提供了一個可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進行交互。STOMP協議由於設計簡單,易於開發客戶端,因此在多種語言和多種平台上得到廣泛地應用。

STOMP協議的前身是TTMP協議(一個簡單的基於文本的協議),專為消息中間件設計。

STOMP是一個非常簡單和容易實現的協議,其設計靈感源自於HTTP的簡單性。盡管STOMP協議在服務器端的實現可能有一定的難度,但客戶端的實現卻很容易。例如,可以使用Telnet登錄到任何的STOMP代理,並與STOMP代理進行交互。

STOMP協議與2012年10月22日發布了最新的STOMP 1.2規范。
要查看STOMP 1.2規范,見: https://stomp.github.io/stomp-specification-1.2.html

二、STOMP的實現

業界已經有很多優秀的STOMP的服務器/客戶端的開源實現,下面就介紹一下這方面的情況。

1、STOMP服務器

項目名 兼容STOMP的版本 描述
Apache Apollo 1.0 1.1 1.2 ActiveMQ的繼承者 http://activemq.apache.org/apollo
Apache ActiveMQ 1.0 1.1 流行的開源消息服務器 http://activemq.apache.org/
HornetQ 1.0 來自JBoss的消息中間件 http://www.jboss.org/hornetq
RabbitMQ 1.0 1.1 1.2 基於Erlang、支持多種協議的消息Broker,通過插件支持STOMP協議 http://www.rabbitmq.com/plugins.html#rabbitmq-stomp
Stampy 1.2 STOMP 1.2規范的一個Java實現 http://mrstampy.github.com/Stampy/
StompServer 1.0 一個輕量級的純Ruby實現的STOMP服務器 http://stompserver.rubyforge.org/

這里只列了部分。

2、STOMP客戶端庫

項目名 兼容STOMP的版本 描述
activemessaging 1.0 Ruby客戶端庫 http://code.google.com/p/activemessaging/
onstomp 1.0 1.1 Ruby客戶端庫 https://rubygems.org/gems/onstomp
Apache CMS 1.0 C++客戶端庫 http://activemq.apache.org/cms/
Net::STOMP::Client 1.0 1.1 1.2 Perl客戶端庫 http://search.cpan.org/dist/Net-STOMP-Client/
Gozirra 1.0 Java客戶端庫 http://www.germane-software.com/software/Java/Gozirra/
libstomp 1.0 C客戶端庫,基於APR庫 http://stomp.codehaus.org/C
Stampy 1.2 Java客戶端庫 http://mrstampy.github.com/Stampy/
stomp.js 1.0 1.1 JavaScript客戶端庫 http://jmesnil.net/stomp-websocket/doc/
stompest 1.0 1.1 1.2 Python客戶端庫,全功能實現,包括同步和異步 https://github.com/nikipore/stompest
StompKit 1.2 Objective-C客戶端庫,事件驅動 https://github.com/mobile-web-messaging/StompKit/
stompngo 1.0 1.1 1.2 Go客戶端庫 https://github.com/gmallard/stompngo
stomp.py 1.0 1.1 1.2 Python客戶端庫 https://github.com/jasonrbriggs/stomp.py
tStomp 1.1 TCL客戶端庫 https://github.com/siemens/tstomp

這里只列了部分。

三、STOMP協議分析

STOMP協議與HTTP協議很相似,它基於TCP協議,使用了以下命令:

CONNECT
SEND
SUBSCRIBE
UNSUBSCRIBE
BEGIN
COMMIT
ABORT
ACK
NACK
DISCONNECT

STOMP的客戶端和服務器之間的通信是通過“幀”(Frame)實現的,每個幀由多“行”(Line)組成。
第一行包含了命令,然后緊跟鍵值對形式的Header內容。
第二行必須是空行。
第三行開始就是Body內容,末尾都以空字符結尾。
STOMP的客戶端和服務器之間的通信是通過MESSAGE幀、RECEIPT幀或ERROR幀實現的,它們的格式相似

 

 

http://diaocow.iteye.com/blog/1725186

==========

Stomp是一個簡單的消息文本協議,它的設計核心理念就是簡單與可用性,官方文檔:http://stomp.github.com/stomp-specification-1.1.html 

現在我們就來實踐一下Stomp協議,你需要的是: 

1.一個支持stomp消息協議的messaging server(譬如activemq,rabbitmq); 
2.一個終端(譬如linux shell); 
3.一些基本命令與操作(譬如nc,telnet) 

1.建立連接 
當我們(Client端)向服務器發送一個CONNECT Frame,就向服務器發起了一個連接請求,此時服務器端返回一個CONNECTED Frame表示建立連接成功,其中頭字段version表示采用的stomp協議版本(這里默認是1.0) 

 
ps: ^@符號 ctrl+@鍵(用來提交請求),刪除之前輸入的數據 ctrl+n+backspace鍵 

當然Client端也可指定所支持的協議版本(accept-version字段,多個版本按遞增順序排列,並用逗號分隔); 

 

服務器此時返回的CONNECTED Frame中會列出它所支持的協議版本號中最高的那個(如上圖的version:1.1) 

如果服務器端不支持客戶端所列舉的協議版本(比如這里的2.1),那么服務器會返回一個ERROR Frame並且列舉出服務器自己所支持的協議版本(如下圖的version:1.0,1.1) 

 


2.消息傳遞 
客戶端一旦與服務器端建立連接,那么就可發送下列Frame進行消息傳遞 

SEND
SUBSCRIBE
UNSUBSCRIBE
ACK
NACK
BEGIN
COMMIT
ABORT
DISCONNECT


SEND Frame 用來將客戶端消息發送到目的地(destination),因此它必須指定一個destination頭字段,另外在所有頭字段之后,新起一個空行,之后就是需要發送的消息(譬如這里的 hello stomp!) 

 

此時"hello stomp!" 這條消息就被發送到了隊列 my_queue中去 

現在我們再起一個客戶端clinet_a,來接受這個隊列(myqueue)中的消息 

 

SUBSCRIBE Frame 表示客戶端希望訂閱某一個目的地(destination)的消息(這里是/queue/my_queue),其中頭字段id表示在一個會話連接里,唯一標示一個訂閱者(subscription),頭字段destination標示該訂閱者(subscription)需要訂閱的目的地(這里是一個隊列,/queue/my_queue) 

緊接着,我們就接受到服務器端發送來的消息(MESSAGE Frame),其中message_id:唯一標示了這條消息(后面我們會使用這個消息id進行ack,uack操作),content-length:標示消息體的長度,在所有這些頭后面,新起一個空行就是消息內容(如這里的hello stomp!) 

如此時我們希望訂閱另一個destination,該如何辦呢?,是不是再發送一個SUBSCRIBE Frame就好了? 

 

結果發現服務器端返回了ERROR Frame 告訴我們SUBSCRIBE失敗,原來同一個subscription id只能訂閱一個destination,要想訂閱令一個destination,必須先發送UNSUBSCRIBE Frame,然后再SUBSCRIBE 到新的目的地 

 

UNSCUBSCRIBE Frame中的id標示需要取消訂閱的subscription,然后我們在訂閱到新的destination(這里是/queue/other_queue),可以發現訂閱成功,服務器沒有再發送ERROR Frame 

至此,我們就模擬出了一個PTP(point-to-point)消息模型,下面我們也模擬下另外一個pub/sub消息模型: 

准備工作,新起兩個連接訂閱到/topic/my_topic上,如下圖: 
clinet_a 
 

client b 
 

Send端發送廣播消息: 
 

我們去檢查下兩個消息接收客戶端,果然發現收到了這條廣播消息^_^ 
client a 
 

client b 
 

默認情況下,只要服務器端發送消息,就認為客戶端接收成功(即ack模式為auto),若我們需要更嚴格的消息保證,則必須采用client模式,即由客戶端確認消息的接受 

服務器發送了一個消息到/queue/ackqueue 

 

此時我們的客戶端client_a確實接受到了該消息 

 

雖然該消息已經發送到客戶端,但是由於該消息沒有確認(ack),則該消息還保存在隊列/queue/ackqueue中(直到客戶端確認才刪除),我們可以通過rabbitmq提供的命令來查看(sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged): 

 

若客戶端在確認消息前與客戶端斷開連接,那么服務器可能(根據不同server的設計而不同)會選擇將該消息發送給另一個subscription(這里是client_b) 

 

可以看出rabbit是選擇將它發送給另一個subscription, 我們在使用命令查ack_queue隊列中是否還有未確認的消息 

 

結果發現沒有了 ^_^ 

若是客戶端希望確認一個消息,該如何做呢?,只要發送一個ACK Frame即可! 
ACK Frame中,subscription標示是誰確認消息,message-id標示是確認哪條消息(即MESSAGE Frame中攜帶的message-id) 

 

另外我在rabbitmq的測試結果發現,ack具有累積效應(因為這里SUBSCRIBE幀的ack頭被設置成了‘client’),譬如接收了10條消息,如果你ack了8條消息,那么1-7條消息都會被ack,只有9-10兩條消息還保持未ack狀態 

除了有ACK Frame,還有NACK Frame,它表示客戶端未成功接收某條消息,這時候服務器可以選擇重發或者丟棄(對於rabbitmq,我的測試結果是選擇重發) 


3.斷開連接 
說完了連接的建立,消息的發送與接收,現在我們來看看客戶端如何與服務器斷開連接的,更重要的是如何安全的斷開連接 

 

通過發送DISCONNNECT Frame表示向服務器發送一個斷開連接請求,其中receipt表示服務器收到請求后請告知客戶端,並返回一個相同的receipt-id 


至此關於Stmop絕大部分概念,我們已經實踐完畢,如需更詳細的還請翻閱官方文檔 ^_^ 

 

================

http://www.cnblogs.com/davidwang456/p/4449428.html

 

 

http://nikipore.github.io/stompest/sync.html?highlight=stompest%20sync#module-stompest.sync.client

 

STOMP Frames

STOMP是基於幀的協議,它假定底層為一個2-way的可靠流的網絡協議(如TCP)。客戶端和服務器通信使用STOMP幀流通訊。幀的結構看起來像:

COMMAND header1:value1 header2:value2 Body^@ 

幀以command字符串開始,以EOL結束,其中包括可選回車符(13字節),緊接着是換行符(10字節)。command下面是0個或多個<key>:<value>格式的header條目, 每個條目由EOL結束。一個空白行(即額外EOL)表示header結束和body開始。body連接着NULL字節。本文檔中的例子將使用^@,在ASCII中用control-@表示,代表NULL字節。NULL字節可以選擇跟多個EOLs。欲了解更多關於STOMP幀的詳細信息,請參閱Augmented BNF節本文件。

本文檔中引用的所有command 和header 名字都是大小寫敏感的.

只有SENDMESSAGE, 和ERROR幀有body。所有其他的幀不能有body

 

content-length

content-type

receipt

任何客戶端幀(除了CONNECT幀)都可以為receipt header指定任何值。這會讓服務端應答帶有RECEIPT的客戶端幀的處理過程

如果client或者server收到重復的header條目,只有第一個會被用作header條目的值。其他的值僅僅用來維持狀態改變,或者被丟棄。

例如,如果client收到:

MESSAGE
foo:World
foo:Hello

^@

foo header的值為World.

 

大小限制

為了客戶端濫用服務端的內存分配,服務端可以設置可分配的內存大小:

  • 單個幀允許幀頭的個數
  • header中每一行的最大長度
  • 幀體的大小

如果超出了這些限制,server應該向client發送一個error frame,然后關閉連接.

連接延遲

STOMP servers必須支持client快速地連接server和斷開連接。 這意味着server在連接重置前只允許被關閉的連接短時間地延遲.

結果就是,在socket重置前client可能不會收到server發來的最后一個frame(比如ERROR或者RECEIPTframe去應答DISCONNECTframe)

 

  • Client Frames

client可以發送下列列表以外的frame,但是STOMP1.2 server會響應ERROR frame,然后關閉連接。

SUBSCRIBE 消息

SUBSCRIBE frame用於注冊給定的目的地. 和SENDframe一樣,SUBSCRIBEframe需要包含destination header表明client想要訂閱目的地。 被訂閱的目的地收到的任何消息將通過MESSAGE frame發送給client。 ack header控制着確認模式

例子:

SUBSCRIBE
id:0
destination:/queue/foo
ack:client //ACK頭

^@

如果server不能成功創建此次訂閱,那么server將返回ERROR frame然后關閉連接。

STOMP服務器可能支持額外的服務器特定的頭文件,來自定義有關訂閱傳遞語義.

SUBSCRIBE消息的 id 頭

一個單連接可以對應多個開放的servers訂閱,所以必須包含id header去唯一標示這個訂閱. 這個id frame可以把此次訂閱與接下來的MESSAGE frame和UNSUBSCRIBE frame聯系起來。

在相同的連接中,不同的訂閱必須擁有不同訂閱id

SUBSCRIBE消息的 ack 頭

ack header可用的值有autoclientclient-individual, 默認為auto.

  1. ackauto時,client收到server發來的消息后不需要回復ACK frame. server假定消息發出去后client就已經收到。這種確認方式可以減少消息傳輸的次數.
  2. ackclient時, client必須發送ACK frame給servers, 讓它處理消息. 如果在client發送ACK frame之前連接斷開了,那么server將假設消息沒有被處理,可能會再次發送消息給另外的客戶端。

    client發送的ACK frame被當作是積累的確認。這就意味這種確認方式會去操作ACK frame指定的消息和訂閱的所有消息. 

    譬如接收了10條消息,如果你ack了第8條消息,那么1-7條消息都會被ack,只有9-10兩條消息還保持未ack狀態。

    由於client不能處理某些消息,所以client應該發送NACK frame去告訴server它不能消費這些消息。

  1. 當ack模式是client-individual,確認工作就像上面的'client'確認模式(除了由客戶端發送的ACKNACK幀)不會被累計。這意味着,后續ACKNACK消息幀,也不能影響前面的消息的確認。
ACK 消息

用來確認訂閱的消息被消費了.

ACK
id:12345
transaction:tx1

^@
  • Server Frames

server偶爾也會發送frame給客戶端(除了連接最初的CONNECTED frame).

這些frames為: * MESSAGE * RECEIPT * ERROR

RECEIPT幀

server成功處理帶有receipt頭的client frame后, 將發送RECEIPT frame到client.  RECEIPT frame必須包含receipt-id header,它的值為client frame中receipt header的值。

RECEIPT
receipt-id:message-12345

^@

RECEIPT frame是作為server處理的client frame后的應答. 既然STOMP是基於流的,那么receipt也是對server已經收到所有的frames的累積確認。但是,以前的frames可能並沒有被完全處理。如果clients斷開連接,以前接收到的frames應該繼續被server處理。

=====

 

Examples

If you use ActiveMQ to run these examples, make sure you enable the STOMP connector, (see here for details):

<!-- add this to the config file "activemq.xml" -->
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/> 

For debugging purposes, it is highly recommended to turn on the logger on level DEBUG:

import logging logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) 

Producer

from stompest.config import StompConfig from stompest.sync import Stomp CONFIG = StompConfig('tcp://localhost:61613') QUEUE = '/queue/test' if __name__ == '__main__': client = Stomp(CONFIG) client.connect() client.send(QUEUE, 'test message 1'.encode()) client.send(QUEUE, 'test message 2'.encode()) client.disconnect() 

Consumer

from stompest.config import StompConfig from stompest.protocol import StompSpec from stompest.sync import Stomp
from stompest.error import StompConnectionError, StompProtocolError
import traceback CONFIG = StompConfig('tcp://localhost:61613') QUEUE = '/queue/test' if __name__ == '__main__': client = Stomp(CONFIG) client.connect() client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL}) //客戶端的ack模式為client_individual while True:
     try:
      if self.client.canRead(timeout=5):      frame = client.receiveFrame() //If we are not connected, this method will raise a StompConnectionError. Keep in mind that this method will block forever if there are no frames incoming on the wire. Be sure to use peek with self.canRead(timeout) before!     print('Got %s' % frame.info())    client.ack(frame)
     except (StompConnectionError, StompProtocolError) as e:
        self.client.connect()
     except Exception as e:
        self.logger.debug(traceback.format_exc()) client.disconnect()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM