在研究如何實現Pushing功能期間,收集了很多關於Pushing的資料,其中有一個androidnp開源項目用的人比較多,但是由於長時間沒有什么人去維護,聽說bug的幾率挺多的,為了以后自己的產品穩定些,所以就打算自己研究一下asmack的源碼,自己做一個插件,androidnp移動端的源碼中包含了一個叫做asmack的jar。
Reader和Writer
在asmack中有兩個非常重要的對象PacketReader和PacketWriter,那么從類名上看Packet + (Reader/Wirter),而TCP/IP傳輸的數據,叫做Packet(包),asmack使用的是XMPP協議,XMPP簡單講就是使用TCP/IP協議 + XML流協議的組合。所以這個了對象的作用從字面上看應該是,寫包與讀包,作用為從服務端讀寫數據。
PacketWriter中一定含有一個Writer對象,這個Writer是一個輸出流,同樣的PacketReader對象中有一個Reader,而這個Reader是一個輸入流,Writer和Reader對象就是一個簡單的讀寫器,他們是從socket對象中獲取出來后,經過裝飾變成現在這個樣子。
1 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); 2 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
沒有什么神奇的地方,主要看PacketWriter/Reader,這兩個對象分別把對應的Writer和Reader引用到自己的內部進行操作,下面就先看一個PacketWriter。
/** * Creates a new packet writer with the specified connection. * * @param connection the connection. */ protected PacketWriter(XMPPConnection connection) { this.queue = new ArrayBlockingQueue<Packet>(500, true); this.connection = connection; init(); }
還有就是PacketWriter初始化的時候將XMPPConnection對象傳了進來,因為在init方法中使用到了XMPPConnection對象的writer成員,我想說的是,為什么不直接傳遞writer成員?而是將整個對象XMPPConnection傳了過來?其實這就是設計模式的好處,我們如果每次都傳遞的是自己的成員,那么如果后期有改動,實現一個新的XMPPConnection與PacketWriter關聯,那么老的代碼維護起來是很巨大的,如果這里XMPPConnection和他的同事類PacketWriter都有相對應的接口,(XMPPConnection的接口是Connection)那就更完美了,而這里用到的模式應該是中介者,不是絕對意義的中介者,由於形成中介者的條件比較高,所以實際開發中多是變形使用。PacketWriter對象在XMPPConnection中的connect方法中被初始化,它的最大作用是在其自身的內部創建了兩個消息循環,其中一個用30s的heartbeats向服務器發送空白字符,保持長連接。而第二個循環則時刻從隊列中主動取消息並發往服務器,而向外部提供的sendPacket方法則是向queue中添加消息,前面提到的循環機制都是在線程中工作,而消息的隊列用的是ArrayBlockingQueue,這個無邊界阻塞隊列可以存放任何對象,這里存放的是Packet對象。
1 public void sendPacket(Packet packet) { 2 if (!done) { 3 try { 4 queue.put(packet); 5 } 6 catch (InterruptedException ie) { 7 ie.printStackTrace(); 8 return; 9 } 10 synchronized (queue) { 11 queue.notifyAll(); 12 } 13 } 14 }
while (!done && (writerThread == thisThread)) { Packet packet = nextPacket(); if (packet != null) { synchronized (writer) { writer.write(packet.toXML()); writer.flush(); // Keep track of the last time a stanza was sent to the server lastActive = System.currentTimeMillis(); } } }
消息循環則是一個通過各種成員變量控制的while loop,第一行的nextPacket方法是向queue中獲取Packet消息,並且通過weiter將包發出去,這樣生產/消費的模型就搭建好了,這里需要注意的是,我刪減了很多影響閱讀的代碼,並沒有全部貼上。關於heartbeats循環其實也是一個在線程中運行的while loop,也是通過一些成員控制。wirter向服務端寫了寫什么?看下面的這個方法
void openStream() throws IOException { StringBuilder stream = new StringBuilder(); stream.append("<stream:stream"); stream.append(" to=\"").append(connection.getServiceName()).append("\""); stream.append(" xmlns=\"jabber:client\""); stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\""); stream.append(" version=\"1.0\">"); writer.write(stream.toString()); writer.flush(); }
XML,沒錯,這也是符合XMPP協議規范的一種表現吧,至於更多XMPP協議的好處,由於本人的經驗有限,就不多做點評,希望后續會對其深入了解。
下面看一個PacketReader這個類都包含了什么職責。
PacketReader
PacketReader所有的核心邏輯都在一個線程中完成的,PacketReader的工作很專注,同樣的在一個while loop中 不停的解析、刷新reader對象、同時作為事件源發送解析過后的各種Packet,解析這里用的是Android獨特的Pull解析,Pull解析的特點事件驅動,在這里被完全的利用了起來,隨着不同的標簽,PacketReader都會做出不同的處理,處理完這些數據用不同Pocket對象封裝,最后,分發出去,由監聽者做最后的業務處理。
readerThread = new Thread() {
public void run() {
parsePackets(this);
}
};
由於解析過程的代碼量過於多,我寫到什么地方就分解什么地方,大家有時間最好自己看源碼。
一、初始化/重置解析器
private void resetParser() {
try {
//用的是Pull解析
parser = XmlPullParserFactory.newInstance().newPullParser();
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
parser.setInput(connection.reader);
}
catch (XmlPullParserException xppe) {
xppe.printStackTrace();
}
}
上面這個resetParser方法還會在解析的過程中碰到不同的業務需求會不斷的被調用,有用和業務邏輯比較緊密,沒什么技術含量,關鍵是要看解析的方式和同時作為事件源發送解析過后的各種Packet,這兩部分的設計,是非常的迷人的。
二、解析
do {
if (eventType == XmlPullParser.START_TAG) {
if (parser.getName().equals("message")) {
processPacket(PacketParserUtils.parseMessage(parser));
}
else if (parser.getName().equals("iq")) {
processPacket(PacketParserUtils.parseIQ(parser, connection));
}
else if (parser.getName().equals("presence")) {
processPacket(PacketParserUtils.parsePresence(parser));
}
PacketParserUtils是一個工具類,各個靜態方法傳入的還是Parser對象,內部同樣的使用Pull的方式進行解析,但是由於Pull是驅動解析,不會無故的浪費資源只會加載感興趣的內容,試想一下,如果這里用Dom解析……PacketParserUtils的這些靜態解析方法返回的實例對象也不一樣,從方法名可以看出有IQ、message、presence等,他們的父類為Packet,這些對象又被執行processPacket方法的時候傳入
private void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: connection.getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
processPacket方法內部有一個循環來轉調collector.processPacket(packet);方法,前提是connection.getPacketCollectors()內部有貨,到目前位置都沒有涉及到PacketCollector這個接口的內容,他的作用其實是一個觀察者模式中的執行者的作用,也就是傳說中的監聽器,凡是注冊了它的對象,都可以通過processPacket這個抽象方法,監聽packet的變化。可是到現在任何對象都沒有注冊它,所以這個Loop還沒有作用,因為目前我們還處在連接的步驟(還沒繞出來)。
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
我們上面看到listenerExecutor是一個線程池,在線程池中執行了一個凡是注冊了ListenerWrapper的對象,都將接收到packet,同樣的,到目前為止沒有對象注冊,(在RegisterTask過程中ListenerWrapper被注冊)
else if (eventType == XmlPullParser.END_TAG) {
if (parser.getName().equals("stream")) {
// Disconnect the connection
connection.disconnect();
}
}
當文檔讀取結束是將斷開連接
void cleanup() {
connection.recvListeners.clear();
connection.collectors.clear();
}
看到了嗎,只是將監聽器接口集合清空而已,並沒有斷開連接,或者取消消息循環
PacketReader對象的startup方法比較復雜,大體上執行了讀取流,並將解析好的Packet對象發送給觀察者,由觀察者繼續后續操作,目前觀察者還沒有出現,還有就是使用了線程池和令牌來操作執行線程,而且維護了一個connectionID成員,這個成員的作用還需要再看,這就不多說了。
關於Packet對象,packet對象有很多子類,上面舉例了3個,其實還有很多,都是在parser時封裝的
AuthMechanism\Challenge\Failure\IQ\Message\Presence\Response\Success
還有就是Pull解析的優點體現了出來,可以一個parser對象包含了很多信息,但可能沒到一個時刻我們需要的信息只是一小部分,這樣用Pull解析的驅動式就大大減少了冗余的過程,PacketReader對象使用了2個監聽器集合對象,PacketCollector、listenerWrapper,還是那句話,還沒看到觀察者,所以還不知道什么情況下需要注冊這兩個監聽。
到目前位置packetReader.startup()方法終於告一個段落了。
register過程分析
RegisterTask這個task在運行中,添加了一個監聽,上面說道的PacketReader中有一個消息機制,在不停的解析服務器返回的結果,然后將解析過后的包分發給各個監聽器(觀察者),而register中就注冊了一個監聽器,比較有意思的是,監聽器被注冊時還加了一個過濾器,這個過濾器的目的是監聽器只接收自己感興趣的內容,這個設計真的很贊。這樣就不必在數據源頭PacketReader中對數據進行過濾了,只要后期擴展自己Packet和自己的過濾器,就能達到排除自己不關心的信息的功能。
Registration registration = new Registration();
PacketFilter packetFilter = new AndFilter(new PacketIDFilter(registration.getPacketID()), new PacketTypeFilter(IQ.class));
其中Registration的類型其實一個IQ的子類,IQ是Packet的子類。
AndFilter是PacketFilter的子類,PacketFilter的種類型有很多,也可以自己擴展,AndFilter就是其中一個、PacketTypeFilter也是、PacketIDFilter也是,
其中PacketTypeFilter的構造方法傳入一個IQ.class,其實就是通過這個類文件來過濾packet,這個PacketTypeFilter就是要設置關心的Packet,這里面它告訴監聽器,只接收類型為IQ的Packet,這些Filter中都有一個關鍵方法,accept(Packet packet).這個accept方法每個Filter的實現方式都不一樣,我們可可以擴展自己的Filter並且重寫這個方法,最有意思的是AndFilter這個類,他的構造方法傳入的是一個動態數組,類型為PacketFilter,你可以傳入你需要的過濾器,將他們當成組合條件使用來過濾Packet,這個就是典型的裝飾設計模式和職責鏈模式的組合使用。
注冊監聽器
1 PacketListener packetListener = new PacketListener() { 2 //這一部分就是監聽器接收到Packet后執行的后續操作 3 public void processPacket(Packet packet) { 4 Log.d("RegisterTask.PacketListener", "processPacket()....."); 5 Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML()); 6 7 if (packet instanceof IQ) { 8 IQ response = (IQ) packet; 9 if (response.getType() == IQ.Type.ERROR) { 10 if (!response.getError().toString().contains("409")) { 11 Log.e(LOGTAG, 12 "Unknown error while registering XMPP account! " 13 + response.getError() 14 .getCondition()); 15 } 16 } else if (response.getType() == IQ.Type.RESULT) { 17 xmppManager.setUsername(newUsername); 18 xmppManager.setPassword(newPassword); 19 Log.d(LOGTAG, "username=" + newUsername); 20 Log.d(LOGTAG, "password=" + newPassword); 21 22 Editor editor = sharedPrefs.edit(); 23 editor.putString(Constants.XMPP_USERNAME, 24 newUsername); 25 editor.putString(Constants.XMPP_PASSWORD, 26 newPassword); 27 editor.commit(); 28 Log 29 .i(LOGTAG, 30 "Account registered successfully"); 31 //執行task 32 xmppManager.runTask(); 33 } 34 } 35 } 36 };
addPacketListener方法傳入一個監聽器和過濾器,看一下內部
/** * Registers a packet listener with this connection. A packet filter determines * which packets will be delivered to the listener. If the same packet listener * is added again with a different filter, only the new filter will be used. * * @param packetListener the packet listener to notify of new received packets. * @param packetFilter the packet filter to use. */ public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); recvListeners.put(packetListener, wrapper); }
可以看到,監聽器和過濾器被 ListenerWrapper 再次封裝,后續的recvListeners這個集合將ListenerWrapper收入囊中,好整個注冊過程完畢,就等待接收信息了,那么發送信息的地方在什么地方呢?分析connect過程時,上面的PacketReader中已經開始循環發送了,代碼如下
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/** * A runnable to notify all listeners of a packet. */ private class ListenerNotification implements Runnable { private Packet packet; public ListenerNotification(Packet packet) { this.packet = packet; } public void run() { for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) { listenerWrapper.notifyListener(packet); } } }
而listenerWrapper的notifyListener(packet)內部,使用了傳入的過濾器對Packet進行了過濾
/** * Notify and process the packet listener if the filter matches the packet. * * @param packet the packet which was sent or received. */ public void notifyListener(Packet packet) { if (packetFilter == null || packetFilter.accept(packet)) { packetListener.processPacket(packet); }
而具體的過濾機制還是轉調了傳入的過濾器本身的過濾方式accept,非常的靈活。過濾完的Packet將被發送出去
這個方法connection.sendPacket(registration);將一個Registration對象發了出去,
public void sendPacket(Packet packet) { if (!isConnected()) { throw new IllegalStateException("Not connected to server."); } if (packet == null) { throw new NullPointerException("Packet is null."); } packetWriter.sendPacket(packet); }
內部轉調的是 packetWriter.sendPacket(packet);以前提到過PacketWirter中有兩個循環機制,其中一個就是在不停的訪問隊列來獲取Packet,而這個sendPacket方法就是將消息寫入隊列中供消費者使用。
/** * Sends the specified packet to the server. * * @param packet the packet to send. */ public void sendPacket(Packet packet) { if (!done) { // Invoke interceptors for the new packet that is about to be sent. Interceptors // may modify the content of the packet. //內部執行了一個發送數據源的動作,也是為某些監聽器對象服務的interceptorWrapper.notifyListener(packet); connection.firePacketInterceptors(packet); try { //將一個Packet對象放入到阻塞隊列中,在上面的witerPacket方法中的wile循環中發送出去 queue.put(packet); } catch (InterruptedException ie) { ie.printStackTrace(); return; } synchronized (queue) { queue.notifyAll(); } // Process packet writer listeners. Note that we're using the sending // thread so it's expected that listeners are fast. connection.firePacketSendingListeners(packet); } }
其實,注冊的過程就是在注冊監聽,這樣在有消息發出時,才可以根據業務需求對消息進行接收和處理。
http://www.cnblogs.com/rioder/archive/2013/01/23/2873176.html