使用windows消息隊列MessageQueue


Config中appSettings配置:

<--本地消息隊列時 value=".\PRIVATE$\MgrApiRequest"/>-->
<add key="RequestQueueName" value="FormatName:Direct=TCP:192.168.100.102\PRIVATE$\MgrApiRequest" />

 

消息隊列連接,將order類資料加入消息隊列中:

MessageQueue messageOrderQueue = new MessageQueue(ConfigurationManager.AppSettings["RequestQueueName"]);
messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

using (Message message = new Message())
{
message.Body = JsonConvert.SerializeObject(order);
message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
messageOrderQueue.Send(message);

}

 

監控消息隊列,獲取消息隊列中的數據進行處理:

private CancellationTokenSource cts = new CancellationTokenSource();

internal void Init()

{

string CommissionQueueName = ConfigurationManager.AppSettings["RequestQueueName"];

if (!MessageQueue.Exists(CommissionQueueName))
{
MessageQueue.Create(CommissionQueueName);
}
messageOrderQueue = new MessageQueue(CommissionQueueName);

messageOrderQueue.SetPermissions("Everyone", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);
messageOrderQueue.SetPermissions("ANONYMOUS LOGON", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);

messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });//new XmlMessageFormatter();

messageOrderQueue.MessageReadPropertyFilter = new MessagePropertyFilter() { Id = true, CorrelationId = true, Body = true, Label = true };

messageOrderQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(RequestReceived);
messageOrderQueue.BeginReceive();

Task.Factory.StartNew(() => HandleRequestThread(), TaskCreationOptions.LongRunning);

}

 

/// <summary>
/// 從消息隊列接收消息,加入待處理隊列RequestQueue中
/// </summary>
/// <param name="source"></param>
/// <param name="args"></param>
private void RequestReceived(object source, ReceiveCompletedEventArgs args)
{
if (!cts.IsCancellationRequested)
{
MessageQueue q = (MessageQueue)source;
//once a message is received, stop receiving
using (var msMessage = q.EndReceive(args.AsyncResult))
{
try
{
//msMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
//do something with the message
string json = msMessage.Body as string;

RequestQueue.Add(json);
}
catch (Exception ex)
{
Logger.LogError(ex.ToString());
}
}

//begin receiving again
q.BeginReceive();
}
}

 

private async void HandleRequestThread()
{
while (!cts.IsCancellationRequested)
{
try
{
var json = RequestQueue.Take(cts.Token);
Logger.LogInfo($"msgContent:{json}");

MT4Order tradeOrder = JsonConvert.DeserializeObject<MT4Order>(json);
......

}
catch (OperationCanceledException)
{
Logger.LogError("HandleRequestThread is shutting down");
}
catch (Exception ex)
{
Logger.LogError(ex.ToString());
}
}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM