推送系統
說是推送系統有點大,其實就是一個消息廣播功能吧。作用其實也就是由服務端接收到消息然后推送到訂閱的客戶端。
思路
對於推送最關鍵的是服務端向客戶端發送數據,客戶端向服務端訂閱自己想要的消息。這樣的好處就是有消息后才向客戶端推送,相比於拉取數據不會產生許多無效的查詢,實時性也高。
xmpp這種即時通信協議基於TCP長連接還是比較符合這種場景的。只需要在服務端增加一個模塊用於接收用戶訂閱與數據的推送就完成了主體功能。
在xmpp協議里可以擴展組件,這樣我們寫一個組件,然后連接到xmpp服務器,這樣就可以應用於不同的xmpp服務器。
准備工作
主要的環境
因為我比較熟悉openfire的體系,所以自然就用它。客戶端暫時沒有特別的需求,只是用於接收數據,所以用smack或者任何一款xmpp 客戶端都可以。我為了簡單就用smack寫一個簡單的代碼。
需要用到的jar包
用到的了whack的core,在maven工程里直接引用即可,相關的依賴包會自動加載進來
<dependency> <groupId>org.igniterealtime.whack</groupId> <artifactId>core</artifactId> <version>2.0.1-SNAPSHOT</version> <type>jar</type> </dependency>
核心模塊
推送服務
推送服務就是等待或者獲得需要推送的消息數據后向用戶廣播出去的服務。因為這里暫時沒有設定數據的場景,所以就簡單的用一個阻塞隊列來表示。步驟:
- 數據通過推送接口寫入到推送服務
- 推送服務將數據寫入到消息隊列
- 發送線程檢測到消息后取出並發給訂閱的客戶端
在此我寫了一個PushServer的類用於表示推送服務,這個類里包含了:
- 一個消息隊列
- 一個發送線程
- 一個訂閱列表
- 以及一些發送相關的xmpp組件
消息隊列
//消息列表
private BlockingQueue<Packet> packetQueue;
使用到了生產者消費者模式,所以用了一個阻塞隊列,用於存放等待發送的消息數據。
發送線程
private class PacketSenderThread extends Thread { private volatile Boolean shutdown = false; private BlockingQueue<Packet> queue; private Component component; private ComponentManager componentManager; public PacketSenderThread(ComponentManager componentManager, Component component, BlockingQueue<Packet> queue) { this.componentManager = componentManager; this.component = component; this.queue = queue; } public void run() { while (!shutdown) { Packet p; try { p = queue.take(); componentManager.sendPacket(component, p); } catch (InterruptedException e1) { System.err.println(e1.getStackTrace()); } catch (ComponentException e) { e.printStackTrace(); } } } public void shutdown() { shutdown = true; this.interrupt(); } }
這個線程繼承了Thread,線程的功能很簡單,就是一直從queue中獲得消息,因為是阻塞的隊列,所以沒有消息時會阻塞,一旦有消息就會執行發送sendPacket將包發送出去。
這里使用到了componentManager,這個是openfire實現的一個組件管理類,通過這個類的對象可以發送xmpp數據包。
增加shutdown方法,使得線程可以在外部進行退出操作。
訂閱列表
//訂閱列表 private Set<JID> subscriptions; public synchronized void subscription(JID jid) { subscriptions.add(jid); } public synchronized void unsubscription(JID jid) { subscriptions.remove(jid); }
只有訂閱了這個推送服務的客戶端才會進行推送操作,這里的代碼就是用於訂閱與退訂操作。用了一個HashSet來存儲。
xmpp組件
public class PushComponent extends AbstractComponent{ public PushComponent() { } @Override public String getDescription() { return "用於消息推送服務組件,主要功能就是將消息轉發給具體的客戶端,實現消息中轉的功能"; } @Override public String getName() { return "pusher"; } @Override protected void handleMessage(Message message) { } } public class PushManager { private static PushManager _instance = new PushManager(); private Map<String, PushServer> pushServers; private ExternalComponentManager manager; private PushManager() { pushServers = new ConcurrentHashMap<String, PushServer>(); manager = new ExternalComponentManager("192.168.149.214", 5275); manager.setSecretKey("push", "test"); manager.setMultipleAllowed("push", true); } public static PushManager getInstance() { return _instance; } public void init() { try { //初始化PushServer PushServer pushSvr = new PushServer("push", manager); pushServers.put("push", pushSvr); //注冊Component到xmpp服務器 manager.addComponent(pushSvr.getPushDomain(), pushSvr.getComp()); } catch (ComponentException e) { e.printStackTrace(); } } public PushServer getPushServer(String pushDomain) { return pushServers.get(pushDomain); } }
這里的PushComponent就是一個xmpp組件,相當於一個擴展模塊,可以接收消息並處理消息,也就是自己寫一些和xmpp相關的業務功能。
PushManager就是管理組件並連接到xmpp服務器的一個類。
服務端啟動
public class App { public static void main( String[] args ) { PushManager.getInstance().init(); //推送消息 PushServer ps = PushManager.getInstance().getPushServer("push"); ps.start(); JID client1 = new JID("1twja8e8yr@domain/1twja8e8yr"); ps.subscription(client1); try { for (Integer i = 0; i< 200; i++) { ps.putPacket("推送消息200:" + i.toString()); Thread.sleep(1); } Thread.sleep(5000); } catch (InterruptedException e1) { e1.printStackTrace(); } ps.stop(); System.out.println("go die"); } }
這段代碼模擬了服務的啟動,同時為了簡化功能這里直接添加了一個訂閱用戶。
客戶端
public class TestAnonymous { public static void main(String[] args) { AbstractXMPPConnection connection = SesseionHelper.newConn("192.168.149.214", 5223, "domain"); try { connection.login();//匿名登錄 connection.addAsyncStanzaListener(new StanzaListener() { @Override public void processPacket(Stanza packet) throws NotConnectedException { System.out.println((new Date()).toString()+ ":" + packet.toXML()); } }, new StanzaFilter() { @Override public boolean accept(Stanza stanza) { return stanza instanceof Message; } }); } catch (XMPPException | SmackException | IOException e) { e.printStackTrace(); } while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
客戶端代碼啟動一個xmpp連接,然后登錄到服務器,同時訂閱消息,將收到的消息print出來。
整個過程就完成了。