周末繼續寫博客,算起來,關於rabbitMQ這個中間件的研究已經持續至兩個星期了,上一篇文章使用sring amqp實現了同步和異步的消息接收功能。這一節繼續實用spring amqp實現一個股票交易系統的主要邏輯。這個例子更為復雜也更具代表意義,因為它是現實世界中的例子。
stock trading這個例子包含一個服務端(server),它用於向指定的Topic Exchange發送股票數據。還要很多客戶端(clients),它們從特定的Queue(綁定了routing patten的Queue)訂閱消息。這個例子的另外一個特性是它實現了一個“請求--響應”的股票交易交互動作,即由客戶端發起請求,而由服務端進行處理。
也就是說,這個demo包含了兩條線索,第一:服務端發送股票行情數據,客戶端根據自己的喜好接收特定的行情數據;第二:客戶端看到喜歡的股票,申請交易,服務端處理,然后將結果返回客戶端。
一.領域對象
在正式開始講解程序的主要邏輯之前,先看看該demo所涉及的領域對象。為節省篇幅,只列出主要的成員,get,set方法就不列出了。
--Quote:股票報價信息
public class Quote { private Stock stock; private String price; private long timestamp; }
--Stock:股票信息
public class Stock { private String ticker; private StockExchange stockExchange; }
其中的StockExchange為一個枚舉類型,代表的是股票交易所,它的定義如下。
--StockExchange:證券交易所
/** * Enumeration for Stock Exchanges. * * @author Mark Fisher */ public enum StockExchange { nyse, nasdaq; }
--TradeRequest:交易請求實體
public class TradeRequest { private String ticker; private long quantity; private BigDecimal price; private String orderType; private String accountName; private boolean buyRequest; private String userName; private String requestId; private String id = UUID.randomUUID().toString(); }
--TradeResponse:交易響應實體
/** * Simple trade request 'data' object. No functionality in this 'domain' class. * @author Mark Pollack * */ public class TradeResponse { private String ticker; private long quantity; private BigDecimal price; private String orderType; private String confirmationNumber; private boolean error; private String errorMessage; private String accountName; private long timestamp = new Date().getTime(); private String requestId; }
二.服務端發送股票報價信息,客戶端接收特定信息
這部分的邏輯可以用以下圖示來表示。
服務端向名為app.stock.marketdata的TopicExchange里面發布股票行情報價消息,並且每次發布消息以app.stock.quotes.[stockExchange].[ticker]作為routekey,其中后面兩個為可變參數,即不同的股票,因為他們stockExchange和ticker不一樣,所以發送該股票行情信息的RouteKey也不一樣。這為消費者按指定類別接收股票提供了基礎。
客戶端的消費者可以按照需要訂閱自己感興趣的股票,只需要指定與Queue綁定的key,如app.stock.uotes.nasdaq.*表示接收所有納斯達克交易所的股票行情。這一部分的原理在講述topic的Exchange的時候就有解釋,此處不再贅述。感興趣的讀者可以點這里。五.RabbitMQ之路由(Routing)和主題(topics)
首先來看主要的配置文件,該配置文件為服務端和客戶端共享。需要注意的是,在獲取RabbitTemplate的bean中,調用了抽象方法configureRabbitTemplate,該方法會在其子類中實現。
package org.springframework.amqp.rabbit.stocks.config; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Provides shared configuration between Client and Server. (客戶端和服務端共用的) * <p>The abstract method configureRabbitTemplate lets the Client and Server further customize * the rabbit template to their specific needs. * * @author Mark Pollack * @author Mark Fisher */ @Configuration public abstract class AbstractStockAppRabbitConfiguration { /** * Shared topic exchange used for publishing any market data (e.g. stock quotes) */ protected static String MARKET_DATA_EXCHANGE_NAME = "app.stock.marketdata";//topic exchange的名稱 /** * The server-side consumer's queue that provides point-to-point semantics for stock requests. */ protected static String STOCK_REQUEST_QUEUE_NAME = "app.stock.request"; /** * Key that clients will use to send to the stock request queue via the default direct exchange. */ protected static String STOCK_REQUEST_ROUTING_KEY = STOCK_REQUEST_QUEUE_NAME; @Value("${amqp.port:5672}") private int port = 5672; protected abstract void configureRabbitTemplate(RabbitTemplate template); @Bean public ConnectionFactory connectionFactory() { //TODO make it possible to customize in subclasses. CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.1.195"); connectionFactory.setUsername("xdx"); connectionFactory.setPassword("xxxx"); connectionFactory.setPort(port); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMessageConverter(jsonMessageConverter()); configureRabbitTemplate(template); return template; } @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public TopicExchange marketDataExchange() { return new TopicExchange(MARKET_DATA_EXCHANGE_NAME); } /** * @return the admin bean that can declare queues etc. */ @Bean public AmqpAdmin amqpAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); return rabbitAdmin ; } }
接下來就是服務端的配置文件,它繼承自AbstractStockAppRabbitConfiguration並且實現了configureRabbitTemplate方法。指定server端的TopicExchange的名稱。同時建立一個Queue,Exchange為匿名,Queue名稱為app.stock.request,默認綁定的routekey為app.stock.request,該Queue用於接收來自客戶端的交易請求消息。(現在我們暫且不管接收消息部分,專注於發送消息部分)
package org.springframework.amqp.rabbit.stocks.config.server; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.stocks.config.AbstractStockAppRabbitConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Configures RabbitTemplate for the server. * * @author Mark Pollack * @author Mark Fisher */ @Configuration public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration { /** * The server's template will by default send to the topic exchange named * {@link AbstractStockAppRabbitConfiguration#MARKET_DATA_EXCHANGE_NAME}. * 服務端繼承自AbstractStockAppRabbitConfiguration * ,重寫了父類方法configureRabbitTemplate,將topic * Exchange的名稱定為MARKET_DATA_EXCHANGE_NAME,這樣發送消息的時候就不必每次指定Exchange這個參數 */ public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) { rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME); } /** * We don't need to define any binding for the stock request queue, since * it's relying on the default (no-name) direct exchange to which every * queue is implicitly bound. * 生成一個綁定了默認(無名稱)的DirectExchange的Queue實例,名稱為該app.stock.request,這個隊列是服務為股票交易開設的(股票請求隊列)。 * 服務端監聽這個隊列里面的消息(即交易請求,來自客戶端),並做處理 * 名稱為:app.stock.request */ @Bean public Queue stockRequestQueue() { return new Queue(STOCK_REQUEST_QUEUE_NAME); } }
接下來就是編寫發送消息的代碼。如RabbitMarketDataGateway類所示,該類模擬發送股票行情數據(隨機生成一個行情價)的功能。可以看到它將消息發送到了topic Exchange名為app.stock.marketdata,而routekey為app.stock.quotes.nyse.AAPL或者app.stock.quotes.nasdaq.IBM這樣的通道中。
package org.springframework.amqp.rabbit.stocks.gateway; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.rabbit.core.RabbitGatewaySupport; import org.springframework.amqp.rabbit.stocks.domain.Quote; import org.springframework.amqp.rabbit.stocks.domain.Stock; import org.springframework.amqp.rabbit.stocks.domain.StockExchange; /** * Rabbit implementation of the {@link MarketDataGateway} for sending Market * data. * * @author Mark Pollack * @author Mark Fisher */ public class RabbitMarketDataGateway extends RabbitGatewaySupport implements MarketDataGateway { private static Log logger = LogFactory .getLog(RabbitMarketDataGateway.class); private static final Random random = new Random(); private final List<MockStock> stocks = new ArrayList<MockStock>(); public RabbitMarketDataGateway() { this.stocks.add(new MockStock("AAPL", StockExchange.nasdaq, 255)); this.stocks.add(new MockStock("CSCO", StockExchange.nasdaq, 22)); this.stocks.add(new MockStock("DELL", StockExchange.nasdaq, 15)); this.stocks.add(new MockStock("GOOG", StockExchange.nasdaq, 500)); this.stocks.add(new MockStock("INTC", StockExchange.nasdaq, 22)); this.stocks.add(new MockStock("MSFT", StockExchange.nasdaq, 29)); this.stocks.add(new MockStock("ORCL", StockExchange.nasdaq, 24)); this.stocks.add(new MockStock("CAJ", StockExchange.nyse, 43)); this.stocks.add(new MockStock("F", StockExchange.nyse, 12)); this.stocks.add(new MockStock("GE", StockExchange.nyse, 18)); this.stocks.add(new MockStock("HMC", StockExchange.nyse, 32)); this.stocks.add(new MockStock("HPQ", StockExchange.nyse, 48)); this.stocks.add(new MockStock("IBM", StockExchange.nyse, 130)); this.stocks.add(new MockStock("TM", StockExchange.nyse, 76)); } /** * 服務端發送消息,發送到的Exchange為app.stock.marketdata,routekey為app.stock.quotes.+ * stock.getStockExchange()+.stock.getTicker() * 比如app.stock.quotes.nyse.AAPL或者app.stock.quotes.nasdaq.IBM */ public void sendMarketData() { Quote quote = generateFakeQuote();// 將股票按照原來的basePrice進行包裝后得到一個新的報價信息 Stock stock = quote.getStock(); logger.info("Sending Market Data for " + stock.getTicker()); String routingKey = "app.stock.quotes." + stock.getStockExchange() + "." + stock.getTicker(); getRabbitTemplate().convertAndSend(routingKey, quote); } /** * 生成一條行情數據 * * @return */ private Quote generateFakeQuote() { MockStock stock = this.stocks.get(random.nextInt(this.stocks.size())); String price = stock.randomPrice(); return new Quote(stock, price); } /** * 對股票stock類進行封裝,給它一個基本當價格basePrice * * @author xdx * */ private static class MockStock extends Stock { private final int basePrice; private final DecimalFormat twoPlacesFormat = new DecimalFormat("0.00"); private MockStock(String ticker, StockExchange stockExchange, int basePrice) { super(stockExchange, ticker); this.basePrice = basePrice; } private String randomPrice() { return this.twoPlacesFormat.format(this.basePrice + Math.abs(random.nextGaussian())); } } }
接下來配置一下上述的bean,使其可以被spring調用。
<!-- 發送消息,注入了服務端的rabbitTemplate-->
<bean id="marketDataGateway"
class="org.springframework.amqp.rabbit.stocks.gateway.RabbitMarketDataGateway">
<property name="rabbitTemplate" ref="rabbitTemplate" />
</bean>
然后配置任務管理,讓服務端每隔一段時間調用marketDataGateway來發送股票行情。如下所示。
<task:scheduled-tasks> <task:scheduled ref="marketDataGateway" method="sendMarketData" fixed-delay="5000"/> </task:scheduled-tasks>
到此為止,我們就完成了發送一端的功能。接下來我們來完成接收消息這一端的功能。
同樣的,客戶端也有一個主配置文件,也是繼承自AbstractStockAppRabbitConfiguration,在該配置文件中定義了關於客戶端發送和接收消息的一些bean,我們還是先專注於接收消息這一端。
package org.springframework.amqp.rabbit.stocks.config.client; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AnonymousQueue; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.stocks.config.AbstractStockAppRabbitConfiguration; import org.springframework.amqp.rabbit.stocks.gateway.RabbitStockServiceGateway; import org.springframework.amqp.rabbit.stocks.gateway.StockServiceGateway; import org.springframework.amqp.rabbit.stocks.handler.ClientHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Configures RabbitTemplate and creates the Trader queue and binding for the * client. 與服務端共用一個RabbitTemplate,但是配置略有不同 * * @author Mark Pollack * @author Mark Fisher */ @Configuration public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration { @Value("${stocks.quote.pattern}") // app.stock.quotes.nasdaq.* 接收消息的pattern private String marketDataRoutingKey; @Autowired private ClientHandler clientHandler; /** * The client's template will by default send to the exchange defined in * {@link org.springframework.amqp.rabbit.config.AbstractRabbitConfiguration#rabbitTemplate()} * with the routing key * {@link AbstractStockAppRabbitConfiguration#STOCK_REQUEST_QUEUE_NAME} * <p> * The default exchange will delivery to a queue whose name matches the * routing key value. * Exchange為default,即無名的Exchange,RoutingKey為app.stock.request,這是客戶端發送信息的配置 * 也就是說客戶端的信息將發送至匿名的Exchange,RoutingKey為app.stock.request的通道 */ @Override public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) { rabbitTemplate.setRoutingKey(STOCK_REQUEST_QUEUE_NAME);// 客戶端將信息發送到defaultExchange,RouteKey為app.stock.request } /** * 這個bean主要用於從客戶端向服務端發送交易請求 * * @return */ @Bean public StockServiceGateway stockServiceGateway() { RabbitStockServiceGateway gateway = new RabbitStockServiceGateway(); gateway.setRabbitTemplate(rabbitTemplate()); // 此處設置DefaultReplyTo為traderJoeQueue().getName(),它將作為一個回調的Queue,接收來自服務端的響應。 // 隱式的注入RabbitTemplate對象到RabbitStockServiceGateway中 // 這個bean在client-message.xml中也有配置 gateway.setDefaultReplyTo(traderJoeQueue().getName()); return gateway; } /** * 這個bean用於監聽服務端發過來的消息,包含兩類消息,一類是服務端發送的行情消息,另外一類是 服務端處理完客戶端的交易請求以后的響應消息 * * @return */ @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer( connectionFactory()); // 設置該監聽器監聽的為marketDataQueue()和traderJoeQueue(),其中marketDataQueue()綁定了app.stock.marketdata這個Exchange和 // app.stock.quotes.nasdaq.*這個routeKey。所以他可以監聽到服務端發過來的nasdaq交易所下的證券信息 // traderJoeQueue()是一個系統自動命名的Queue,當客戶端發送trade // request會用它作為確認(replyTo)的Queue,好讓服務端在處理完后發送確認信息到這個Queue // 所以此處我們也要監聽它。這個bean在client-message.xml中也有配置 container.setConcurrentConsumers(5); container.setQueues(marketDataQueue(), traderJoeQueue()); container.setMessageListener(messageListenerAdapter()); container.setAcknowledgeMode(AcknowledgeMode.AUTO); return container; // container(using(connectionFactory()).listenToQueues(marketDataQueue(), // traderJoeQueue()).withListener(messageListenerAdapter()). } /** * 這個bean為監聽適配器,主要的作用是監聽消息 * * @return */ @Bean public MessageListenerAdapter messageListenerAdapter() { // return new MessageListenerAdapter(clientHandler, jsonMessageConverter()); } // Broker Configuration // @PostContruct // public void declareClientBrokerConfiguration() { // declare(marketDataQueue); // declare(new Binding(marketDataQueue, MARKET_DATA_EXCHANGE, // marketDataRoutingKey)); // declare(traderJoeQueue); // // no need to bind traderJoeQueue as it is automatically bound to the // default direct exchanage, which is what we will use // // //add as many declare statements as needed like a script. // } /** * 這個bean是用於接收股票行情的Queue,這是一個匿名的Queue。 * * @return */ @Bean public Queue marketDataQueue() { return new AnonymousQueue(); } /** * Binds to the market data exchange. Interested in any stock quotes. */ /** * 將marketDataQueue與發送股票行情的topic Exchange關聯,並且以marketDataRoutingKey作為綁定 * 的key。這樣就可以接收特定的股票行情。 * @return */ @Bean public Binding marketDataBinding() { return BindingBuilder.bind(marketDataQueue()).to(marketDataExchange()) .with(marketDataRoutingKey); } /** * This queue does not need a binding, since it relies on the default * exchange. * 該bean用於接收服務端發送回來的響應消息。 */ @Bean public Queue traderJoeQueue() { return new AnonymousQueue(); } @Bean public AmqpAdmin rabbitAdmin() { return new RabbitAdmin(connectionFactory()); } }
按照上述配置文件,我們來探究客戶端是如何接收消息的。
客戶端采用的是異步接收消息的策略,記得我們在上一篇文章(hello world例子中)闡述過,異步接收策略需要用到一個監聽器,監聽特定的Queue,然后調用一個回調的接口的回調方法(一般是handleMessage方法)。此處我們也是按照這種方式來配置的。
首先,定義一個Queue,用於接收服務端發送的行情數據。
/** * 這個bean是用於接收股票行情的Queue,這是一個匿名的Queue。 * * @return */ @Bean public Queue marketDataQueue() { return new AnonymousQueue(); }
接着,給這個Queue綁定特定的TopicExchange和RouteKey。
/** * Binds to the market data exchange. Interested in any stock quotes. */ /** * 將marketDataQueue與發送股票行情的topic Exchange關聯,並且以marketDataRoutingKey作為綁定 * 的key。這樣就可以接收特定的股票行情。 * @return */ @Bean public Binding marketDataBinding() { return BindingBuilder.bind(marketDataQueue()).to(marketDataExchange()) .with(marketDataRoutingKey); }
其中的RouteKey為marketDataRoutingKey,它的定義如下。
@Value("${stocks.quote.pattern}") // app.stock.quotes.nasdaq.* 接收消息的pattern private String marketDataRoutingKey;
這是從client.properties配置文件中讀取的,此處為stocks.quote.pattern=app.stock.quotes.nasdaq.*。這樣做的好處是當我們要修改這個篩選模式的時候,不需要重新編譯整個項目。
通過上述兩個步驟,我們構造了一個可以從topic Exchange為app.stock.marketdata,routekey為app.stock.quotes.nasdaq.*的Queue中去接收消息,也就是接收所有nasdaq交易所下面的股票信息。
編寫一個監聽器,監聽特定的queue。
/** * 這個bean為監聽適配器,主要的作用是監聽消息 * * @return */ @Bean public MessageListenerAdapter messageListenerAdapter() { // return new MessageListenerAdapter(clientHandler, jsonMessageConverter()); }
該監聽器指定了一個handler作為回調的類,並且指定了消息轉換器為Json類型的消息,clientHandler的代碼如下。
package org.springframework.amqp.rabbit.stocks.handler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.rabbit.stocks.domain.Quote; import org.springframework.amqp.rabbit.stocks.domain.Stock; import org.springframework.amqp.rabbit.stocks.domain.TradeResponse; import org.springframework.amqp.rabbit.stocks.ui.StockController; /** * POJO handler that receives market data and trade responses. Calls are * delegated to the UI controller. * * @author Mark Pollack * @author Mark Fisher */ public class ClientHandler { private static Log log = LogFactory.getLog(ClientHandler.class); private StockController stockController; public StockController getStockController() { return stockController; } public void setStockController(StockController stockController) { this.stockController = stockController; } /** * 處理股票行情消息 * * @param quote */ public void handleMessage(Quote quote) { Stock stock = quote.getStock(); log.info("Received market data. Ticker = " + stock.getTicker() + ", Price = " + quote.getPrice()); stockController.displayQuote(quote); } /** * 處理交易請求的響應消息 * * @param tradeResponse */ public void handleMessage(TradeResponse tradeResponse) { log.info("Received trade repsonse. [" + tradeResponse + "]"); stockController.updateTrade(tradeResponse); } }
上述ClientHandler 處理器有兩個handleMessage方法,分別用於處理行情消息以及交易請求的響應消息,我們現在只需關注第一個方法,第一個方法,打印出股票的行情信息,並且調用stockController.displayQuote(quote);將其顯示在可視化的容器Panel里。
監聽器寫好了,接下來就是把它放入一個container里面。
/** * 這個bean用於監聽服務端發過來的消息,包含兩類消息,一類是服務端發送的行情消息,另外一類是 服務端處理完客戶端的交易請求以后的響應消息 * * @return */ @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer( connectionFactory()); // 設置該監聽器監聽的為marketDataQueue()和traderJoeQueue(),其中marketDataQueue()綁定了app.stock.marketdata這個Exchange和 // app.stock.quotes.nasdaq.*這個routeKey。所以他可以監聽到服務端發過來的nasdaq交易所下的證券信息 // traderJoeQueue()是一個系統自動命名的Queue,當客戶端發送trade // request會用它作為確認(replyTo)的Queue,好讓服務端在處理完后發送確認信息到這個Queue // 所以此處我們也要監聽它。這個bean在client-message.xml中也有配置 container.setConcurrentConsumers(5); container.setQueues(marketDataQueue(), traderJoeQueue()); container.setMessageListener(messageListenerAdapter()); container.setAcknowledgeMode(AcknowledgeMode.AUTO); return container; // container(using(connectionFactory()).listenToQueues(marketDataQueue(), // traderJoeQueue()).withListener(messageListenerAdapter()). }
做完上述步驟,只需要運行服務端的任務管理器,模擬定時發送行情消息,再運行客戶端的監聽器,就可以實現股票行情監測的功能了。
看看服務端的main入口。
package org.springframework.amqp.rabbit.stocks; import org.junit.After; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Server application than can be run as an app or unit test. * * @author Mark Pollack */ public class Server { private ClassPathXmlApplicationContext context; public static void main(String[] args) { new Server().run(); } @After public void close() { if (context != null) { context.close(); } } @Test public void run() { context = new ClassPathXmlApplicationContext("server-bootstrap-config.xml"); } }
它做的只是載入主xml配置文件server-bootstrap-config.xml,由該配置文件去將所有的bean加載。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <context:property-placeholder system-properties-mode="OVERRIDE" /> <!-- pick up rabbit broker configuration --> <context:component-scan base-package="org.springframework.amqp.rabbit.stocks.config.server" /> <import resource="classpath:server-handlers.xml" /> <import resource="classpath:server-messaging.xml" /> <import resource="classpath:server-services.xml" /> <!-- <import resource="classpath:server-jmx.xml" /> --> <!-- 任務我管理器,定時發送行情消息 --> <task:scheduled-tasks> <task:scheduled ref="marketDataGateway" method="sendMarketData" fixed-delay="5000" /> </task:scheduled-tasks> </beans>
看看客戶端的main入口。
package org.springframework.amqp.rabbit.stocks; import javax.swing.JFrame; import org.junit.After; import org.junit.Test; import org.springframework.amqp.rabbit.stocks.ui.StockController; import org.springframework.amqp.rabbit.stocks.ui.StockPanel; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Main client application, can run as an application or unit test. * * @author Mark Pollack */ public class Client { private ConfigurableApplicationContext context; public static void main(String[] args) { new Client().run(); } @Test public void run() { context = new ClassPathXmlApplicationContext("client-bootstrap-config.xml"); StockController controller = context.getBean(StockController.class); JFrame f = new JFrame("Rabbit Stock Demo"); f.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // TODO consider @Configurable f.add(new StockPanel(controller)); f.pack(); f.setVisible(true); } @After public void close() { if (context != null) { context.close(); } } }
它除了加載主配置文件,還調起了一個Panel,我們可以在該Panel上面進行股票交易請求的操作。
客戶端的主配置xml文件如下。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <!-- pick up rabbit broker configuration --> <context:component-scan base-package="org.springframework.amqp.rabbit.stocks.config.client"/> <context:property-placeholder location="classpath:/client.properties"/> <import resource="classpath:client-handlers.xml" /> <!-- XML version of RabbitClientConfiguration --> <!-- import resource="classpath:client-messaging.xml" /--> </beans>
三.客戶端發送交易請求,服務端接收請求並響應
這一部分的內容可以用以下的流程圖來表示。
1.客戶端發現自己感興趣的股票,執行交易請求操作,將請求消息發送到一個Default Exchange(就是無名稱的Exchange),綁定的RouteKey為app.stock.request.在發送消息的同時,指定了一個匿名的ReplyTo的Queue,即上圖中的tradeJoeQueue。
2.服務端生成名為app.stock.request的queue,因為沒有綁定特定的Exchange,所以其默認的綁定的routeKey即為自己的名稱app.stock.request。
3.服務端監聽上述名為app.stock.request的queue,就可以堅挺到客戶端發送的交易請求。
4.監聽到客戶端的交易請求,服務端對其進行處理,然后將響應沿着客戶端在第一步指定的tradeJoeQueue發送回去。
5.客戶端監聽tradeJoeQueue的消息,接收到服務端返回的響應,打印出來。
接下來我們一步一步的講解上述步驟的具體實現。
首先是客戶端指定發送消息的Exchange和RouteKey,如下所示。
/** * The client's template will by default send to the exchange defined in * {@link org.springframework.amqp.rabbit.config.AbstractRabbitConfiguration#rabbitTemplate()} * with the routing key * {@link AbstractStockAppRabbitConfiguration#STOCK_REQUEST_QUEUE_NAME} * <p> * The default exchange will delivery to a queue whose name matches the * routing key value. * Exchange為default,即無名的Exchange,RoutingKey為app.stock.request,這是客戶端發送信息的配置 * 也就是說客戶端的信息將發送至匿名的Exchange,RoutingKey為app.stock.request的通道 */ @Override public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) { rabbitTemplate.setRoutingKey(STOCK_REQUEST_QUEUE_NAME);// 客戶端將信息發送到defaultExchange,RouteKey為app.stock.request }
接下來是發送消息,發送消息的動作是通過一個可視化的panel來進行的,我們暫且不關心,只看與RabbitMQ相關的發送部分。
package org.springframework.amqp.rabbit.stocks.gateway; import java.io.UnsupportedEncodingException; import java.util.UUID; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitGatewaySupport; import org.springframework.amqp.rabbit.stocks.domain.TradeRequest; /** * Rabbit implementation of {@link StockServiceGateway} to send trade requests * to an external process. * * @author Mark Pollack * @author Gary Russell */ public class RabbitStockServiceGateway extends RabbitGatewaySupport implements StockServiceGateway { private String defaultReplyTo; public void setDefaultReplyTo(String defaultReplyTo) { this.defaultReplyTo = defaultReplyTo; } /** * 此處將發送股票交易請求到DefaultExchange,RouteKey為app.stock.request的通道 * 並且指定了回調的Queue名字為traderJoeQueue().getName() */ public void send(TradeRequest tradeRequest) { getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() { public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setReplyTo( defaultReplyTo); try { message.getMessageProperties().setCorrelationId( UUID.randomUUID().toString() .getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { throw new AmqpException(e); } return message; } }); } }
這是一個發送消息的類,在主配置文件中,我們有配置defaultReplyTo參數,如下。
/** * 這個bean主要用於從客戶端向服務端發送交易請求 * * @return */ @Bean public StockServiceGateway stockServiceGateway() { RabbitStockServiceGateway gateway = new RabbitStockServiceGateway(); gateway.setRabbitTemplate(rabbitTemplate()); // 此處設置DefaultReplyTo為traderJoeQueue().getName(),它將作為一個回調的Queue,接收來自服務端的響應。 // 隱式的注入RabbitTemplate對象到RabbitStockServiceGateway中 // 這個bean在client-message.xml中也有配置 gateway.setDefaultReplyTo(traderJoeQueue().getName()); return gateway; }
其中的traderJoeQueue的代碼如下。
/** * This queue does not need a binding, since it relies on the default * exchange. * 該bean用於接收服務端發送回來的響應消息。 */ @Bean public Queue traderJoeQueue() { return new AnonymousQueue(); }
這個Queue在客戶端主配置文件被載入的時候就生成,它是一個匿名的Queue,也就是說由系統自動命名,它默認綁定的是無名稱的topicExchange,routeKey為自己的名字。它被指定作為交易響應的回調Queue.
客戶端發送完請求消息以后,服務端需要監聽這些請求消息,如下所示。
<!-- 監聽消息 --> <listener-container concurrency="5" connection-factory="connectionFactory" message-converter="jsonMessageConverter" xmlns="http://www.springframework.org/schema/rabbit"> <!-- 從stockRequestQueue.name(app.stock.request)中去監聽消息 --> <listener ref="serverHandler" method="handleMessage" queue-names="#{stockRequestQueue.name}" /> </listener-container>
對應的serverHandler代碼如下所示。
package org.springframework.amqp.rabbit.stocks.handler; import java.util.ArrayList; import java.util.List; import org.springframework.amqp.rabbit.stocks.domain.TradeRequest; import org.springframework.amqp.rabbit.stocks.domain.TradeResponse; import org.springframework.amqp.rabbit.stocks.service.CreditCheckService; import org.springframework.amqp.rabbit.stocks.service.ExecutionVenueService; import org.springframework.amqp.rabbit.stocks.service.TradingService; import org.springframework.util.StringUtils; /** * POJO handler that receives trade requests and sends back a trade response. Main application * logic sits here which coordinates between {@link ExecutionVenueService}, {@link CreditCheckService}, * and {@link TradingService}. * * @author Mark Pollack * */ public class ServerHandler { private ExecutionVenueService executionVenueService; private CreditCheckService creditCheckService; private TradingService tradingService; public ServerHandler(ExecutionVenueService executionVenueService, CreditCheckService creditCheckService, TradingService tradingService) { this.executionVenueService = executionVenueService; this.creditCheckService = creditCheckService; this.tradingService = tradingService; } //委托方法,用於處理客戶端發來的交易請求tradeRequest並且處理 public TradeResponse handleMessage(TradeRequest tradeRequest) { TradeResponse tradeResponse; List<?> errors = new ArrayList<Object>(); if (creditCheckService.canExecute(tradeRequest, errors)) { tradeResponse = executionVenueService.executeTradeRequest(tradeRequest); } else { tradeResponse = new TradeResponse(); tradeResponse.setError(true); tradeResponse.setErrorMessage(StringUtils.arrayToCommaDelimitedString(errors.toArray())); } tradingService.processTrade(tradeRequest, tradeResponse); return tradeResponse; } }
我們可以簡單的理解該處理方法將股票交易的請求數據做了簡單的價格處理(比如隨機將價格換成另外一個數字),然后返回一個響應的對象。
接下來就是客戶端來監聽這些響應消息了,這部分的代碼在第二部分介紹服務端發送行情消息,客戶端監聽消息的時候已經有提及,主要是如下代碼。可以看到它不僅監聽marketDataQueue(),而且監聽了traderJoeQueue(),后者就是用於回傳響應的QUeue.
/** * 這個bean用於監聽服務端發過來的消息,包含兩類消息,一類是服務端發送的行情消息,另外一類是 服務端處理完客戶端的交易請求以后的響應消息 * * @return */ @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer( connectionFactory()); // 設置該監聽器監聽的為marketDataQueue()和traderJoeQueue(),其中marketDataQueue()綁定了app.stock.marketdata這個Exchange和 // app.stock.quotes.nasdaq.*這個routeKey。所以他可以監聽到服務端發過來的nasdaq交易所下的證券信息 // traderJoeQueue()是一個系統自動命名的Queue,當客戶端發送trade // request會用它作為確認(replyTo)的Queue,好讓服務端在處理完后發送確認信息到這個Queue // 所以此處我們也要監聽它。這個bean在client-message.xml中也有配置 container.setConcurrentConsumers(5); container.setQueues(marketDataQueue(), traderJoeQueue()); container.setMessageListener(messageListenerAdapter()); container.setAcknowledgeMode(AcknowledgeMode.AUTO); return container; // container(using(connectionFactory()).listenToQueues(marketDataQueue(), // traderJoeQueue()).withListener(messageListenerAdapter()). }
我們最后來看看客戶端如何處理這些響應消息。如下所示。它只是把它打印出來,並且顯示在可視化的panel上。
/** * 處理交易請求的響應消息 * * @param tradeResponse */ public void handleMessage(TradeResponse tradeResponse) { log.info("Received trade repsonse. [" + tradeResponse + "]"); stockController.updateTrade(tradeResponse); }
至此,關於這個demo的所有知識點都已介紹完畢,相信掌握了這個demo,我們就可以利用spring amqp在自己的項目中實現異步消息隊列。后續有時間我會利用spring amqp實現一個日志管理系統,集成到舊有的系統中去。
PS:demo的下載地址為http://download.csdn.net/download/xxjoy_777/10173957
github下載地址:https://github.com/xdxxdx/stocks