消息隊列NetMQ 原理分析3-命令產生/處理和回收線程



前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
  4. 消息隊列NetMQ 原理分析4-Session、Option和Pipe
  5. 消息隊列NetMQ 原理分析5-Engine
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不同類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。


命令

命令結構

Command定義如下

internal struct Command
{
    public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()
    {
        Destination = destination;
        CommandType = type;
        Arg = arg;
    }
    [CanBeNull]
    public ZObject Destination { get; }
    public CommandType CommandType { get; }
    [CanBeNull]
    public object Arg { get; private set; }        
    public override string ToString()
    {
        return base.ToString() + "[" + CommandType + ", " + Destination + "]";
    }
}

其包含了3個信息:調用者,命令類型和命令參數。

命令產生

還記的《消息隊列NetMQ 原理分析1-Context和ZObject》中我們介紹過NetMQ中的命令類型嗎?待處理命令全部會存放着Socket的信箱中。當Socket有命令(連接完成、發送完成或接受完成等)需要處理時調用基類ZObjectSendCommand方法。

private void SendCommand([NotNull] Command cmd)
{
    m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
}

ZObject實際調用Context的SendCommand方法

public void SendCommand(int threadId, [NotNull] Command command)
{
    m_slots[threadId].Send(command);
}

m_slots[threadId]保存的是當前IO線程的IO信箱IOThreadMailbox,在《消息隊列NetMQ 原理分析2-IO線程和完成端口》
我們簡單介紹了IOThreadMailbox的結構。

[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");

IOThreadMailbox中維護這一個Command管道,該管道實際就是一個先進先出隊列,詳細解析會在第四章進行介紹。

public void Send(Command command)
{
    bool ok;
    lock (m_sync)
    {
        //向管道寫入命令
        m_commandPipe.Write(ref command, false);
        //成功寫入會返回false,表示有命令需要處理
        ok = m_commandPipe.Flush();
    }
    if (!ok)
    {
        //向完成端口傳遞信號
        m_proactor.SignalMailbox(this);
    }
}

public bool TryRecv(out Command command)
{
    return m_commandPipe.TryRead(out command);
}

public void RaiseEvent()
{
    if (!m_disposed)
    {
        m_mailboxEvent.Ready();
    }
}

命令發送完成調用Flush方法更新指針下標。返回ok若為true,表示管道已全部讀取完畢,無需發送信號量通知處理。若返回false,則需要向內核發送一個信號,IO線程獲取到則調用到指定的命令事件。

IOThreadMailbox的主要就是這三個方法

  1. 當有命令來的時候調用Send方法向管道(隊列)寫入命令。寫完時,會向完成端口傳遞信號。
  2. 當有命令需要處理時調用TryRecv方法讀取
  3. 當完成端口接收到信號需要命令處理時,調用RaiseEvent(實際是信箱的IO線程的RaiseEvent方法)進行處理命令。
public void SignalMailbox(IOThreadMailbox mailbox)
{
    //該方法會向完成端口的隊列中插入一個信號狀態
    m_completionPort.Signal(mailbox);
}

有關於完成端口介紹請查看《消息隊列NetMQ 原理分析2-IO線程和完成端口》

命令處理

當有命令需要處理時,完成端口會接收到信號。

private void Loop()
{
    ...
    int timeout = ExecuteTimers();
    int removed;
    if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
        continue;
    for (int i = 0; i < removed; i++)
    {
        try
        {
            if (completionStatuses[i].OperationType == OperationType.Signal)
            {
                var mailbox = (IOThreadMailbox)completionStatuses[i].State;
                mailbox.RaiseEvent();
            }
            ...
        }
        ...
    }
    ...
}

在線程輪詢方法Loop中,當接收到需要處理的數據時,首先會判斷是否是信號,若為信號,則將狀態(參數)轉化為IOThreadMailbox類型,同時調用RaiseEvent方法處理命令。

public void Ready()
{
    Command command;
    while (m_mailbox.TryRecv(out command))
        command.Destination.ProcessCommand(command);
}

當有命令需要處理時,會調用IOThreadMailboxTryRecv方法從管道(隊列,先進先出)中獲取第一個命令進行處理。

創建Socket(SocketBase)

在介紹回收線程工作之前,我們先看下創建一個新的Socket做了哪些工作,這里的Socket實際是NetMQ中的SocketBase

RequestSocket socket = new RequestSocket();
socket.Connect("tcp://127.0.0.1:12345");

NetMQSocket是NetMQ的Socket的基類。

public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect)
{

}
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
{
    m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
    m_netMqSelector = new NetMQSelector();
    Options = new SocketOptions(this);
    m_socketEventArgs = new NetMQSocketEventArgs(this);

    Options.Linger = NetMQConfig.Linger;

    if (!string.IsNullOrEmpty(connectionString))
    {
        var endpoints =
            connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
                .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));

        foreach (string endpoint in endpoints)
        {
            if (endpoint[0] == '@')
            {
                Bind(endpoint.Substring(1));
            }
            else if (endpoint[0] == '>')
            {
                Connect(endpoint.Substring(1));
            }
            else if (defaultAction == DefaultAction.Connect)
            {
                Connect(endpoint);
            }
            else
            {
                Bind(endpoint);
            }
        }
    }
}

首先會根據Socket的類型創建對應的Socket,調用的是ContextCreateSocket方法。具體的請看創建SocketBase。最終創建方法是調用SocketBaseCreate方法

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
    switch (type)
    {
        ...
        case ZmqSocketType.Req:
            return new Req(parent, threadId, socketId);
        ...
        default:
            throw new InvalidException("SocketBase.Create called with invalid type of " + type);
    }
}

創建完后,就對地址進行解析。若有多個地址,則可用,分隔。

var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
    .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));

解析完成后則用默認的方式進行綁定或連接,如RequestSocket默認為連接,而ResponseSocket則為綁定。

創建連接

  1. 首先對地址進行解析,判斷當前是tcp還是其他協議。然后會根據協議類型創建對應的Socket,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》

    private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
    {
        const string protocolDelimeter = "://";
        int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
    
        protocol = addr.Substring(0, protocolDelimeterIndex);
        address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
    }
    
  2. 負載均衡選擇一個IO線程。

  3. 創建Session,SocketSession的關系如圖所示

  4. 創建管道,創建管道會創建一對單向管道,形成“一個”雙向管道。頭尾分別連接SocketSession,如上圖所示。創建管道完畢后需要設置管道的回調事件,管道1設置回調為Socket的回調方法,管道2設置為Session的回調方法。

具體關於SessionPipe的內容請查看《消息隊列NetMQ 原理分析4-Session、Option和Pipe》

  1. 處理SocketSession的關系
protected void LaunchChild([NotNull] Own obj)
{
    // Specify the owner of the object.
    obj.SetOwner(this);
    // Plug the object into the I/O thread.
    SendPlug(obj);
    // Take ownership of the object.
    SendOwn(this, obj);
}
  • Session的宿主設置為該Socket
private void SetOwner([NotNull] Own owner)
{
    Debug.Assert(m_owner == null);
    m_owner = owner;
}
  • 為IO對象設置Session,當管道有數據交互時,Session的回調方法就會觸發。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
    if (incSeqnum)
        destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Plug));
}

SessionBaseProcessPlug會被觸發

protected override void ProcessPlug()
{
    m_ioObject.SetHandler(this);
    if (m_connect)
        StartConnecting(false);
}
  • 將當前Session加入到SocketSession集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
    destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Own, obj));
}

SocketBase的父類方法SendOwn(Own方法)方法會被觸發,將Session加入到集合中

protected override void ProcessOwn(Own obj)
{
    ...
    // Store the reference to the owned object.
    m_owned.Add(obj);
}

創建綁定

  1. 首先對地址進行解析,判斷當前是tcp還是其他協議。然后會根據協議類型創建對應的Socket,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》

    private static void DecodeAddress([NotNull] string addr, out string address, out string protocol)
    {
        const string protocolDelimeter = "://";
        int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
    
        protocol = addr.Substring(0, protocolDelimeterIndex);
        address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
    }
    
  2. 負載均衡選擇一個IO線程。

  3. 處理SocketSession的關系

protected void LaunchChild([NotNull] Own obj)
{
    // Specify the owner of the object.
    obj.SetOwner(this);
    // Plug the object into the I/O thread.
    SendPlug(obj);
    // Take ownership of the object.
    SendOwn(this, obj);
}
  • Listener的宿主設置為該Socket
private void SetOwner([NotNull] Own owner)
{
    Debug.Assert(m_owner == null);
    m_owner = owner;
}
  • 為IO對象設置Listener,當管道有數據交互是,Listener的回調方法就會觸發。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
    if (incSeqnum)
        destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Plug));
}

ListenerProcessPlug會被觸發

protected override void ProcessPlug()
{
    m_ioObject.SetHandler(this);
    m_ioObject.AddSocket(m_handle);
    //接收異步socket
    Accept();
}
  • 將當前Listener加入到SocketListener集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
    destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Own, obj));
}

SocketBase的父類方法SendOwn(Own方法)方法會被觸發,將Listener加入到集合中

protected override void ProcessOwn(Own obj)
{
    ...
    // Store the reference to the owned object.
    m_owned.Add(obj);
}

SocketBase的創建處理就完成了

回收線程

(垃圾)回收線程是專門處理(清理)異步關閉的Socket的線程,它在NetMQ中起到至關重要的作用。

internal class Reaper : ZObject, IPollEvents
{
   ... 
}

Reaper是一個ZObject對象,同時實現了IPollEvents接口,該接口的作用是當有信息接收或發送時進行處理。回收線程實現了InEvent方法。

internal interface IPollEvents : ITimerEvent
{
    void InEvent();
    void OutEvent();
}

InEvent方法實現和IO線程的Ready方法很像,都是遍歷需要處理的命令進行處理。

public void InEvent()
{
    while (true)
    {
        Command command;
        if (!m_mailbox.TryRecv(0, out command))
            break;
        command.Destination.ProcessCommand(command);
    }
}

初始化回收線程

public Reaper([NotNull] Ctx ctx, int threadId)
    : base(ctx, threadId)
{
    m_sockets = 0;
    m_terminating = false;

    string name = "reaper-" + threadId;
    m_poller = new Utils.Poller(name);

    m_mailbox = new Mailbox(name);

    m_mailboxHandle = m_mailbox.Handle;
    m_poller.AddHandle(m_mailboxHandle, this);
    m_poller.SetPollIn(m_mailboxHandle);
}
  1. 初始化回收線程是會創建一個Poller對象,用於輪詢回收SocketBase
  2. 初始化回收線程會創建一個Mailbox對象用於Command的收發

MailBox

internal class Mailbox : IMailbox{
    ...
}

MailBoxIO線程IOThreadMailbox一樣,實現了IMailbox接口。

釋放SocketBase

當有SocketBase需要釋放時,會向完成端口發送Reap信號。

public void Close()
{
    // Mark the socket as disposed
    m_disposed = true;
    //工作線程向Socket郵箱發送Reap信號
    //回收線程會做剩下的工作
    SendReap(this);
}

發送回收命令

向回收線程的郵箱發送當前SocketBase的回收命令

protected void SendReap([NotNull] SocketBase socket)
{
    SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));
}

處理回收命令

Reap接收到釋放信號進行處理

protected override void ProcessReap(SocketBase socket)
{
    // Add the socket to the poller.
    socket.StartReaping(m_poller);
    ++m_sockets;
}

SocketBase回收

  1. 將當前Socket的加入到回收線程的中,當Socket接收到數據時,由回收線程回調該Socket的處理事件進行處理。
  2. 當前Socket終止處理
  3. 最后確認釋放
internal void StartReaping([NotNull] Poller poller)
{
    m_poller = poller;
    m_handle = m_mailbox.Handle;
    m_poller.AddHandle(m_handle, this);
    m_poller.SetPollIn(m_handle);
    Terminate();
    CheckDestroy();
}
終止處理
  1. 終止Socket時,直接終止即可

默認情況下NetMQLinger值被設置為-1,就是說如果網絡讀寫沒有進行完是不能退出的。如果Linger被設置為0,那么中斷時會丟棄一切未完成的網絡操作。如果Linger被設置的大於0,那么將等待Linger毫秒用來完成未完成的網絡讀寫,在指定的時間里完成或者超時都會立即返回。

  1. 若終止的是Session,則需要發送請求清理關聯Socket的當前Session對象
protected void Terminate()
{
    ...
    if (m_owner == null)
    {
        // 釋放的是Socket,Owner為空
        ProcessTerm(m_options.Linger);
    }
    else
    {
        // 釋放的是Session則會關聯一個Socket
        SendTermReq(m_owner, this);
    }
}
終止SocketBase
  1. 終止SocketBase時,需要先中斷當前SocketBase關聯的SessionBase
  2. 然后增加需要終端請求響應的個數,當全部都響應了則處理第四步驟
  3. 清空當前關聯的Session集合
  4. 最后當Session全部終止后發送給當前Socket宿主終端響應(TermAck)
protected override void ProcessTerm(int linger)
{
    ...
    // 斷開所有session的連接
    foreach (Own it in m_owned)
    {
        SendTerm(it, linger);
    }
    RegisterTermAcks(m_owned.Count);
    m_owned.Clear();
    CheckTermAcks();
}
終止當前Socket關聯的Session
  1. 如果終端管道命令在終止命令前處理了,則立即終止當前Session
  2. 標記當前准備終止
  3. Ligner大於0 則等到N毫秒后再終止終止SocketSession之間的管道
  4. 檢查管道是否還有數據要讀取
protected override void ProcessTerm(int linger)
{
    if (m_pipe == null)
    {
        ProceedWithTerm();
        return;
    }

    m_pending = true;

    if (linger > 0)
    {
        Debug.Assert(!m_hasLingerTimer);
        m_ioObject.AddTimer(linger, LingerTimerId);
        m_hasLingerTimer = true;
    }
    // 是否需要等待一定時間后消息處理完再終止管道.
    m_pipe.Terminate(linger != 0);

    // TODO: Should this go into pipe_t::terminate ?
    // In case there's no engine and there's only delimiter in the
    // pipe it wouldn't be ever read. Thus we check for it explicitly.
    m_pipe.CheckRead();
}
終止管道

管道狀態如下所示

private enum State
{
    /// <summary> Active 表示在中斷命令開始前的狀態 </summary>
    Active,
    /// <summary> Delimited 表示在終端命令接收前從管道接收到分隔符</summary>
    Delimited,
    /// <summary> Pending 表示中斷命令已經從管道接收,但是仍有待定消息可讀</summary>
    Pending,
    /// <summary> Terminating 表示所有待定消息都已經讀取等待管道終止確認信號返回 </summary>
    Terminating,
    /// <summary> Terminated 表示終止命令是由用戶顯示調用 </summary>
    Terminated,
    /// <summary> Double_terminated 表示用戶調用了終止命令同時管道也調用了終止命令 </summary>
    DoubleTerminated
}
  1. 終止當前管道
    若當前狀態為TerminatedDoubleTerminatedTerminating不再處理終止命令
public void Terminate(bool delay)
{
    //判斷當前狀態是否可處理終止命令
    ...

    if (m_state == State.Active)
    {
        // 向另一個管道發送終止命令然后等待確認終止
        SendPipeTerm(m_peer);
        m_state = State.Terminated;
    }
    else if (m_state == State.Pending && !m_delay)
    {
        // 若有待處理數據,但是不等待直接終止,則向另一個管道發送確認終止.
        m_outboundPipe = null;
        SendPipeTermAck(m_peer);
        m_state = State.Terminating;
    }
    else if (m_state == State.Pending)
    {
        //若有待處理數據但是需要等到則不處理.
    }
    else if (m_state == State.Delimited)
    {
        //若已經獲取到限定符但是還沒有收到終止命令則忽略定界符,然后發送終止命令給另一個管道 
        SendPipeTerm(m_peer);
        m_state = State.Terminated;
    }
    else
    {
        // 沒有其他狀態
        Debug.Assert(false);
    }
    //停止向外發送的消息
    m_outActive = false;

    if (m_outboundPipe != null)
    {
        //拋棄未發送出的消息.
        Rollback();

        // 這里不會再先查水位,所以即使管道滿了也可再寫入,向管道寫入定界符 .
        var msg = new Msg();
        msg.InitDelimiter();
        m_outboundPipe.Write(ref msg, false);
        Flush();
    }
}
  1. 終止另一個管道
protected override void ProcessPipeTerm()
{
    // 這是一個簡單的例子有道管道終止 
    //若沒有更多待處理消息需要讀取,或者這個管道已經丟去待處理數據,我們直接將狀態設置為正在終止(terminating),否則我們擱置待處理狀態直到所有待處理消息被發送
    if (m_state == State.Active)
    {
        if (!m_delay)
        {
            //不需要等到消息處理
            m_state = State.Terminating;
            m_outboundPipe = null;
            //發送終止確認
            SendPipeTermAck(m_peer);
        }
        else
            m_state = State.Pending;
        return;
    }
    // 若定界符碰巧在終止命令之前到達,將狀態改為正在終止
    if (m_state == State.Delimited)
    {
        m_state = State.Terminating;
        m_outboundPipe = null;
        SendPipeTermAck(m_peer);
        return;
    }
    // 當管道並發關閉,則狀態改為DoubleTerminated
    if (m_state == State.Terminated)
    {
        m_state = State.DoubleTerminated;
        m_outboundPipe = null;
        SendPipeTermAck(m_peer);
        return;
    }
    // pipe_term is invalid in other states.
    Debug.Assert(false);
}
  1. 確認終止
protected override void ProcessPipeTermAck()
{
    // 通知Socket或Session中斷當前管道 .
    Debug.Assert(m_sink != null);
    m_sink.Terminated(this);

    // 若正則處理或double_terminated這里不做任何事 
    // 簡化釋放管道,在已終止狀態,我們必須在釋放這個管道之前確認
    //其他狀態都是非法的 
    if (m_state == State.Terminated)
    {
        m_outboundPipe = null;
        SendPipeTermAck(m_peer);
    }
    else
        Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated);

    // 刪除所有管道中的未讀消息,然后釋放流入管道 
    var msg = new Msg();
    while (m_inboundPipe.TryRead(out msg))
    {
        msg.Close();
    }

    m_inboundPipe = null;
}

整體回收Socket流程圖如下:

public virtual void InEvent()
{
    // 回收線程命令會調用此事件
    try
    {
        ProcessCommands(0, false);
    }
    catch
    {
        // ignored
    }
    finally
    {
        CheckDestroy();
    }
}
private void CheckDestroy()
{
    // socket釋放完則做最后的清除和釋放工作.
    if (m_destroyed)
    {
        // 從回收線程移除輪詢
        m_poller.RemoveHandle(m_handle);
        // 釋放socke.
        DestroySocket(this);
        // 通知已釋放.
        SendReaped();
        // Deallocate.
        base.ProcessDestroy();
    }
}

總結

該篇介紹命令處理方式和回收線程回收Socket,順便介紹了下創建SocketBase的細節性問題。以便對釋放Socket有更清晰的認識。


本文地址:https://www.cnblogs.com/Jack-Blog/p/6774902.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM