在實際的開發中,我們可能會有這樣的場景:許多客戶端都連接到服務器端,當有某個客戶端的消息的時候,服務器端會主動"推"消息給客戶端,手機app的推送是一個典型的場景(IOS的推送都是要經過蘋果的服務器的,一般是通過蘋果的APNS服務來實現,不需要做過多的開發,安卓的推送就需要我們自己來實現了)
我們可選的技術方案實際上是很多的,使用netty這樣的異步的網絡通信框架或者servlet容器提供的異步的方案都是可以實現的,它們的理念都是一樣的,異步和事件驅動,客戶端請求服務器,當服務器沒有需要推送的數據(或者是需要執行很長時間的IO操作)的時候,請求會被掛起,當服務器端的數據准備好的時候(例如需要向客戶端推送一個消息的時候,或者是服務器端IO操作執行完畢了)請求會被重新激活,數據返回客戶端.
使用jetty的continuations或者是netty來實現這兩種是我覺得比較好的實現方案,今天介紹一下如何使用jetty的continuations來實現一個服務器推的原型,和正式環境中向安卓手機的推送的實現方法是完全一樣的
continuations介紹:jetty的continuations是jetty實現的實現異步請求和事件驅動的組件,從jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0規范的servlet容器都可以使用continuations來實現異步和事件驅動,相比servlet3.0規范中的異步servlet,continuations提供了更加簡化的編程模型.
目標:用瀏覽器請求服務器的一個URL(用瀏覽器來模擬我們的客戶端),實現任何時候當服務器需要推送數據的時候,瀏覽器能夠立即顯示出來
我們需要提供兩個接口:提供給客戶端做長連接的接口,向客戶端發送數據的接口
提供給客戶端連接的servlet:
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/23 * Time: 23:52 * To change this template use File | Settings | Editor | File and Code Templates */ import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationListener; import org.eclipse.jetty.continuation.ContinuationSupport; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.Map; @WebServlet(urlPatterns = "/pull", asyncSupported = true) public class ContinuationServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String user = req.getParameter("user"); Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap"); if (pushAgentMap.containsKey(user)) { PushAgent pushAgent = pushAgentMap.get(user); Continuation continuation = ContinuationSupport.getContinuation(req); continuation.setTimeout(90000000); //第一次請求進來 if (continuation.isInitial()) { resp.setContentType("text/evf;charset=utf-8"); resp.setHeader("Connection", "keep-alive"); resp.setHeader("Keep-Alive", "timeout=2000"); PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent); continuation.setAttribute("adapter", pushAdapter); continuation.addContinuationListener(new ContinuationListener() { @Override public void onComplete(Continuation continuation) { PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter"); if (null != adapter) { continuation.setAttribute("adapter", null); } } @Override public void onTimeout(Continuation continuation) { onComplete(continuation); } }); resp.flushBuffer(); } if (continuation.isExpired()) { return; } Writer writer = getWriter(resp); PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter"); Message message; while (true) { message = adapter.getPushAgent().pull(); if (null == message) break; try { writer.write(message.getContent()); writer.flush(); writer.write("\r\n"); writer.flush(); resp.flushBuffer(); } catch (Exception e) { throw e; } } //若沒有該客戶端的消息,則請求被掛起 continuation.suspend(); } } private Writer getWriter(HttpServletResponse response) throws IOException { OutputStream os = response.getOutputStream(); return new OutputStreamWriter(os, "UTF-8"); } }
向客戶端推送消息的servlet:
package com.jiaoyiping.websample.asyncServlet.jetty; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Map; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:46 * To change this template use File | Settings | Editor | File and Code Templates */ @WebServlet(urlPatterns = "/send") public class MesssageSendServlet extends HttpServlet { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //不要在自己實現的servlet中調用 super.doGet(0)或者是super.doPost() //因為在tomcat它們的默認實現是報405(HTTP1.1)或者400(其他版本的HTTP) // super.doPost(req, resp); String target = req.getParameter("target"); String messageStr = req.getParameter("message"); Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap"); if (agentMap.keySet().contains(target)) { Message message = new Message(); message.setTarget(target); message.setContent(messageStr); if (agentMap.get(target).isInited()) { agentMap.get(target).onEvent(message); } agentMap.get(target).send(message); PrintWriter out = resp.getWriter(); out.print("發送成功"); out.flush(); out.close(); } } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } }
推送代理:就是可以拿到客戶端相關信息,並且維護客戶端消息隊列的類,在這個推送代理中,我們可以加入一個監聽器,當有數據需要推送的時候,激活請求
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 22:56 * To change this template use File | Settings | Editor | File and Code Templates */ public interface PushAgent { Terminal getTerminal(); String getAddress(); String getToken(); Message send(Message message); Message pull(); void addListener(MessageListener messageListener); void onEvent(Message message); boolean isInited(); }
默認實現(每個需要接受推送的用戶對應一個PushAgent,和用戶端保持長連接的線程從queue里讀取mesage對象,向某個用戶推送的時候將message對象放到該用戶對應的PushAgent的queue里,這里是一個生產者-消費者模式):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:17 * To change this template use File | Settings | Editor | File and Code Templates */ import java.util.PriorityQueue; import java.util.Queue; public class DefaultPushAgent implements PushAgent { private Terminal terminal; //客戶端通過長連接連接到服務器時,服務器不斷地從該隊列poll(),若果拿到新的消息,則返回給客戶端 private Queue<Message> messages = new PriorityQueue<>(); private MessageListener messageListener; @Override public Terminal getTerminal() { return this.terminal; } @Override public String getAddress() { return null; } @Override public String getToken() { return null; } @Override public Message send(Message message) { synchronized (message) { messages.add(message); } return message; } @Override public Message pull() { synchronized (messages) { return messages.poll(); } } @Override public void addListener(MessageListener messageListener) { this.messageListener = messageListener; } @Override public void onEvent(Message message) { this.messageListener.onMessage(message); } @Override public boolean isInited() { return this.messageListener != null; } public DefaultPushAgent(Terminal terminal) { this.terminal = terminal; } }
PushAdapter的實現(用戶將Continuation和PushAgent關聯起來):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:37 * To change this template use File | Settings | Editor | File and Code Templates */ import org.eclipse.jetty.continuation.Continuation; public class PushAdapter { private Continuation continuation; private PushAgent pushAgent; public PushAdapter(Continuation continuation, PushAgent pushAgent) { this.continuation = continuation; this.pushAgent = pushAgent; this.pushAgent.addListener(message -> { if (PushAdapter.this.continuation.isSuspended()) { PushAdapter.this.continuation.resume(); } }); } public Continuation getContinuation() { return continuation; } public void setContinuation(Continuation continuation) { this.continuation = continuation; } public PushAgent getPushAgent() { return pushAgent; } public void setPushAgent(PushAgent pushAgent) { this.pushAgent = pushAgent; } }
MessageListener的實現(監聽需要推送消息的事件,這里為了做演示,並沒有實現一個完整的觀察者模式,只是在需要推送消息的時候,手工調用 onMessage()):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/26 * Time: 2:03 * To change this template use File | Settings | Editor | File and Code Templates */ public interface MessageListener { void onMessage(Message message); }
測試數據:使用一個listener在應用初始化的時候,初始化一些數據做為測試數據
package com.jiaoyiping.websample.asyncServlet.jetty; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; import java.util.HashMap; import java.util.Map; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:55 * To change this template use File | Settings | Editor | File and Code Templates */ @WebListener public class PushListener implements ServletContextListener { @Override public void contextInitialized(ServletContextEvent sce) { Map<String, PushAgent> agentMap = new HashMap<>(); agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{ setAddress("zhangsan"); setToken("zhangsan_token"); }})); agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{ setAddress("lisi"); setToken("lisi_token"); }})); sce.getServletContext().setAttribute("agentmap",agentMap); } @Override public void contextDestroyed(ServletContextEvent sce) { } }
最終的效果是這樣的,我截了一個git圖:

