前言
介紹
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。
目的
對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:
- 消息隊列NetMQ 原理分析1-Context和ZObject
- 消息隊列NetMQ 原理分析2-IO線程和完成端口
- 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
- 消息隊列NetMQ 原理分析4-Session、Option和Pipe
- 消息隊列NetMQ 原理分析5-Engine
- 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
- 消息隊列NetMQ 原理分析7-Device
- 消息隊列NetMQ 原理分析8-不同類型的Socket
- 消息隊列NetMQ 原理分析9-實戰
友情提示: 看本系列文章時最好獲取源碼,更有助於理解。
命令
命令結構
Command定義如下
internal struct Command
{
public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()
{
Destination = destination;
CommandType = type;
Arg = arg;
}
[CanBeNull]
public ZObject Destination { get; }
public CommandType CommandType { get; }
[CanBeNull]
public object Arg { get; private set; }
public override string ToString()
{
return base.ToString() + "[" + CommandType + ", " + Destination + "]";
}
}
其包含了3個信息:調用者,命令類型和命令參數。
命令產生
還記的《消息隊列NetMQ 原理分析1-Context和ZObject》中我們介紹過NetMQ中的命令類型嗎?待處理命令全部會存放着Socket
的信箱中。當Socket
有命令(連接完成、發送完成或接受完成等)需要處理時調用基類ZObject
的SendCommand
方法。
private void SendCommand([NotNull] Command cmd)
{
m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
}
ZObject
實際調用Context
的SendCommand方法
public void SendCommand(int threadId, [NotNull] Command command)
{
m_slots[threadId].Send(command);
}
m_slots[threadId]
保存的是當前IO線程的IO信箱IOThreadMailbox
,在《消息隊列NetMQ 原理分析2-IO線程和完成端口》
我們簡單介紹了IOThreadMailbox
的結構。
[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
IOThreadMailbox
中維護這一個Command
管道,該管道實際就是一個先進先出隊列,詳細解析會在第四章進行介紹。
public void Send(Command command)
{
bool ok;
lock (m_sync)
{
//向管道寫入命令
m_commandPipe.Write(ref command, false);
//成功寫入會返回false,表示有命令需要處理
ok = m_commandPipe.Flush();
}
if (!ok)
{
//向完成端口傳遞信號
m_proactor.SignalMailbox(this);
}
}
public bool TryRecv(out Command command)
{
return m_commandPipe.TryRead(out command);
}
public void RaiseEvent()
{
if (!m_disposed)
{
m_mailboxEvent.Ready();
}
}
命令發送完成調用
Flush
方法更新指針下標。返回ok
若為true,表示管道已全部讀取完畢,無需發送信號量通知處理。若返回false,則需要向內核發送一個信號,IO線程獲取到則調用到指定的命令事件。
IOThreadMailbox
的主要就是這三個方法
- 當有命令來的時候調用
Send
方法向管道(隊列)寫入命令。寫完時,會向完成端口傳遞信號。 - 當有命令需要處理時調用
TryRecv
方法讀取 - 當完成端口接收到信號需要命令處理時,調用
RaiseEvent
(實際是信箱的IO線程的RaiseEvent
方法)進行處理命令。
public void SignalMailbox(IOThreadMailbox mailbox)
{
//該方法會向完成端口的隊列中插入一個信號狀態
m_completionPort.Signal(mailbox);
}
有關於完成端口介紹請查看《消息隊列NetMQ 原理分析2-IO線程和完成端口》
命令處理
當有命令需要處理時,完成端口會接收到信號。
private void Loop()
{
...
int timeout = ExecuteTimers();
int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
continue;
for (int i = 0; i < removed; i++)
{
try
{
if (completionStatuses[i].OperationType == OperationType.Signal)
{
var mailbox = (IOThreadMailbox)completionStatuses[i].State;
mailbox.RaiseEvent();
}
...
}
...
}
...
}
在線程輪詢方法Loop
中,當接收到需要處理的數據時,首先會判斷是否是信號,若為信號,則將狀態(參數)轉化為IOThreadMailbox
類型,同時調用RaiseEvent
方法處理命令。
public void Ready()
{
Command command;
while (m_mailbox.TryRecv(out command))
command.Destination.ProcessCommand(command);
}
當有命令需要處理時,會調用IOThreadMailbox
的TryRecv
方法從管道(隊列,先進先出)中獲取第一個命令進行處理。
創建Socket(SocketBase)
在介紹回收線程工作之前,我們先看下創建一個新的Socket
做了哪些工作,這里的Socket
實際是NetMQ中的SocketBase
。
RequestSocket socket = new RequestSocket();
socket.Connect("tcp://127.0.0.1:12345");
NetMQSocket
是NetMQ的Socket
的基類。
public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect)
{
}
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
{
m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
m_netMqSelector = new NetMQSelector();
Options = new SocketOptions(this);
m_socketEventArgs = new NetMQSocketEventArgs(this);
Options.Linger = NetMQConfig.Linger;
if (!string.IsNullOrEmpty(connectionString))
{
var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
foreach (string endpoint in endpoints)
{
if (endpoint[0] == '@')
{
Bind(endpoint.Substring(1));
}
else if (endpoint[0] == '>')
{
Connect(endpoint.Substring(1));
}
else if (defaultAction == DefaultAction.Connect)
{
Connect(endpoint);
}
else
{
Bind(endpoint);
}
}
}
}
首先會根據Socket
的類型創建對應的Socket
,調用的是Context
的CreateSocket
方法。具體的請看創建SocketBase。最終創建方法是調用SocketBase
的Create
方法
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
switch (type)
{
...
case ZmqSocketType.Req:
return new Req(parent, threadId, socketId);
...
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
}
創建完后,就對地址進行解析。若有多個地址,則可用,分隔。
var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
解析完成后則用默認的方式進行綁定或連接,如RequestSocket
默認為連接,而ResponseSocket
則為綁定。
創建連接
-
首先對地址進行解析,判斷當前是tcp還是其他協議。然后會根據協議類型創建對應的Socket,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》
private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
-
負載均衡選擇一個IO線程。
-
創建
Session
,Socket
和Session
的關系如圖所示
-
創建管道,創建管道會創建一對單向管道,形成“一個”雙向管道。頭尾分別連接
Socket
和Session
,如上圖所示。創建管道完畢后需要設置管道的回調事件,管道1設置回調為Socket
的回調方法,管道2設置為Session
的回調方法。
具體關於
Session
和Pipe
的內容請查看《消息隊列NetMQ 原理分析4-Session、Option和Pipe》。
- 處理
Socket
和Session
的關系
protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}
- 將
Session
的宿主設置為該Socket
private void SetOwner([NotNull] Own owner)
{
Debug.Assert(m_owner == null);
m_owner = owner;
}
- 為IO對象設置
Session
,當管道有數據交互時,Session
的回調方法就會觸發。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
if (incSeqnum)
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Plug));
}
SessionBase
的ProcessPlug
會被觸發
protected override void ProcessPlug()
{
m_ioObject.SetHandler(this);
if (m_connect)
StartConnecting(false);
}
- 將當前
Session
加入到Socket
的Session
集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Own, obj));
}
SocketBase
的父類方法SendOwn
(Own方法)方法會被觸發,將Session
加入到集合中
protected override void ProcessOwn(Own obj)
{
...
// Store the reference to the owned object.
m_owned.Add(obj);
}
創建綁定
-
首先對地址進行解析,判斷當前是tcp還是其他協議。然后會根據協議類型創建對應的
Socket
,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
-
負載均衡選擇一個IO線程。
-
處理
Socket
和Session
的關系
protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}
- 將
Listener
的宿主設置為該Socket
private void SetOwner([NotNull] Own owner)
{
Debug.Assert(m_owner == null);
m_owner = owner;
}
- 為IO對象設置
Listener
,當管道有數據交互是,Listener
的回調方法就會觸發。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true)
{
if (incSeqnum)
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Plug));
}
Listener
的ProcessPlug
會被觸發
protected override void ProcessPlug()
{
m_ioObject.SetHandler(this);
m_ioObject.AddSocket(m_handle);
//接收異步socket
Accept();
}
- 將當前
Listener
加入到Socket
的Listener
集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj)
{
destination.IncSeqnum();
SendCommand(new Command(destination, CommandType.Own, obj));
}
SocketBase
的父類方法SendOwn
(Own方法)方法會被觸發,將Listener
加入到集合中
protected override void ProcessOwn(Own obj)
{
...
// Store the reference to the owned object.
m_owned.Add(obj);
}
SocketBase
的創建處理就完成了
回收線程
(垃圾)回收線程是專門處理(清理)異步關閉的Socket
的線程,它在NetMQ中起到至關重要的作用。
internal class Reaper : ZObject, IPollEvents
{
...
}
Reaper
是一個ZObject對象,同時實現了IPollEvents
接口,該接口的作用是當有信息接收或發送時進行處理。回收線程實現了InEvent
方法。
internal interface IPollEvents : ITimerEvent
{
void InEvent();
void OutEvent();
}
InEvent
方法實現和IO線程的Ready
方法很像,都是遍歷需要處理的命令進行處理。
public void InEvent()
{
while (true)
{
Command command;
if (!m_mailbox.TryRecv(0, out command))
break;
command.Destination.ProcessCommand(command);
}
}
初始化回收線程
public Reaper([NotNull] Ctx ctx, int threadId)
: base(ctx, threadId)
{
m_sockets = 0;
m_terminating = false;
string name = "reaper-" + threadId;
m_poller = new Utils.Poller(name);
m_mailbox = new Mailbox(name);
m_mailboxHandle = m_mailbox.Handle;
m_poller.AddHandle(m_mailboxHandle, this);
m_poller.SetPollIn(m_mailboxHandle);
}
- 初始化回收線程是會創建一個
Poller
對象,用於輪詢回收SocketBase
。 - 初始化回收線程會創建一個
Mailbox
對象用於Command
的收發
MailBox
internal class Mailbox : IMailbox{
...
}
MailBox
和IO線程的IOThreadMailbox
一樣,實現了IMailbox
接口。
釋放SocketBase
當有SocketBase
需要釋放時,會向完成端口發送Reap
信號。
public void Close()
{
// Mark the socket as disposed
m_disposed = true;
//工作線程向Socket郵箱發送Reap信號
//回收線程會做剩下的工作
SendReap(this);
}
發送回收命令
向回收線程的郵箱發送當前SocketBase
的回收命令
protected void SendReap([NotNull] SocketBase socket)
{
SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));
}
處理回收命令
Reap
接收到釋放信號進行處理
protected override void ProcessReap(SocketBase socket)
{
// Add the socket to the poller.
socket.StartReaping(m_poller);
++m_sockets;
}
SocketBase回收
- 將當前
Socket
的加入到回收線程的中,當Socket
接收到數據時,由回收線程回調該Socket的處理事件進行處理。 - 當前Socket終止處理
- 最后確認釋放
internal void StartReaping([NotNull] Poller poller)
{
m_poller = poller;
m_handle = m_mailbox.Handle;
m_poller.AddHandle(m_handle, this);
m_poller.SetPollIn(m_handle);
Terminate();
CheckDestroy();
}
終止處理
- 終止
Socket
時,直接終止即可
默認情況下
NetMQ
的Linger
值被設置為-1,就是說如果網絡讀寫沒有進行完是不能退出的。如果Linger
被設置為0,那么中斷時會丟棄一切未完成的網絡操作。如果Linger
被設置的大於0,那么將等待Linger
毫秒用來完成未完成的網絡讀寫,在指定的時間里完成或者超時都會立即返回。
- 若終止的是
Session
,則需要發送請求清理關聯Socket的當前Session
對象
protected void Terminate()
{
...
if (m_owner == null)
{
// 釋放的是Socket,Owner為空
ProcessTerm(m_options.Linger);
}
else
{
// 釋放的是Session則會關聯一個Socket
SendTermReq(m_owner, this);
}
}
終止SocketBase
- 終止
SocketBase
時,需要先中斷當前SocketBase
關聯的SessionBase
- 然后增加需要終端請求響應的個數,當全部都響應了則處理第四步驟
- 清空當前關聯的
Session
集合 - 最后當
Session
全部終止后發送給當前Socket
宿主終端響應(TermAck)
protected override void ProcessTerm(int linger)
{
...
// 斷開所有session的連接
foreach (Own it in m_owned)
{
SendTerm(it, linger);
}
RegisterTermAcks(m_owned.Count);
m_owned.Clear();
CheckTermAcks();
}
終止當前Socket關聯的Session
- 如果終端管道命令在終止命令前處理了,則立即終止當前
Session
- 標記當前准備終止
- 若
Ligner
大於0 則等到N毫秒后再終止終止Socket
和Session
之間的管道 - 檢查管道是否還有數據要讀取
protected override void ProcessTerm(int linger)
{
if (m_pipe == null)
{
ProceedWithTerm();
return;
}
m_pending = true;
if (linger > 0)
{
Debug.Assert(!m_hasLingerTimer);
m_ioObject.AddTimer(linger, LingerTimerId);
m_hasLingerTimer = true;
}
// 是否需要等待一定時間后消息處理完再終止管道.
m_pipe.Terminate(linger != 0);
// TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the
// pipe it wouldn't be ever read. Thus we check for it explicitly.
m_pipe.CheckRead();
}
終止管道
管道狀態如下所示
private enum State
{
/// <summary> Active 表示在中斷命令開始前的狀態 </summary>
Active,
/// <summary> Delimited 表示在終端命令接收前從管道接收到分隔符</summary>
Delimited,
/// <summary> Pending 表示中斷命令已經從管道接收,但是仍有待定消息可讀</summary>
Pending,
/// <summary> Terminating 表示所有待定消息都已經讀取等待管道終止確認信號返回 </summary>
Terminating,
/// <summary> Terminated 表示終止命令是由用戶顯示調用 </summary>
Terminated,
/// <summary> Double_terminated 表示用戶調用了終止命令同時管道也調用了終止命令 </summary>
DoubleTerminated
}
- 終止當前管道
若當前狀態為Terminated
、DoubleTerminated
和Terminating
不再處理終止命令
public void Terminate(bool delay)
{
//判斷當前狀態是否可處理終止命令
...
if (m_state == State.Active)
{
// 向另一個管道發送終止命令然后等待確認終止
SendPipeTerm(m_peer);
m_state = State.Terminated;
}
else if (m_state == State.Pending && !m_delay)
{
// 若有待處理數據,但是不等待直接終止,則向另一個管道發送確認終止.
m_outboundPipe = null;
SendPipeTermAck(m_peer);
m_state = State.Terminating;
}
else if (m_state == State.Pending)
{
//若有待處理數據但是需要等到則不處理.
}
else if (m_state == State.Delimited)
{
//若已經獲取到限定符但是還沒有收到終止命令則忽略定界符,然后發送終止命令給另一個管道
SendPipeTerm(m_peer);
m_state = State.Terminated;
}
else
{
// 沒有其他狀態
Debug.Assert(false);
}
//停止向外發送的消息
m_outActive = false;
if (m_outboundPipe != null)
{
//拋棄未發送出的消息.
Rollback();
// 這里不會再先查水位,所以即使管道滿了也可再寫入,向管道寫入定界符 .
var msg = new Msg();
msg.InitDelimiter();
m_outboundPipe.Write(ref msg, false);
Flush();
}
}
- 終止另一個管道
protected override void ProcessPipeTerm()
{
// 這是一個簡單的例子有道管道終止
//若沒有更多待處理消息需要讀取,或者這個管道已經丟去待處理數據,我們直接將狀態設置為正在終止(terminating),否則我們擱置待處理狀態直到所有待處理消息被發送
if (m_state == State.Active)
{
if (!m_delay)
{
//不需要等到消息處理
m_state = State.Terminating;
m_outboundPipe = null;
//發送終止確認
SendPipeTermAck(m_peer);
}
else
m_state = State.Pending;
return;
}
// 若定界符碰巧在終止命令之前到達,將狀態改為正在終止
if (m_state == State.Delimited)
{
m_state = State.Terminating;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// 當管道並發關閉,則狀態改為DoubleTerminated
if (m_state == State.Terminated)
{
m_state = State.DoubleTerminated;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// pipe_term is invalid in other states.
Debug.Assert(false);
}
- 確認終止
protected override void ProcessPipeTermAck()
{
// 通知Socket或Session中斷當前管道 .
Debug.Assert(m_sink != null);
m_sink.Terminated(this);
// 若正則處理或double_terminated這里不做任何事
// 簡化釋放管道,在已終止狀態,我們必須在釋放這個管道之前確認
//其他狀態都是非法的
if (m_state == State.Terminated)
{
m_outboundPipe = null;
SendPipeTermAck(m_peer);
}
else
Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated);
// 刪除所有管道中的未讀消息,然后釋放流入管道
var msg = new Msg();
while (m_inboundPipe.TryRead(out msg))
{
msg.Close();
}
m_inboundPipe = null;
}
整體回收Socket
流程圖如下:
public virtual void InEvent()
{
// 回收線程命令會調用此事件
try
{
ProcessCommands(0, false);
}
catch
{
// ignored
}
finally
{
CheckDestroy();
}
}
private void CheckDestroy()
{
// socket釋放完則做最后的清除和釋放工作.
if (m_destroyed)
{
// 從回收線程移除輪詢
m_poller.RemoveHandle(m_handle);
// 釋放socke.
DestroySocket(this);
// 通知已釋放.
SendReaped();
// Deallocate.
base.ProcessDestroy();
}
}
總結
該篇介紹命令處理方式和回收線程回收Socket
,順便介紹了下創建SocketBase
的細節性問題。以便對釋放Socket
有更清晰的認識。
本文地址:https://www.cnblogs.com/Jack-Blog/p/6774902.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接