消息隊列NetMQ 原理分析5-StreamEngine、Encord和Decord


前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
  4. 消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe
  5. 消息隊列NetMQ 原理分析5-StreamEngine,Encord和Decord
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不同類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。


StreamEngine

SocketBaseMsg發送給SessionBase之后需要將Msg轉化為byte[]進行傳輸,Engine就是做轉換的工作,轉換完成之后就會和實際的底層Socket進行消息傳輸。

NetMQTcp協議消息轉換使用的是StreamEngine

internal sealed class StreamEngine : IEngine, IProactorEvents, IMsgSink
{

}

上一章介紹到管道事件。

發送數據

當出管道有數據可讀時,會調用SessionBaseReadActivated事件

public void ReadActivated(Pipe pipe)
{
    ...
    if (m_engine != null)
        m_engine.ActivateOut();
    else
        m_pipe.CheckRead();
}

然后會調用對應m_engine的ActivateOut事件

public void ActivateOut()
{
    FeedAction(Action.ActivateOut, SocketError.Success, 0);
}

public void FeedAction(){
    ...
    case State.Active:
        switch (action)
        {
            case Action.OutCompleted:
                int bytesSent = EndWrite(socketError, bytesTransferred);

                // IO error has occurred. We stop waiting for output events.
                // The engine is not terminated until we detect input error;
                // this is necessary to prevent losing incoming messages.
                if (bytesSent == -1)
                {
                    m_sendingState = SendState.Error;
                }
                else
                {
                    m_outpos.AdvanceOffset(bytesSent);
                    m_outsize -= bytesSent;

                    BeginSending();
                }
                break;
            ...
        }
    ...
}

TCPConnect客戶端發送請求完成時,會調用OutCompleted事件

private void Loop()
{
    ...
    switch (completion.OperationType)
    {
        ...
        case OperationType.Connect:
        case OperationType.Disconnect:
        case OperationType.Send:
        item.ProactorEvents.OutCompleted(
            completion.SocketError,
            completion.BytesTransferred);
            }
    }
    ...
public void OutCompleted(SocketError socketError, int bytesTransferred)
{
    ...
    // Create the engine object for this connection.
    var engine = new StreamEngine(m_s, m_options, m_endpoint);
    ...
    // Attach the engine to the corresponding session object.
    SendAttach(m_session, engine);
    ...
    }

此時會創建一個StreamEngine和請求的SessionBase對象進行關聯。

protected override void ProcessAttach(IEngine engine)
{
    Debug.Assert(engine != null);

    // Create the pipe if it does not exist yet.
    if (m_pipe == null && !IsTerminating)
    {
        ZObject[] parents = { this, m_socket };
        int[] highWaterMarks = { m_options.ReceiveHighWatermark, m_options.SendHighWatermark };
        int[] lowWaterMarks = { m_options.ReceiveLowWatermark, m_options.SendLowWatermark };
        bool[] delays = { m_options.DelayOnClose, m_options.DelayOnDisconnect };
        Pipe[] pipes = Pipe.PipePair(parents, highWaterMarks, lowWaterMarks, delays);

        // Plug the local end of the pipe.
        pipes[0].SetEventSink(this);

        // Remember the local end of the pipe.
        Debug.Assert(m_pipe == null);
        m_pipe = pipes[0];

        // Ask socket to plug into the remote end of the pipe.
        SendBind(m_socket, pipes[1]);
    }

    // Plug in the engine.
    Debug.Assert(m_engine == null);
    m_engine = engine;
    m_engine.Plug(m_ioThread, this);
}

接收數據

當完成端口通知數據接收完成時,會調用ProactorInCompleted事件,實際就是調用的對應的StreamEngineInCompleted事件

public void InCompleted(SocketError socketError, int bytesTransferred)
{
    FeedAction(Action.InCompleted, socketError, bytesTransferred);
}
public void FeedAction(){
    ...
    case State.Active:
        switch (action)
        {
            case Action.InCompleted:
                m_insize = EndRead(socketError, bytesTransferred);

                ProcessInput();
                break;
            ...
        }
    ...
}

接收完成后會對接收到的數據進行處理

private void ProcessInput()
{
    ...
    if (m_options.RawSocket)
    {
        if (m_insize == 0 || !m_decoder.MessageReadySize(m_insize))
        {
            processed = 0;
        }
        else
        {
            processed = m_decoder.ProcessBuffer(m_inpos, m_insize);
        }
    }
    else
    {
        // Push the data to the decoder.
        processed = m_decoder.ProcessBuffer(m_inpos, m_insize);
    }
    ...
    // Flush all messages the decoder may have produced.
    m_session.Flush();
    ...
}
public override bool MessageReadySize(int msgSize)
{
    m_inProgress = new Msg();
    m_inProgress.InitPool(msgSize);

    NextStep(new ByteArraySegment(m_inProgress.Data, m_inProgress.Offset),
        m_inProgress.Size, RawMessageReadyState);

    return true;
}

讀取數據到Msg后會調用DecoderProcessBuffer方法

PS:由於NetMQ有自己的傳輸協議格式,因此當使用NetMQ和其他程序進行Socket傳輸時,必須使用StreamSocket


public int ProcessBuffer(ByteArraySegment data, int size)
{
    ...
    while (m_toRead == 0)
    {
        if (!Next())
        {
            if (State < 0)
            {
                return -1;
            }
            return size;
        }
    }
    return size;
    ...
}
protected override bool Next()
{
    if (State == RawMessageReadyState)
    {
        return RawMessageReady();
    }

    return false;
}
private bool RawMessageReady()
{
    ...
    bool isMessagedPushed = m_msgSink.PushMsg(ref m_inProgress);

    if (isMessagedPushed)
    {
        // NOTE: This is just to break out of process_buffer
        // raw_message_ready should never get called in state machine w/o
        // message_ready_size from stream_engine.
        NextStep(new ByteArraySegment(m_inProgress.Data, m_inProgress.Offset),
            1, RawMessageReadyState);
    }
    return isMessagedPushed;
    ...
}

對讀到的數據進行處理調用RawDecoderNext的方法,將獲取到的Msg放入到SeesionBase的管道中。

流程分析

讀寫數據流程圖如下圖所示:
2017731191255-StreamEngine
我們使用WireShark進行驗證。

我們監聽15557地址,然后創建一個客戶端連接15557地址
2017731192245-1
前面3條是三次握手。第四條是客戶端向服務器發送了10字節長度的請求頭部,以0xff開頭,0x7f結尾。中間是8字節是Identitysize長度

...
switch (m_handshakeState)
{
    case HandshakeState.Closed:
        switch (action)
        {
            case Action.Start:
                // Send the 'length' and 'flags' fields of the identity message.
                // The 'length' field is encoded in the long format.
                m_greetingOutputBuffer[m_outsize++] = 0xff;
                m_greetingOutputBuffer.PutLong(m_options.Endian, (long)m_options.IdentitySize + 1, 1);
                m_outsize += 8;
                m_greetingOutputBuffer[m_outsize++] = 0x7f;
                ...
        }
        ...
}
...


第6條是服務器向客戶端發送的10字節長度的請求頭部,以0xff開頭,0x7f結尾。中間是8字節是identitysize的信息
I
第8條是服務器向客戶端發送的版本號和Socket類型,01表示版本號1,06表示當前是RouterSocket

...
case HandshakeState.ReceivingGreeting:
    switch (action)
    {
        case Action.InCompleted:
        ...
        
                if (m_greeting[0] != 0xff || (m_greetingBytesRead == 10 && (m_greeting[9] & 0x01) == 0)){
                ...
                }
                else if (m_greetingBytesRead < 10)
                {
                    var greetingSegment = new ByteArraySegment(m_greeting, m_greetingBytesRead);
                    BeginRead(greetingSegment, PreambleSize - m_greetingBytesRead);
                }
                else
                {
                    ...
                    m_outpos[m_outsize++] = 1; // Protocol version
                    m_outpos[m_outsize++] = (byte)m_options.SocketType;
                    ...
                }
        ...
    }
...


第10條是客戶端向服務器發送的版本號和socket類型,05表示當前是DealSocket

...
case HandshakeState.ReceivingRestOfGreeting:
    switch (action)
    {
        case Action.InCompleted:
        ...
        if (m_greeting[VersionPos] == 0)
        {
            // ZMTP/1.0 framing.
            m_encoder = new V1Encoder(Config.OutBatchSize, m_options.Endian);
            m_encoder.SetMsgSource(m_session);

            m_decoder = new V1Decoder(Config.InBatchSize, m_options.MaxMessageSize, m_options.Endian);
            m_decoder.SetMsgSink(m_session);
        }
        else
        {
            // v1 framing protocol.
            m_encoder = new V2Encoder(Config.OutBatchSize, m_session, m_options.Endian);
            m_decoder = new V2Decoder(Config.InBatchSize, m_options.MaxMessageSize, m_session, m_options.Endian);
        }
        Activate();
        ...
    }
...

Encoder

V2Encoder

接下來就是數據傳輸。

public V2Encoder(int bufferSize, IMsgSource session, Endianness endian)
    : base(bufferSize, endian)
{
    m_inProgress = new Msg();
    m_inProgress.InitEmpty();

    m_msgSource = session;

    // Write 0 bytes to the batch and go to message_ready state.
    NextStep(m_tmpbuf, 0, MessageReadyState, true);
}

由於NetMQ使用的是版本1,用的是V2EncoderV2Decoder進行編碼和解碼。
在初始化Encoder的時候會向報文寫入2個0字節數據,暫時不明白為何要這樣做。


int protocolFlags = 0;
if (m_inProgress.HasMore)
    protocolFlags |= V2Protocol.MoreFlag;
if (m_inProgress.Size > 255)
    protocolFlags |= V2Protocol.LargeFlag;
m_tmpbuf[0] = (byte)protocolFlags;

// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
int size = m_inProgress.Size;
if (size > 255)
{
    m_tmpbuf.PutLong(Endian, size, 1);
    NextStep(m_tmpbuf, 9, SizeReadyState, false);
}
else
{
    m_tmpbuf[1] = (byte)(size);
    NextStep(m_tmpbuf, 2, SizeReadyState, false);
}

第一個字節是Flags用於標記該報文是否為大報文,超過過255個字節就會標記為大包標記,是否還有更多報文。若報文長度小於256,則第二個字節用於存儲報文長度。但是若是大報文,則會8個字節保存報文長度。
下面就開始發送數據
我們用客戶端發一個字符串test1,然后服務端原樣返回該字符串

可以看到如我們上面分析的一樣,第一個字節為0,第二個字節為大小test1為5個字節長度。由於CMD命令單行輸入最長字符限制長度為255,因此我們沒辦法在CMD命令下輸入更長數據進行測試。暫時就不做驗證。

V1Encoder

V1Encoder編碼如下所示

if (size < 255)
{
    m_tmpbuf[0] = (byte)size;
    m_tmpbuf[1] = (byte)(m_inProgress.Flags & MsgFlags.More);
    NextStep(m_tmpbuf, 2, SizeReadyState, false);
}
else
{
    m_tmpbuf[0] = 0xff;
    m_tmpbuf.PutLong(Endian, size, 1);
    m_tmpbuf[9] = (byte)(m_inProgress.Flags & MsgFlags.More);
    NextStep(m_tmpbuf, 10, SizeReadyState, false);
}

當小於255字符,首字符是長度,第二個字符是Flags,超過255字符,首字符為0xff,然后跟着8個字符長度的長度值,接下來是Flags

RawEncoder

使用RawEncoder會將原始數據原樣發送不會增加任何其他字符。

Decoder

V2Decoder

接收到數據會先接收第一個字節Flags判斷是否有后續包以及是小包還是打包,若是小包,則解析第一個字節長度位,否則讀取8個字節長度位。

V1Decoder

接收到數據收先會判斷第一個字節是不是Oxff,若為Oxff則表示為打包,獲取8位字節長度,否則獲取1位字節長度處理。

RawDecoder

使用RawDecoder會讀取數據保存到管道中。

總結

本片介紹了NetMQ的報文格式並闡述了底層Msg如何轉換為流進行發送和接收。


20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
本文地址:https://www.cnblogs.com/Jack-Blog/p/7283897.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM