使用jetty的continuations實現"服務器推"


 在實際的開發中,我們可能會有這樣的場景:許多客戶端都連接到服務器端,當有某個客戶端的消息的時候,服務器端會主動"推"消息給客戶端,手機app的推送是一個典型的場景(IOS的推送都是要經過蘋果的服務器的,一般是通過蘋果的APNS服務來實現,不需要做過多的開發,安卓的推送就需要我們自己來實現了)

我們可選的技術方案實際上是很多的,使用netty這樣的異步的網絡通信框架或者servlet容器提供的異步的方案都是可以實現的,它們的理念都是一樣的,異步和事件驅動,客戶端請求服務器,當服務器沒有需要推送的數據(或者是需要執行很長時間的IO操作)的時候,請求會被掛起,當服務器端的數據准備好的時候(例如需要向客戶端推送一個消息的時候,或者是服務器端IO操作執行完畢了)請求會被重新激活,數據返回客戶端.

使用jetty的continuations或者是netty來實現這兩種是我覺得比較好的實現方案,今天介紹一下如何使用jetty的continuations來實現一個服務器推的原型,和正式環境中向安卓手機的推送的實現方法是完全一樣的

continuations介紹:jetty的continuations是jetty實現的實現異步請求和事件驅動的組件,從jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0規范的servlet容器都可以使用continuations來實現異步和事件驅動,相比servlet3.0規范中的異步servlet,continuations提供了更加簡化的編程模型.

 

目標:用瀏覽器請求服務器的一個URL(用瀏覽器來模擬我們的客戶端),實現任何時候當服務器需要推送數據的時候,瀏覽器能夠立即顯示出來

我們需要提供兩個接口:提供給客戶端做長連接的接口,向客戶端發送數據的接口

提供給客戶端連接的servlet:

 

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/23
  * Time: 23:52
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.continuation.ContinuationSupport;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Map;

@WebServlet(urlPatterns = "/pull", asyncSupported = true)
public class ContinuationServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String user = req.getParameter("user");
        Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
        if (pushAgentMap.containsKey(user)) {
            PushAgent pushAgent = pushAgentMap.get(user);
            Continuation continuation = ContinuationSupport.getContinuation(req);
            continuation.setTimeout(90000000);
            //第一次請求進來
            if (continuation.isInitial()) {
                resp.setContentType("text/evf;charset=utf-8");
                resp.setHeader("Connection", "keep-alive");
                resp.setHeader("Keep-Alive", "timeout=2000");
                PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent);
                continuation.setAttribute("adapter", pushAdapter);
                continuation.addContinuationListener(new ContinuationListener() {
                    @Override
                    public void onComplete(Continuation continuation) {
                        PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
                        if (null != adapter) {
                            continuation.setAttribute("adapter", null);
                        }
                    }

                    @Override
                    public void onTimeout(Continuation continuation) {
                        onComplete(continuation);
                    }

                });
                resp.flushBuffer();
            }
            if (continuation.isExpired()) {
                return;
            }
            Writer writer = getWriter(resp);
            PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
            Message message;
            while (true) {
                message = adapter.getPushAgent().pull();
                if (null == message)
                    break;
                try {
                    writer.write(message.getContent());
                    writer.flush();
                    writer.write("\r\n");
                    writer.flush();
                    resp.flushBuffer();
                } catch (Exception e) {
                    throw e;

                }
            }
            //若沒有該客戶端的消息,則請求被掛起
            continuation.suspend();
        }

    }

    private Writer getWriter(HttpServletResponse response) throws IOException {
        OutputStream os = response.getOutputStream();
        return new OutputStreamWriter(os, "UTF-8");
    }


}

 

向客戶端推送消息的servlet:

package com.jiaoyiping.websample.asyncServlet.jetty;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;

/*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/25
  * Time: 23:46
  * To change this template use File | Settings | Editor | File and Code Templates
 */
@WebServlet(urlPatterns = "/send")
public class MesssageSendServlet extends HttpServlet {
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        //不要在自己實現的servlet中調用 super.doGet(0)或者是super.doPost()
        //因為在tomcat它們的默認實現是報405(HTTP1.1)或者400(其他版本的HTTP)
        //        super.doPost(req, resp);

        String target = req.getParameter("target");
        String messageStr = req.getParameter("message");

        Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
        if (agentMap.keySet().contains(target)) {
            Message message = new Message();
            message.setTarget(target);
            message.setContent(messageStr);
            if (agentMap.get(target).isInited()) {
                agentMap.get(target).onEvent(message);
            }
            agentMap.get(target).send(message);
            PrintWriter out = resp.getWriter();
            out.print("發送成功");
            out.flush();
            out.close();
        }


    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doPost(req, resp);
    }
}

 

推送代理:就是可以拿到客戶端相關信息,並且維護客戶端消息隊列的類,在這個推送代理中,我們可以加入一個監聽器,當有數據需要推送的時候,激活請求

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/25
  * Time: 22:56
  * To change this template use File | Settings | Editor | File and Code Templates
 */

public interface PushAgent {

    Terminal getTerminal();

    String getAddress();

    String getToken();

    Message send(Message message);

    Message pull();

    void addListener(MessageListener messageListener);

    void onEvent(Message message);

    boolean isInited();
}

默認實現(每個需要接受推送的用戶對應一個PushAgent,和用戶端保持長連接的線程從queue里讀取mesage對象,向某個用戶推送的時候將message對象放到該用戶對應的PushAgent的queue里,這里是一個生產者-消費者模式):

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/25
  * Time: 23:17
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import java.util.PriorityQueue;
import java.util.Queue;

public class DefaultPushAgent implements PushAgent {

    private Terminal terminal;
    //客戶端通過長連接連接到服務器時,服務器不斷地從該隊列poll(),若果拿到新的消息,則返回給客戶端
    private Queue<Message> messages = new PriorityQueue<>();
    private MessageListener messageListener;

    @Override
    public Terminal getTerminal() {
        return this.terminal;
    }

    @Override
    public String getAddress() {
        return null;
    }

    @Override
    public String getToken() {
        return null;
    }

    @Override
    public Message send(Message message) {
        synchronized (message) {
            messages.add(message);
        }

        return message;
    }

    @Override
    public Message pull() {
        synchronized (messages) {
            return messages.poll();
        }

    }

    @Override
    public void addListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    @Override
    public void onEvent(Message message) {
        this.messageListener.onMessage(message);
    }

    @Override
    public boolean isInited() {
        return this.messageListener != null;
    }


    public DefaultPushAgent(Terminal terminal) {
        this.terminal = terminal;
    }
}

 

PushAdapter的實現(用戶將Continuation和PushAgent關聯起來):
package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/25
  * Time: 23:37
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import org.eclipse.jetty.continuation.Continuation;

public class PushAdapter {
    private Continuation continuation;
    private PushAgent pushAgent;

    public PushAdapter(Continuation continuation, PushAgent pushAgent) {
        this.continuation = continuation;
        this.pushAgent = pushAgent;
        this.pushAgent.addListener(message -> {
            if (PushAdapter.this.continuation.isSuspended()) {
                PushAdapter.this.continuation.resume();
            }
        });
    }

    public Continuation getContinuation() {
        return continuation;
    }

    public void setContinuation(Continuation continuation) {
        this.continuation = continuation;
    }

    public PushAgent getPushAgent() {
        return pushAgent;
    }


    public void setPushAgent(PushAgent pushAgent) {
        this.pushAgent = pushAgent;
    }
}

 

MessageListener的實現(監聽需要推送消息的事件,這里為了做演示,並沒有實現一個完整的觀察者模式,只是在需要推送消息的時候,手工調用 onMessage()):

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/26
  * Time: 2:03
  * To change this template use File | Settings | Editor | File and Code Templates
 */

public interface MessageListener {
    void onMessage(Message message);
}    

 

測試數據:使用一個listener在應用初始化的時候,初始化一些數據做為測試數據

 

package com.jiaoyiping.websample.asyncServlet.jetty;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.util.HashMap;
import java.util.Map;

/*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: jiaoyiping@gmail.com
  * Date: 2016/10/25
  * Time: 23:55
  * To change this template use File | Settings | Editor | File and Code Templates
 */
@WebListener
public class PushListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        Map<String, PushAgent> agentMap = new HashMap<>();
        agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{
            setAddress("zhangsan");
            setToken("zhangsan_token");
        }}));
        agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{
            setAddress("lisi");
            setToken("lisi_token");
        }}));

        sce.getServletContext().setAttribute("agentmap",agentMap);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

 

最終的效果是這樣的,我截了一個git圖:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM