Zeromq的資源:
Zeromq模式:
http://blog.codingnow.com/2011/02/zeromq_message_patterns.html
zeromq主頁:
Zeromq Guild:
http://zguide.zeromq.org/page:all#Fixing-the-World
Zeromq 中文簡介:
http://blog.csdn.net/program_think/article/details/6687076
Zero wiki:
http://en.wikipedia.org/wiki/%C3%98MQ
zeromq系列:
http://iyuan.iteye.com/blog/972949
Zeromq資源閱讀:
ØMQ(Zeromq) 是一個更為高效的傳輸層
優勢是:
1 程序接口庫是一個並發框架
2 在集群和超級計算機上表現得比TCP更快
3 通過inproc, IPC, TCP, 和 multicast進行傳播消息
4 通過發散,訂閱,流水線,請求的方式連接
5 對於不定規模的多核消息傳輸應用使用異步IO
6 有非常大並且活躍的開源社區
7 支持30+的語言
8 支持多種系統
Zeromq定義為“史上最快的消息隊列”
從網絡通信的角度看,它處於會話層之上,應用層之下。
ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.
Zeromq中傳遞的數據格式是由用戶自己負責,就是說如果server發送的string是有帶"\0"的,那么client就必須要知道有這個
Pub_Sub模式。
the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.
在這種模式下很可能發布者剛啟動時發布的數據出現丟失,原因是用zmq發送速度太快,在訂閱者尚未與發布者建立聯系時,已經開始了數據發布(內部局域網沒這么誇張的)。官網給了兩個解決方案;1,發布者sleep一會再發送數據(這個被標注成愚蠢的);2,使用proxy。
Zeromq示例:
1 獲取例子
git clone --depth=1 git://github.com/imatix/zguide.git
2 服務器端:
(當服務器收到消息的時候,服務器回復“World”)
<?php
/*
* Hello World server
* Binds REP socket to tcp://*:5555
* Expects "Hello" from client, replies with "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext(1);
// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while(true) {
// Wait for next request from client
$request = $responder->recv();
printf ("Received request: [%s]\n", $request);
// Do some 'work'
sleep (1);
// Send reply back to client
$responder->send("World");
}
3 客戶端:
(客戶端發送消息)
<?php
/*
* Hello World client
* Connects REQ socket to tcp://localhost:5555
* Sends "Hello" to server, expects "World" back
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to talk to server
echo "Connecting to hello world server…\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5555");
for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
printf ("Sending request %d…\n", $request_nbr);
$requester->send("Hello");
$reply = $requester->recv();
printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
}
天氣氣候訂閱系統:(pub-sub)
1 server端:
<?php
/*
* Weather update server
* Binds PUB socket to tcp://*:5556
* Publishes random weather updates
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
// Prepare our context and publisher
$context = new ZMQContext();
$publisher = $context->getSocket(ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5556");
$publisher->bind("ipc://weather.ipc");
while (true) {
// Get values that will fool the boss
$zipcode = mt_rand(0, 100000);
$temperature = mt_rand(-80, 135);
$relhumidity = mt_rand(10, 60);
// Send message to all subscribers
$update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);
$publisher->send($update);
}
2 client端:
<?php
/*
* Weather update client
* Connects SUB socket to tcp://localhost:5556
* Collects weather updates and finds avg temp in zipcode
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// Socket to talk to server
echo "Collecting updates from weather server…", PHP_EOL;
$subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
$filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";
$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
// Process 100 updates
$total_temp = 0;
for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {
$string = $subscriber->recv();
sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);
$total_temp += $temperature;
}
printf ("Average temperature for zipcode '%s' was %dF\n",
$filter, (int) ($total_temp / $update_nbr));
------------------------
pub-sub的proxy模式:
圖示是:
Proxy節點的代碼:
<?php
/*
* Weather proxy device
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext();
// This is where the weather server sits
$frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$frontend->connect("tcp://192.168.55.210:5556");
// This is our public endpoint for subscribers
$backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$backend->bind("tcp://10.1.1.0:8100");
// Subscribe on everything
$frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
// Shunt messages out to our own subscribers
while(true) {
while(true) {
// Process all parts of the message
$message = $frontend->recv();
$more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
$backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);
if(!$more) {
break; // Last message part
}
}
} 其實就是proxy同時是作為pub又作為sub的
----------------------
作者:yjf512(軒脈刃)
出處:http://www.cnblogs.com/yjf512/
本文版權歸yjf512和cnBlog共有,歡迎轉載,但未經作者同意必須保留此段聲明

![clip_image001_thumb[2] clip_image001_thumb[2]](/image/aHR0cHM6Ly9pbWFnZXMuY25ibG9ncy5jb20vY25ibG9nc19jb20veWpmNTEyLzIwMTIwMy8yMDEyMDMwMzA4NDIwMzU4NDEucG5n.png)