消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe



前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
  4. 消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe
  5. 消息隊列NetMQ 原理分析5-Engine,Encord和Decord
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不同類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。


Socket

上一章最后我們簡單介紹了SocketBaseSessionBase的創建和回收,這一張我們詳細介紹SocketBaseSessionBase
首先SocketBase繼承自Own,即也是ZObject對象,同時由於SocketBase需要進行消息的傳輸,因此它實現了一些結構,包括IPollEventsPipe.IPipeEvents

接口實現

internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
    ...
}
  • IPollEvents事件上一章回收線程已經介紹過,這里不再做過多說明了,簡單講SocketBase實現該事件只有在回收線程回收Socket的時候會觸發。
  • Pipe.IPipeEvents:是管道事件,它的簽名如下
public interface IPipeEvents
{
    void ReadActivated([NotNull] Pipe pipe);
    void WriteActivated([NotNull] Pipe pipe);
    void Hiccuped([NotNull] Pipe pipe);
    void Terminated([NotNull] Pipe pipe);
}
  • ReadActivated:表示管道可讀,管道實際調用SocketBaseSessionBaseReadActivated方法,而SocketBase實際會調用XReadActivated方法。
  • WriteActivated:表示管道可寫,管道實際調用SocketBaseSessionBaseWriteActivated方法,而SocketBase實際會調用XWriteActivated方法。
  • Hiccuped:當連接突然中斷時會調用此方法。
  • WriteActivated:表示管道終止。

內部結構

SocketBase的內部維護着一個字段,用於存放連接/綁定地址和它的管道(若當前SocketBaseTCPListener,則無需初始化管道,管道為空)。

private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();
private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();

Endpoint對象用於存放SessionBasePipeListener的引用

private class Endpoint
{
    public Endpoint(Own own, Pipe pipe)
    {
        Own = own;
        Pipe = pipe;
    }

    public Own Own { get; }
    public Pipe Pipe { get; }
}

SocketBase連接或綁定最后會向將Endpoint保存到字典中

private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
{
    LaunchChild(endpoint);
    m_endpoints[address] = new Endpoint(endpoint, pipe);
}

SocketBase斷開連接時會移除它

public void TermEndpoint([NotNull] string addr)
{
    ...
    if (protocol == Address.InProcProtocol)
    {
        ...
        m_inprocs.Remove(addr);
    }
    else
    {
        ...
        m_endpoints.Remove(addr);
    }
}

m_inprocs也是一個字典用於存放inproc協議的連接。
第一章創建SocketBase我們介紹了Context創建SocketBase所做的一些工作,初始化SocketBase時,會創建MailBox,用於傳輸Command

protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
    : base(parent, threadId)
{
    m_options.SocketId = socketId;
    m_mailbox = new Mailbox("socket-" + socketId);
}

每個SocketBase的命令處理實際都是在工作線程中進行。因此理論上(忽略線程上下文切換時造成的性能損失)線程數越多,NetMQ的IO吞吐量和工作線程數成正比關系。
Context創建SocketBase會根據Create靜態方法根據不同類型創建不同的SocketBase

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
    switch (type)
    {
        case ZmqSocketType.Pair:
            return new Pair(parent, threadId, socketId);
        case ZmqSocketType.Pub:
            return new Pub(parent, threadId, socketId);
        case ZmqSocketType.Sub:
            return new Sub(parent, threadId, socketId);
        case ZmqSocketType.Req:
            return new Req(parent, threadId, socketId);
        case ZmqSocketType.Rep:
            return new Rep(parent, threadId, socketId);
        case ZmqSocketType.Dealer:
            return new Dealer(parent, threadId, socketId);
        case ZmqSocketType.Router:
            return new Router(parent, threadId, socketId);
        case ZmqSocketType.Pull:
            return new Pull(parent, threadId, socketId);
        case ZmqSocketType.Push:
            return new Push(parent, threadId, socketId);
        case ZmqSocketType.Xpub:
            return new XPub(parent, threadId, socketId);
        case ZmqSocketType.Xsub:
            return new XSub(parent, threadId, socketId);
        case ZmqSocketType.Stream:
            return new Stream(parent, threadId, socketId);
        default:
            throw new InvalidException("SocketBase.Create called with invalid type of " + type);
    }
}

具體創建SocketBase的工作在上一章已經做了詳細的介紹,這里不再復述。

Session

首先和SocketBase一樣,SessionBase也繼承自Own,即也是ZObject對象,同時由於SessionBaseSocketBase存在消息傳輸,所以它也實現了IPipeEvents接口,同時它實現了IProactorEvents接口,在消息收發是會接收到通知。SessionBase一端和SocketBase進行消息的通訊,另一端和Engine存在消息通訊,它實現了IMsgSinkIMsgSource接口和Engine進行消息傳輸。

 internal class SessionBase : Own,
        Pipe.IPipeEvents, IProactorEvents,
        IMsgSink, IMsgSource{

        }
internal interface IMsgSink
{
    /// <summary>
    /// 傳輸消息.成功時返回true.
    /// </summary>
    /// <param name="msg">將msg消息寫入到管道中</param>
    bool PushMsg(ref Msg msg);
}
internal interface IMsgSource
{
    /// <summary>
    /// 取一個消息。成功時返回,從管道獲取消息寫入msg參數中;若失敗則返回false,將null寫入到msg參數中。
    /// </summary>
    /// <param name="msg">從管道獲取消息寫入Msg中</param>
    /// <returns>true if successful - and writes the message to the msg argument</returns>
    bool PullMsg(ref Msg msg);
}

SocketBase將消息寫入到寫管道時,對應的SessionBase會從讀管道讀到SocketBase寫入的數據,然后將數據從管道取出生成一個Msg,Engine會和AsyncSocket交互傳輸數據,關於Engine下一章再做介紹。

Option

option參數如下

  1. Affinity
    表示哪個線程是可用的,默認為0,表示所有線程在負載均衡都可使用。
  2. Backlog
    最大Socket待連接數
  3. DelayAttachOnConnect
    在創建連接時,延遲在SocketSession之間創建雙向的管道,默認創建連接時立即創建管道
  4. DelayOnClose
    若為true,則在Socket關閉時Session先從管道接收所有消息發送出去。
    否則直接關閉,默認為true
  5. DelayOnDisconnect
    若為true,則在Pipe通知我們中斷時Socket先將接收所有入隊管道消息。
    否則直接中斷管道。默認為true.
  6. Endianness
    字節序,數據在內存中是高到低排還是低到高排。
  7. Identity
    響應的Identity,每個Identity用於查找SocketIdentiy是一個重復的隨機32位整形數字,轉換為字節5位字節數組。每個消息的第一部分是Identity,
  8. IdentitySize
    1個字節用於保存Identity的長度。
  9. IPv4Only
  10. Linger
    當Socket關閉時,是否延遲一段時間等待數據發送完畢后再關閉管道
  11. MaxMessageSize
    每個消息包最大消息大小
  12. RawSocket
    若設置為true,RouterSocket可以接收非NetMQ發送來的tcp連接。
    默認是false,Stream在構造函數時會設置為true,設置為true時會將RecvIdentity修改為false(用NetMQ接收其他系統發送來的Socket請求應該用StreamSocekt,否則由於應用層協議不一樣可能會導致一些問題。)
  13. RecvIdentity
    若為true,Identity轉發給Socket
  14. ReconnectIvl
    設置最小重連時間間隔,單位ms。默認100ms
  15. ReconnectIvlMax
    設置最大重連時間間隔,單位ms。默認0(無用)
  16. RecoveryIvl
    PgmSocket用的
  17. SendBuffer
    發送緩存大小,設置底層傳輸Socket的發送緩存大小,初始為0
  18. ReceiveBuffer
    接收緩存大小,設置底層傳輸Socket的接收緩存大小,初始為0
  19. SendHighWatermark
    Socket發送的管道的最大消息數,當發送水位達到最大時會阻塞發送。
  20. ReceiveHighWatermark
    Socket接收管道的最大消息數
  21. SendLowWatermark
    Socket發送低水位,消息的最小數量單位,每次達到多少消息數量才向Session管道才激活寫事件。默認1000
  22. ReceiveLowWatermark
    Socket接收低水位,消息的最小數量單位,每次達到多少消息數量Session管道才激活讀事件。默認1000
  23. SendTimeout
    Socket發送操作超時時間
  24. TcpKeepalive
    TCP保持連接設置,默認-1不修改配置
  25. TcpKeepaliveIdle
    TCP心跳包在空閑時的時間間隔,默認-1不修改配置
  26. TcpKeepaliveIntvl
    TCP心跳包時間間隔,默認-1不修改配置
  27. DisableTimeWait
    客戶端斷開連接時禁用TIME_WAIT TCP狀態

Pipe

上一章我們講到過在SocketBaseSessionBase是通過2條單向管道進行消息傳輸,傳輸的消息單位是Msg,消息管道是YPipe<Msg>類型,那么YPipe<>又是什么呢?

YPipe

Ypipe內部實際維護這一個YQueue類型的先進先出隊列,YPipe向外暴露了一下方法:

  1. TryRead
    該方法用於判斷當前隊列是否可讀,可讀的話第一個對象出隊
public bool TryRead(out T value)
{
    if (!CheckRead())
    {
        value = default(T);
        return false;
    }
    value = m_queue.Pop();
    return true;
}
  1. Unwrite
    取消寫入消息
public bool Unwrite(ref T value)
{
    if (m_flushToIndex == m_queue.BackPos)
        return false;
    value = m_queue.Unpush();

    return true;
}
  1. 寫入消息
    將消息寫入到隊列中,若寫入未完成則當前消息的指針索引指向當前隊列塊的后一位。
public void Write(ref T value, bool incomplete)
{
    m_queue.Push(ref value);

    // Move the "flush up to here" pointer.
    if (!incomplete)
    {
        m_flushToIndex = m_queue.BackPos;
    }
}
  1. 完成寫入
    當該部分消息寫完時,則會調用Flush完成寫入並通知另一個管道消息可讀
public void Flush()
{
    if (m_state == State.Terminating)
        return;
    if (m_outboundPipe != null && !m_outboundPipe.Flush())
        SendActivateRead(m_peer);
}

Msg

寫入的消息單位是Msg,它實現了多條數據的存儲,當每次數據寫完還有數據帶寫入時通過將Flag標記為More表示消息還沒寫入完。

YQueue

YQueue是由一個個trunk組成的,每個trunk就是一個消息塊,每個消息塊可能包含多個Msg,主要由寫入消息時是否還有更多消息帶寫入(Flag)決定。trunk是一個雙向循環鏈表,內部維護着一個數組用於存放數據,每個數據會有2個指針,分別指向前一個塊和后一個塊,每個塊還有一個索引,表示當前塊在隊列中的位置。

private sealed class Chunk
{
    public Chunk(int size, int globalIndex)
    {
        Values = new T[size];
        GlobalOffset = globalIndex;
        Debug.Assert(Values != null);
    }
    
    /// <summary>數據</summary>
    public T[] Values { get; }
    
    /// <summary>當前塊在隊列中的位置</summary>
    public int GlobalOffset { get; }
    /// <summary>前一個塊</summary>
    [CanBeNull]
    public Chunk Previous { get; set; }

    /// <summary>下一個塊</summary>
    [CanBeNull]
    public Chunk Next { get; set; }
}

每個chunk默認最多可保存256個部分。
由於每次向SocketBase寫入的Msg可能有多個部分,因此消息會寫入到數組中,所有消息寫完后指向trunk的指針才會后移一位。
YQueue有以下字段

//用於記錄當前塊消息的個數,默認為256
private readonly int m_chunkSize;

// 當隊列是空的時,下一個塊指向null,首尾塊都指向初始化的一個塊,開始位置的塊僅用於隊列的讀取(front/pop),最后位置的僅用於隊列的寫入(back/push)。
// 開始位置
private volatile Chunk m_beginChunk;
//chunk的當前可讀位置索引
private int m_beginPositionInChunk;
//指向后一個塊
private Chunk m_backChunk;
//chunk的最后一個可讀位置索引
private int m_backPositionInChunk;
//指向后一個塊
private Chunk m_endChunk;
//chunk的下一個可寫位置索引
private int m_endPosition;
//當達到最大Msg數量時,擴展一個chunk,最大為256個塊
private Chunk m_spareChunk;

當前trunk頭部在整個隊列中的的索引位置
private int m_nextGlobalIndex;

YPipe寫入Msg實際是向YQueue入隊

public void Push(ref T val)
{
    m_backChunk.Values[m_backPositionInChunk] = val;
    //指向后一個塊
    m_backChunk = m_endChunk;
    //索引更新到最后可讀位置
    m_backPositionInChunk = m_endPosition;
    //下一個可寫位置向后移動一位
    m_endPosition++;
    if (m_endPosition != m_chunkSize)
        return;
    //到達最后一個位置則需要擴充一個塊
    Chunk sc = m_spareChunk;
    if (sc != m_beginChunk)
    {
        //已經擴充了塊則更新下一個塊的位置
        m_spareChunk = m_spareChunk.Next;
        m_endChunk.Next = sc;
        sc.Previous = m_endChunk;
    }
    else
    {
        //新建一個塊,並更新索引位置
        m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
        m_nextGlobalIndex += m_chunkSize;
        m_endChunk.Next.Previous = m_endChunk;
    }
    m_endChunk = m_endChunk.Next;
    當前塊的局部位置從0開始
    m_endPosition = 0;
}

每次消息寫完消息時調用YPipeFlush方法完成當前消息的寫入

public bool Flush()
{
    //只有一條Msg
    if (m_flushFromIndex == m_flushToIndex)
    {
        return true;
    }
    //將m_lastAllowedToReadIndex更新為flushToIndex
    if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex)
    {
        //沒有數據寫入時,lastAllowedToReadIndex為-1,表示沒有數據可讀,因此這里不需要關系線程安全
        Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex);
        m_flushFromIndex = m_flushToIndex;
        return false;
    }
    有數據寫入時更新指針
    m_flushFromIndex = m_flushToIndex;
    return true;
}

總結

該篇在上一片的基礎上對SocketBaseSessionBase進行了一些細節上的補充。同時,對NetMQ的配置參數進行了一些介紹,最后對消息管道進行了簡單講解。


20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
本文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM