- $(function () {
- getMsg();
- });
- function getMsg()
- {
- $.ajax({
- url:"/polling/msg",
- type:"get",
- data:{},
- success:function(data)
- {
- if(data != null && data!="")
- alertShow(data.msg);
- getMsg();
- }
- });
- }
- /**
- *
- * @author {chensg}:2016年6月1日
- * example
- *
- */
- @Controller
- @RequestMapping("/polling/")
- public class PollingController {
- @Autowired
- MessageContainer messageContainer; //全局存放每一個user創建的DeferredResult實例,key:userId,value:DeferredResult
- @Autowired
- RabbitTemplate rabbitTemplate;
- /**
- * 長輪詢
- * @return
- */
- @RequestMapping(value="msg", method=RequestMethod.GET)
- public @ResponseBody DeferredResult<UserMessage> getMessage() {
- final String userId = (UserDetails) SecurityContextHolder.getContext()
- .getAuthentication()
- .getPrincipal().getUsername();
- DeferredResult<UserMessage> result = new DeferredResult<UserMessage>(30000l,null); //設置超時30s,超時返回null
- final Map<String, DeferredResult> resultMap=messageContainer.getUserMessages();
- resultMap.put(userId, result);
- result.onCompletion(new Runnable()
- {
- @Override
- public void run() {
- resultMap.remove(userId);
- }
- });
- return result;
- }
- /**
- * test 新增需要推給某某用戶的消息
- * @return
- */
- @RequestMapping(value="msg", method=RequestMethod.POST)
- public @ResponseBody RestResult addMessage(String msg,String userId) {
- UserMessage userMsg = new UserMessage();
- userMsg.setUserId(userId);
- userMsg.setMsg(msg);
- //系統或者其他用戶需要推送的消息放入消息隊列
- rabbitTemplate.convertAndSend("test.exchange", "test.binding", userMsg);
- return null;
- }
- }
頁面加載完成時,該用戶請求/polling/msg控制器接口,接口里會創建一個DeferredResult實例,設置超時30S,超時返回null。DeferredResult<?> 允許應用程序從一個線程中返回,而何時返回則由線程決定。
消息實體類
- public class UserMessage implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- private String userId;
- private String msg;
- public String getUserId() {
- return userId;
- }
- public void setUserId(String userId) {
- this.userId = userId;
- }
- public String getMsg() {
- return msg;
- }
- public void setMsg(String msg) {
- this.msg = msg;
- }
- }
配置rabbitMQ
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd">
- <!-- 創建一個connectionFactory -->
- <rabbit:connection-factory id="rabbitConnectionFactory"
- host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"
- virtual-host="/" />
- <!-- 創建一個rabbitTemplate, 設置retryTemplate -->
- <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory"
- retry-template="retryTemplate" />
- <!-- 創建一個retryTemplate -->
- <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
- <property name="backOffPolicy">
- <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
- <property name="initialInterval" value="500" />
- <property name="multiplier" value="10.0" />
- <property name="maxInterval" value="10000" />
- </bean>
- </property>
- </bean>
- <rabbit:admin connection-factory="rabbitConnectionFactory" />
- <!-- 創建一個用於消息推送的隊列 -->
- <rabbit:queue id="testQueue" name="test.polling" />
- <rabbit:direct-exchange name="test.exchange">
- <rabbit:bindings>
- <rabbit:binding queue="test.polling" key="test.binding" />
- </rabbit:bindings>
- </rabbit:direct-exchange>
- <!-- 創建一個消息處理器 -->
- <bean id="servicePollingHandler"
- class="com.xxx.controller.test.ServicePollingHandler" />
- <!-- 綁定監聽器和隊列 -->
- <rabbit:listener-container connection-factory="rabbitConnectionFactory">
- <rabbit:listener ref="servicePollingHandler"
- method="testPollingHandle"
- queues="testQueue" />
- </rabbit:listener-container>
- </beans>
- public class ServicePollingHandler {
- @Autowired
- MessageContainer messageContainer;
- public void testPollingHandle(UserMessage userMessage)
- {
- Map<String, DeferredResult> msgContainer = messageContainer.getUserMessages();
- DeferredResult<UserMessage> deferredResult = msgContainer.get(userMessage.getUserId());
- if (deferredResult!=null){
- deferredResult.setResult(userMessage); //調用setResult(),線程返回信息。
- }
- }
- }
- @PropertySource(value="classpath:application.properties")
- @ImportResource({"classpath:amqp.xml"})
- public class RootConfig {
- @Bean
- public MessageContainer messageContainer() {
- return new MessageContainer();
- }
- }
- public class MessageContainer {
- private ConcurrentHashMap<String, DeferredResult> userMessages = new ConcurrentHashMap<String, DeferredResult>(); //線程安全
- public ConcurrentHashMap<String, DeferredResult> getUserMessages() {
- return userMessages;
- }
- }
該例子的用途,當一個用戶登錄頁面時,異步請求后台/polling/msg,后台創建一個線程,維持改長連接30s,當超時或者返回信息,頁面則再次請求后台,維持一個30s的長連接(長輪詢)。
系統或者其他用戶調用/polling/msg method:post,傳入msg與userId,控制器把消息放入消息隊列,消息隊列把消息推送到ServicePollingHandler類testPollingHandle()方法,該方法根據userId獲得該用戶登陸之后的頁面長輪詢創建的deferredResult實例,調用setResult,頁面接受到線程返回消息。
可以基於以上代碼,實現web聊天
轉自:http://chenshangge.iteye.com/blog/2302710