前言
介紹
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標准socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。
目的
對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計划如下:
- 消息隊列NetMQ 原理分析1-Context和ZObject
- 消息隊列NetMQ 原理分析2-IO線程和完成端口
- 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
- 消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe
- 消息隊列NetMQ 原理分析5-Engine,Encord和Decord
- 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
- 消息隊列NetMQ 原理分析7-Device
- 消息隊列NetMQ 原理分析8-不同類型的Socket
- 消息隊列NetMQ 原理分析9-實戰
友情提示: 看本系列文章時最好獲取源碼,更有助於理解。
Socket
上一章最后我們簡單介紹了SocketBase
和SessionBase
的創建和回收,這一張我們詳細介紹SocketBase
和SessionBase
。
首先SocketBase
繼承自Own
,即也是ZObject
對象,同時由於SocketBase
需要進行消息的傳輸,因此它實現了一些結構,包括IPollEvents
、Pipe.IPipeEvents
。
接口實現
internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
...
}
IPollEvents
事件上一章回收線程已經介紹過,這里不再做過多說明了,簡單講SocketBase
實現該事件只有在回收線程回收Socket
的時候會觸發。Pipe.IPipeEvents
:是管道事件,它的簽名如下
public interface IPipeEvents
{
void ReadActivated([NotNull] Pipe pipe);
void WriteActivated([NotNull] Pipe pipe);
void Hiccuped([NotNull] Pipe pipe);
void Terminated([NotNull] Pipe pipe);
}
ReadActivated
:表示管道可讀,管道實際調用SocketBase
或SessionBase
的ReadActivated
方法,而SocketBase
實際會調用XReadActivated
方法。WriteActivated
:表示管道可寫,管道實際調用SocketBase
或SessionBase
的WriteActivated
方法,而SocketBase
實際會調用XWriteActivated
方法。Hiccuped
:當連接突然中斷時會調用此方法。WriteActivated
:表示管道終止。
內部結構
SocketBase
的內部維護着一個字段,用於存放連接/綁定地址和它的管道(若當前SocketBase
是TCPListener
,則無需初始化管道,管道為空)。
private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();
private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();
Endpoint
對象用於存放SessionBase
和Pipe
或Listener
的引用
private class Endpoint
{
public Endpoint(Own own, Pipe pipe)
{
Own = own;
Pipe = pipe;
}
public Own Own { get; }
public Pipe Pipe { get; }
}
當SocketBase
連接或綁定最后會向將Endpoint
保存到字典中
private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
{
LaunchChild(endpoint);
m_endpoints[address] = new Endpoint(endpoint, pipe);
}
在SocketBase
斷開連接時會移除它
public void TermEndpoint([NotNull] string addr)
{
...
if (protocol == Address.InProcProtocol)
{
...
m_inprocs.Remove(addr);
}
else
{
...
m_endpoints.Remove(addr);
}
}
m_inprocs
也是一個字典用於存放inproc
協議的連接。
在第一章創建SocketBase我們介紹了Context
創建SocketBase
所做的一些工作,初始化SocketBase
時,會創建MailBox,用於傳輸Command
。
protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
: base(parent, threadId)
{
m_options.SocketId = socketId;
m_mailbox = new Mailbox("socket-" + socketId);
}
每個
SocketBase
的命令處理實際都是在工作線程中進行。因此理論上(忽略線程上下文切換時造成的性能損失)線程數越多,NetMQ
的IO吞吐量和工作線程數成正比關系。
在Context
創建SocketBase
會根據Create
靜態方法根據不同類型創建不同的SocketBase
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
switch (type)
{
case ZmqSocketType.Pair:
return new Pair(parent, threadId, socketId);
case ZmqSocketType.Pub:
return new Pub(parent, threadId, socketId);
case ZmqSocketType.Sub:
return new Sub(parent, threadId, socketId);
case ZmqSocketType.Req:
return new Req(parent, threadId, socketId);
case ZmqSocketType.Rep:
return new Rep(parent, threadId, socketId);
case ZmqSocketType.Dealer:
return new Dealer(parent, threadId, socketId);
case ZmqSocketType.Router:
return new Router(parent, threadId, socketId);
case ZmqSocketType.Pull:
return new Pull(parent, threadId, socketId);
case ZmqSocketType.Push:
return new Push(parent, threadId, socketId);
case ZmqSocketType.Xpub:
return new XPub(parent, threadId, socketId);
case ZmqSocketType.Xsub:
return new XSub(parent, threadId, socketId);
case ZmqSocketType.Stream:
return new Stream(parent, threadId, socketId);
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
}
具體創建SocketBase
的工作在上一章已經做了詳細的介紹,這里不再復述。
Session
首先和SocketBase
一樣,SessionBase
也繼承自Own
,即也是ZObject
對象,同時由於SessionBase
和SocketBase
存在消息傳輸,所以它也實現了IPipeEvents
接口,同時它實現了IProactorEvents
接口,在消息收發是會接收到通知。SessionBase
一端和SocketBase
進行消息的通訊,另一端和Engine
存在消息通訊,它實現了IMsgSink
和IMsgSource
接口和Engine
進行消息傳輸。
internal class SessionBase : Own,
Pipe.IPipeEvents, IProactorEvents,
IMsgSink, IMsgSource{
}
internal interface IMsgSink
{
/// <summary>
/// 傳輸消息.成功時返回true.
/// </summary>
/// <param name="msg">將msg消息寫入到管道中</param>
bool PushMsg(ref Msg msg);
}
internal interface IMsgSource
{
/// <summary>
/// 取一個消息。成功時返回,從管道獲取消息寫入msg參數中;若失敗則返回false,將null寫入到msg參數中。
/// </summary>
/// <param name="msg">從管道獲取消息寫入Msg中</param>
/// <returns>true if successful - and writes the message to the msg argument</returns>
bool PullMsg(ref Msg msg);
}
當
SocketBase
將消息寫入到寫管道時,對應的SessionBase
會從讀管道讀到SocketBase
寫入的數據,然后將數據從管道取出生成一個Msg
,Engine
會和AsyncSocket
交互傳輸數據,關於Engine
下一章再做介紹。
Option
option
參數如下
- Affinity
表示哪個線程是可用的,默認為0,表示所有線程在負載均衡都可使用。 - Backlog
最大Socket
待連接數 - DelayAttachOnConnect
在創建連接時,延遲在Socket
和Session
之間創建雙向的管道,默認創建連接時立即創建管道 - DelayOnClose
若為true
,則在Socket
關閉時Session
先從管道接收所有消息發送出去。
否則直接關閉,默認為true
。 - DelayOnDisconnect
若為true
,則在Pipe
通知我們中斷時Socket
先將接收所有入隊管道消息。
否則直接中斷管道。默認為true
. - Endianness
字節序,數據在內存中是高到低排還是低到高排。 - Identity
響應的Identity
,每個Identity
用於查找Socket
。Identiy
是一個重復的隨機32位整形數字,轉換為字節5位字節數組。每個消息的第一部分是Identity
, - IdentitySize
1個字節用於保存Identity的長度。 - IPv4Only
- Linger
當Socket關閉時,是否延遲一段時間等待數據發送完畢后再關閉管道 - MaxMessageSize
每個消息包最大消息大小 - RawSocket
若設置為true,RouterSocket
可以接收非NetMQ
發送來的tcp
連接。
默認是false,Stream
在構造函數時會設置為true
,設置為true
時會將RecvIdentity
修改為false
(用NetMQ
接收其他系統發送來的Socket
請求應該用StreamSocekt
,否則由於應用層協議不一樣可能會導致一些問題。) - RecvIdentity
若為true,Identity
轉發給Socket
。 - ReconnectIvl
設置最小重連時間間隔,單位ms。默認100ms - ReconnectIvlMax
設置最大重連時間間隔,單位ms。默認0(無用) - RecoveryIvl
PgmSocket
用的 - SendBuffer
發送緩存大小,設置底層傳輸Socket
的發送緩存大小,初始為0 - ReceiveBuffer
接收緩存大小,設置底層傳輸Socket
的接收緩存大小,初始為0 - SendHighWatermark
Socket
發送的管道的最大消息數,當發送水位達到最大時會阻塞發送。 - ReceiveHighWatermark
Socket
接收管道的最大消息數 - SendLowWatermark
Socket
發送低水位,消息的最小數量單位,每次達到多少消息數量才向Session管道才激活寫事件。默認1000 - ReceiveLowWatermark
Socket
接收低水位,消息的最小數量單位,每次達到多少消息數量Session
管道才激活讀事件。默認1000 - SendTimeout
Socket
發送操作超時時間 - TcpKeepalive
TCP保持連接設置,默認-1不修改配置 - TcpKeepaliveIdle
TCP心跳包在空閑時的時間間隔,默認-1不修改配置 - TcpKeepaliveIntvl
TCP心跳包時間間隔,默認-1不修改配置 - DisableTimeWait
客戶端斷開連接時禁用TIME_WAIT
TCP狀態
Pipe
在上一章我們講到過在SocketBase
和SessionBase
是通過2條單向管道進行消息傳輸,傳輸的消息單位是Msg
,消息管道是YPipe<Msg>
類型,那么YPipe<>
又是什么呢?
YPipe
Ypipe
內部實際維護這一個YQueue
類型的先進先出隊列,YPipe
向外暴露了一下方法:
- TryRead
該方法用於判斷當前隊列是否可讀,可讀的話第一個對象出隊
public bool TryRead(out T value)
{
if (!CheckRead())
{
value = default(T);
return false;
}
value = m_queue.Pop();
return true;
}
- Unwrite
取消寫入消息
public bool Unwrite(ref T value)
{
if (m_flushToIndex == m_queue.BackPos)
return false;
value = m_queue.Unpush();
return true;
}
- 寫入消息
將消息寫入到隊列中,若寫入未完成則當前消息的指針索引指向當前隊列塊的后一位。
public void Write(ref T value, bool incomplete)
{
m_queue.Push(ref value);
// Move the "flush up to here" pointer.
if (!incomplete)
{
m_flushToIndex = m_queue.BackPos;
}
}
- 完成寫入
當該部分消息寫完時,則會調用Flush完成寫入並通知另一個管道消息可讀
public void Flush()
{
if (m_state == State.Terminating)
return;
if (m_outboundPipe != null && !m_outboundPipe.Flush())
SendActivateRead(m_peer);
}
Msg
寫入的消息單位是Msg
,它實現了多條數據的存儲,當每次數據寫完還有數據帶寫入時通過將Flag標記為More
表示消息還沒寫入完。
YQueue
YQueue
是由一個個trunk
組成的,每個trunk
就是一個消息塊,每個消息塊可能包含多個Msg
,主要由寫入消息時是否還有更多消息帶寫入(Flag
)決定。trunk
是一個雙向循環鏈表,內部維護着一個數組用於存放數據,每個數據會有2個指針,分別指向前一個塊和后一個塊,每個塊還有一個索引,表示當前塊在隊列中的位置。
private sealed class Chunk
{
public Chunk(int size, int globalIndex)
{
Values = new T[size];
GlobalOffset = globalIndex;
Debug.Assert(Values != null);
}
/// <summary>數據</summary>
public T[] Values { get; }
/// <summary>當前塊在隊列中的位置</summary>
public int GlobalOffset { get; }
/// <summary>前一個塊</summary>
[CanBeNull]
public Chunk Previous { get; set; }
/// <summary>下一個塊</summary>
[CanBeNull]
public Chunk Next { get; set; }
}
每個chunk
默認最多可保存256個部分。
由於每次向SocketBase
寫入的Msg
可能有多個部分,因此消息會寫入到數組中,所有消息寫完后指向trunk
的指針才會后移一位。
YQueue
有以下字段
//用於記錄當前塊消息的個數,默認為256
private readonly int m_chunkSize;
// 當隊列是空的時,下一個塊指向null,首尾塊都指向初始化的一個塊,開始位置的塊僅用於隊列的讀取(front/pop),最后位置的僅用於隊列的寫入(back/push)。
// 開始位置
private volatile Chunk m_beginChunk;
//chunk的當前可讀位置索引
private int m_beginPositionInChunk;
//指向后一個塊
private Chunk m_backChunk;
//chunk的最后一個可讀位置索引
private int m_backPositionInChunk;
//指向后一個塊
private Chunk m_endChunk;
//chunk的下一個可寫位置索引
private int m_endPosition;
//當達到最大Msg數量時,擴展一個chunk,最大為256個塊
private Chunk m_spareChunk;
當前trunk頭部在整個隊列中的的索引位置
private int m_nextGlobalIndex;
YPipe
寫入Msg
實際是向YQueue
入隊
public void Push(ref T val)
{
m_backChunk.Values[m_backPositionInChunk] = val;
//指向后一個塊
m_backChunk = m_endChunk;
//索引更新到最后可讀位置
m_backPositionInChunk = m_endPosition;
//下一個可寫位置向后移動一位
m_endPosition++;
if (m_endPosition != m_chunkSize)
return;
//到達最后一個位置則需要擴充一個塊
Chunk sc = m_spareChunk;
if (sc != m_beginChunk)
{
//已經擴充了塊則更新下一個塊的位置
m_spareChunk = m_spareChunk.Next;
m_endChunk.Next = sc;
sc.Previous = m_endChunk;
}
else
{
//新建一個塊,並更新索引位置
m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
m_nextGlobalIndex += m_chunkSize;
m_endChunk.Next.Previous = m_endChunk;
}
m_endChunk = m_endChunk.Next;
當前塊的局部位置從0開始
m_endPosition = 0;
}
每次消息寫完消息時調用YPipe
的Flush
方法完成當前消息的寫入
public bool Flush()
{
//只有一條Msg
if (m_flushFromIndex == m_flushToIndex)
{
return true;
}
//將m_lastAllowedToReadIndex更新為flushToIndex
if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex)
{
//沒有數據寫入時,lastAllowedToReadIndex為-1,表示沒有數據可讀,因此這里不需要關系線程安全
Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex);
m_flushFromIndex = m_flushToIndex;
return false;
}
有數據寫入時更新指針
m_flushFromIndex = m_flushToIndex;
return true;
}
總結
該篇在上一片的基礎上對SocketBase
和SessionBase
進行了一些細節上的補充。同時,對NetMQ
的配置參數進行了一些介紹,最后對消息管道進行了簡單講解。
微信掃一掃二維碼關注訂閱號傑哥技術分享
本文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
作者博客:傑哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接