消息隊列NetMQ 原理分析1-Context和ZObject


前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.0-rc5。本文檔是對4.0.0-rc5分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理和回收線程
  4. 消息隊列NetMQ 原理分析4-Session和Pipe
  5. 消息隊列NetMQ 原理分析5-Engine
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不同類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。


Context

NetMQ有一個Context對象,用於初始化並保存當前NetMQ底層的對象狀態,如IO線程、回收線程、進程間傳輸節點字典、插槽m_slots(用於保存IO對象,回收對象和socket對象的Mailbox)、初始化但未用到的Socket對象指針數組以及當前Mailbox(用於接收終止信號)等。

初始化Context

當創建第一個Socket對象時會初始化IO線程,回收線程以及工作線程。默認Socket數量1024個,IO線程1個,回收線程1個。
m_slots = new Mailbox[m_slotCount];//m_soltCount = 1 + 1 + 1024
m_slots[0]保存的是Context的Mailbox。
m_slots[TermTid] = m_termMailbox;//用於當前Context接收終止信號
m_slots[1]保存的是回收對象的Mailbox,保存完畢后就會啟動回收對象輪詢線程。

m_reaper = new Reaper(this, ReaperTid);//ReaperTid = 1
m_slots[ReaperTid] = m_reaper.Mailbox;
m_reaper.Start();

m_slots[2]保存的是IO線程對象的Mailbox。

for (int i = 2; i != ios/*ios = 1,默認用1個io線程*/ + 2; i++)
{
    IOThread ioThread = new IOThread(this, i);
    m_ioThreads.Add(ioThread);
    m_slots[i] = ioThread.Mailbox;
    ioThread.Start();
}

其余1024個slot保存的是socket對象的Mailbox,當socket還沒使用是,slots保存的是null,占個位置,同時m_emptySlots。

//m_soltCount = 1 + 1 + 1024
for (int i = (int)m_slotCount - 1; >= (int)ios + 2; i--)
{
    m_emptySlots.Push(i);
    m_slots[i] = null;
}

創建SocketBase

無論是什么類型的Socket全都是在Context中進行創建或釋放的。NetMQ中不同Socket都繼承自SocketBase,在Context未中止且Socket未滿時,會從m_emptySlots棧中Pop出一個未使用的指針。若創建失敗,則重新加回到棧中,否則更新當前使用的Socket的集合加入該Socket並更新m_slots的Mailbox

//slot是當前socket在s_slots中的位置,也用於生成SocketBase的`ThreadId`
int slot = m_emptySlots.Pop();
//  sid是生成並遞增的唯一的socket ID,用於SocketBase創建MailBox命名用,並無實際其他作用。
int sid = Interlocked.Increment(ref s_maxSocketId);
s = SocketBase.Create(type, this, slot, sid);
if (s == null)
{
    m_emptySlots.Push(slot);
    return null;
}
m_sockets.Add(s);
m_slots[slot] = s.Mailbox;

釋放SocketBase

Reaper要釋放某個SocketBase時,最終會調用Context的DestroySocket方法。

tid = socket.ThreadId;
//重新加入到可用socket棧中
m_emptySlots.Push(tid);
//關閉連接
m_slots[tid].Close();
//清空引用
m_slots[tid] = null;
//  從當前使用socket集合移除
m_sockets.Remove(socket);
//若當前接收到中止信號且當前socket全部已釋放時停止回收線程
if (m_terminating && m_sockets.Count == 0)
    m_reaper.Stop();

緩存進程內通信Socket

NetMQ除了支持TCP以外還支持inproc(進程內通訊),ipc(進程間通訊),pgm和epgm(多路廣播)等傳輸協議。
Context會用一個字典管理當前使用inpoc的socket。
當inpoc的socket進行綁定時會加入到字典緩存中。釋放時會從字典緩存中移除。當使用inpoc協議連接時,增加當前綁定inpoc地址的連接數。

ZObject

ZObject是NetMQ的Session(狀態),IOThread(IO線程),Repear(回收線程),Pipe(管道),Own(所屬關系)對象的基類,它是包含2個信息,當前全局Context對象,以及當前對象處理的線程Id。所有socket最終都是繼承自該對象。因此ZObject對象需要知道IO對象接收到不同命令時如何進行處理命令。
NetMQ中一共定義了一下的命令類型

public enum CommandType
{
    //  發送給IO線程表示當前對象需要停止
    Stop,
    //  發送給IO線程表示當前對象需要注冊到IO線程中
    Plug,
    //  將創建的對象Session的加入到當前Socket的所屬集合中
    Own,
    //  附加engine到Session中
    Attach,
    //  建立session到Socket之間的管道,在握手之前調用inc_seqnum.
    Bind,
    //  通過寫管道發送通知給讀管道多少信息可讀
    ActivateRead,
    //  通過讀管道發送通知給寫讀管道多少信息可寫
    ActivateWrite,
    //  創建一個新的管道后通過讀管道發送給寫管道
    //  參數是管道類型,然而,他的目的地是私有的,因此我們必須用void指針, however,
    Hiccup,
    //  通過讀管道發送到寫管道告訴他中止所有管道
    PipeTerm,
    //  寫管道對PipeTerm命令響應
    PipeTermAck,
    //  通過IO對象發送給socket請求終端IO對象
    TermReq,
    //  通過socket發送給IO對象他自己開始關閉
    Term,
    //  通過IO對象發送給socket讓它知道已經關閉
    TermAck,
    //  將關閉套接字的所有權轉移給回收線程.
    Reap,
    //  關閉套接字通知回收線程他已經釋放
    Reaped,
    //  當所有socket都被釋放通過回收線程發送給 term 線程
    Done
}

根據不同命令類型進行處理,處理方式由具體的Socket子類去重載。

public void ProcessCommand(Command cmd)
{
    switch (cmd.CommandType)
    {
        case CommandType.ActivateRead:
            ProcessActivateRead();
            break;
        case CommandType.ActivateWrite:
            ProcessActivateWrite((long)cmd.Arg);
            break;
        case CommandType.Stop:
            ProcessStop();
            break;
        case CommandType.Plug:
            ProcessPlug();
            ProcessSeqnum();
            break;
        case CommandType.Own:
            ProcessOwn((Own)cmd.Arg);
            ProcessSeqnum();
            break;
        case CommandType.Attach:
            ProcessAttach((IEngine)cmd.Arg);
            ProcessSeqnum();
            break;
        case CommandType.Bind:
            ProcessBind((Pipe)cmd.Arg);
            ProcessSeqnum();
            break;
        case CommandType.Hiccup:
            ProcessHiccup(cmd.Arg);
            break;
        case CommandType.PipeTerm:
            ProcessPipeTerm();
            break;
        case CommandType.PipeTermAck:
            ProcessPipeTermAck();
            break;
        case CommandType.TermReq:
            ProcessTermReq((Own)cmd.Arg);
            break;
        case CommandType.Term:
            ProcessTerm((int)cmd.Arg);
            break;
        case CommandType.TermAck:
            ProcessTermAck();
            break;
        case CommandType.Reap:
            ProcessReap((SocketBase)cmd.Arg);
            break;
        case CommandType.Reaped:
            ProcessReaped();
            break;
        default:
            throw new ArgumentException();
    }
}

處理進程間通信協議

當創建進程間通信socket時,會調用ZObejct的RegisterEndpoint將socket對象加入到Context的使用inpoc協議的socket字段緩存中,而ZObject實際是調用Context的方法RegisterEndpoint,釋放使用inpoc協議的socket和使用inpoc進行連接方式和RegisterEndpoint一樣。

protected void RegisterEndpoint(String addr, Ctx.Endpoint endpoint)
{
    //m_ctx是在ZObejct初始化是傳進來的Context引用
    m_ctx.RegisterEndpoint(addr, endpoint);
}

多個IO線程

默認的IO線程數量是1個,當然也可以使用多個IO線程並發去處理,因此當創建監聽對象或創建連接時則需要進行負載均衡,平分到多個IO線程去處理,切換IO線程也是在Context中實現的。

protected IOThread ChooseIOThread(long affinity)
{
    return m_ctx.ChooseIOThread(affinity);
}
public IOThread ChooseIOThread(long affinity)
{
    //affinity表示哪些IO線程有資格,默認為0表示所有IO線程都可以處理。
    if (m_ioThreads.Count == 0)
        return null;
    //  Find the I/O thread with minimum load.
    int minLoad = -1;
    IOThread selectedIOThread = null;
    for (int i = 0; i != m_ioThreads.Count; i++)
    {
        if (affinity == 0 || (affinity & (1L << i)) > 0)
        {
            //獲取IO線程socket載入次數
            int load = m_ioThreads[i].Load;
            //這里對IO線程進行負載均衡
            if (selectedIOThread == null || load < minLoad)
            {
                minLoad = load;
                selectedIOThread = m_ioThreads[i];
            }
        }
    }
    return selectedIOThread;
}

總結

該篇介紹了Context和ZObject。NetMQ所有的socket對象創建,釋放都離不開Context,由於Context內部對必要操作都加了鎖,因此它是線程安全的。


20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
本文地址:https://www.cnblogs.com/Jack-Blog/p/6287458.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM