Tigase 發送消息的流程源碼分析


XMPP 的<message/>節是使用基本的”push”方法來從一個地方到另一個地方得到消息。因為消息通常是不告知的,它們是一種”fire-and-forget”(發射后自尋目的)的機制來從一個地方到另一個地方快速獲取信息
消息節有五種不同的類型,通過 type 屬性來進行區分:例如 chat 類型為 chat 的消息在兩個實體間的實時對話中交換,例如兩個朋友之間的即時通訊聊天。除了 type 屬性外,消息節還包括一個 to 和 from 地址,並且也可以包含一個用於跟蹤目的的 id  屬性(我們在使用更為廣泛的 IQ  節中詳細的討論 IDs)。to  地址是預期接收人的
JabberID,from 地址是發送者的JabberID。from 地址不由發送客戶端提供,而是由發送者的服務器添加郵戳,以避免地址欺騙。
在Tigase中,有兩個重要的組成,一個組件,二是插件,可以去官方網去看下他的架構介紹 https://docs.tigase.net/tigase-server/7.1.4/Development_Guide/html/#writePluginCode
例如最著名的組件的一個例子是MUC或PubSub。在Tigase中,幾乎所有東西實際上都是一個組件:會話管理器、s2s連接管理器、消息路由器等等,組件是根據服務器配置加載的,新的組件可以在運行時加載和激活。您可以輕松地替換組件實現,唯一要做的更改是配置條目中的類名。

Tigase 中定義一個最簡單的消息組件,需要實現MessageReceiver或繼承 extends AbstractMessageReceiver 類, MessageReceiver 的抽象類: AbstractMessageReceiver 子類 :
一、ClientConnectionManager
二、SessionManager
三、 MessageRouter
public void setProperties(Map<String, Object> props){
    for (String name : msgrcv_names) {
        mr = conf.getMsgRcvInstance(name);
        if (mr instanceof MessageReceiver) {
            ((MessageReceiver) mr).setParent(this);
            ((MessageReceiver) mr).start();
        }
    }
}

1、當客戶端發送的message消息到tigase服務端,每個一SOCKET連接都會被包裝成IOService對象,IOService包含一系列操作socket的方法(接收發送數據等),processSocketData()接收網絡數據,由tigase.net處理解析成xml對象,並將packet放到接收隊列receivedPackets中再調用serviceListener.packetsReady(this)。由於ConnectionManager實現IOServiceListener接口,實現上調用的的是ConnectionManager中的packetsReady()來開始處理數據

此時的packet :packetFrom=null,packetTo=null。
 
ClientConnectionManager.processSocketData方法中設置packet的一些屬性:
此時: packetFrom=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, packetTo=sess-man@llooper
ClientConnectionManager.processSocketData(XMPPIOService<Object>serv)
    JID id = serv.getConnectionId(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624
    p.setPacketFrom(id); //packetFrom 設置為onnectionId
    p.setPacketTo(serv.getDataReceiver()); //packetTo 設置為sess-man --> SessionManager 
    addOutPacket(p);//將會委托給父 MessageRouter 路由
    
}
//packet 被設置上一些源信息,和目的地信息,接下來,這個數據包將會委托給父 MessageRouter 幫忙路由到 SessionManager組件中進行處理
packet = (tigase.server.Message) from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message xmlns="jabber:client" id="44grM-176" type="chat" to="llooper@llooper"><thread>SWjZv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=170, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
 
packet = from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message to="admin@llooper" type="chat" id="2jePE-253" xmlns="jabber:client"><thread>7VKMRq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=168, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
 
2、MessageRouter.processPacket(Packet packet)部分代碼如下:
 
//我們不會處理沒有目標地址的數據包,只是丟棄它們並寫一個日志消息
if (packet.getTo() == null) {
    log.log(Level.WARNING, "Packet with TO attribute set to NULL: {0}", packet);
    return;
}   


//它不是一個服務發現包,我們必須找到一個處理組件
//下面的代碼塊是“快速”找到一個組件if

//這個包TO 組件ID,格式在以下一項:
// 1。組件名+“@”+默認域名
// 2。組件名+“@”+任何虛擬主機名
// 3。組件名+ "."+默認域名
// 4。組件名+ "."+任何虛擬主機名

ServerComponent comp = getLocalComponent(packet.getTo()); //SessionManager
comp.processPacket(packet, results);

 3、SessionManager.processPacket(final Packet packet)處理,有要代碼如下。 例如A->B,這樣做的目的是為了首先確定用戶A有權限發送packet,然后是確定用戶B有權限接收數據。如果用戶B不在線,那么離線消息處理器會把packet保存到數據庫當中。

//XMPPResourceConnection session——用戶會話保存所有用戶會話數據,並提供對用戶數據存儲庫的訪問。它只允許在會話的生命周期內將信息存儲在永久存儲或內存中。如果在分組處理時沒有聯機用戶會話,則此參數可以為空。
XMPPResourceConnection conn = getXMPPResourceConnection(packet);
//現在要走SessionManager的處理函數,主要是走插件流程,插件在Tigase中也是一個重要的組成,入口就是在這里,SM plugin
processPacket(packet, conn);

   插入下SM plugin 流程說明 :

這個設計有一個驚人的結果。如果你看下面的圖片,顯示了兩個用戶之間的通信,你可以看到數據包被復制了兩次才送到最終目的地: 

會話管理器(SessionManager)必須對數據包進行兩次處理。第一次以用戶A的名義將其作為傳出包進行處理,第二次以用戶B的名義將其作為傳入包進行處理。
這是為了確保用戶A有權限發送一個包,所有的processor都應用到packet上,也為了確保用戶B有權限接收packet,所有的processor都應用到packet了。例如,如果用戶B是脫機的,那么有一個脫機消息processor應該將包發送到數據庫,而不是用戶B。
 
protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
        XMPPResourceConnection conn = null;
        
        //首先根據這個包的發起者,來查找他的連接資源類,找不到則找接收者的資源類
        JID    from = p.getPacketFrom();
        if (from != null) {
            conn = connectionsByFrom.get(from);
            if (conn != null) {
                return conn;
            }
        }

        //這個接收者它可能是這個服務器上某個用戶的消息,讓我們為這個用戶查找已建立的會話
        JID to = p.getStanzaTo();

        if (to != null) {
            if (log.isLoggable(Level.FINEST)) {
                log.finest("Searching for resource connection for: " + to);
            }
            conn = getResourceConnection(to);
        } else {

            // Hm, not sure what should I do now....
            // Maybe I should treat it as message to admin....
            log.log(Level.INFO,
                    "Message without TO attribute set, don''t know what to do wih this: {0}", p);
        }    // end of else

        return conn;
    }
    
    
protected void processPacket(Packet packet, XMPPResourceConnection conn) {

    ...
    packet.setPacketTo(getComponentId()); //sess-man@llooper
    ...

    if (!stop) {
        //授權匹配的processor處理packet
        walk(packet, conn);
        try {
            if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) {
                handleLocalPacket(packet, conn);
            }
        } catch (NoConnectionIdException ex) {
            ...
        }
    }
    
    ...
}

 

packetTo被設置為組件ID(sess-man@llooper),其值原先也是這個。
其中walk(packet, conn)方法,匹配處理器(授權)。對於message,此處匹配到的processor是amp和message-carbons,message-carbons沒有怎么處理,主要是amp在處理,packet被塞amp的隊列中等待處理。

private void walk(final Packet packet, final XMPPResourceConnection connection) {

        for (XMPPProcessorIfc proc_t : processors.values()) {
            XMPPProcessorIfc processor = proc_t;
            //根據element和xmlns,授權匹配成功的processor
            Authorization    result    = processor.canHandle(packet, connection);

            if (result == Authorization.AUTHORIZED) {
                ....
            
                ProcessingThreads pt = workerThreads.get(processor.id());

                if (pt == null) {
                    pt = workerThreads.get(defPluginsThreadsPool);
                }
                //packet 放到(addItem)授權了的processor的隊列
                if (pt.addItem(processor, packet, connection)) {
                    packet.processedBy(processor.id());
                } else {

                    ...
                }
            } else {
                ...
            }
        }   
    }
WorkerThread.run() 從隊列中取出packet,由SessionManager.process(QueueItem item)給amp處理。
SessionManager.pocess(QueueItem item) 如下:
@Override
public void process(QueueItem item) {
    
    XMPPProcessorIfc processor = item.getProcessor();

    try {
        //由授權的 processor 處理 packet
        processor.process(item.getPacket(), item.getConn(), naUserRepository,local_results, plugin_config.get(processor.id()));
        if (item.getConn() != null) {
            setPermissions(item.getConn(), local_results);
        }
        addOutPackets(item.getPacket(), item.getConn(), local_results);
    } catch (PacketErrorTypeException e) {
        ...
    } catch (XMPPException e) {
        ...
    }
}


//其中processor.process()------> MessageAmp.process(),如下:

@Override
public void process(Packet packet, XMPPResourceConnection session,
        NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {
    if (packet.getElemName() == "presence") {
        ...
        
    } else {
        Element amp = packet.getElement().getChild("amp", XMLNS);

        if ((amp == null) || (amp.getAttributeStaticStr("status") != null)) {
            messageProcessor.process(packet, session, repo, results, settings);
        } else {
            ...
    }
}

// 其中messageProcessor.process() --------> Message.process(),如下 

@Override
public void process(Packet packet, XMPPResourceConnection session,
        NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {

    ...
    try {
        ...
        // 在比較JIDs之前,記住要去除資源部分
        id = (packet.getStanzaFrom() != null)
                ? packet.getStanzaFrom().getBareJID()
                : null;

        // 檢查這是否是來自客戶端的數據包
        if (session.isUserId(id)) {
            // 這是來自這個客戶端的數據包,最簡單的操作是轉發到它的目的地:
            // Simple clone the XML element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyElementOnly());

            return;
        }

        
    } catch (NotAuthorizedException e) {
        ...
    }    // end of try-catch
}

 

檢查stanzaFfrom與session匹配通過后,將packet.copyElementOnly()放到results中,作后續投遞,原來的packet 就丟棄了。
此時投遞的packet :packetFrom=null,packetTo=null。
packet在SessionManager.addOutPacket(Packet packet)中判斷packetFrom是否為空,為空則將其設置為ComponentId(此處為sess-man@llooper),然后調用父類(AbstractMessageReceiver.java) 的addOutPacket(packet)方法塞到out_queue 隊列中。
此時packet::packetFrom=sess-man@llooper,packetTo=null。
 

4、上層組件MessageRouter處理,把packet塞到in_queues. 又回到了MessageRouter.processPacket(Packet packet)處理:

 
不同的是 PacketTo為空,packet.getTo()的返回值是stanzaTo。
getLocalComponent(packet.getTo());方法根據stanzaTo與compId、comp name、Component都匹配不到。
此時packet會給組件SessionManager處理,Packet will be processed by: sess-man@llooper,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packet packet)加入到in_queues。
 
 5、第二次來到SessionManager.processPacket(final Packet packet)處理。不同的是在getXMPPResourceConnection(packet)方法中,
conn = connectionsByFrom.get(from)返回值是null,所以是根據stanzaTo取獲取接收方的session,返回接收方連接的Connection。
protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
    XMPPResourceConnection conn = null;
    JID                    from = p.getPacketFrom();

    if (from != null) {
        conn = connectionsByFrom.get(from);
        if (conn != null) {
            return conn;
        }
    }

    // It might be a message _to_ some user on this server
    // so let's look for established session for this user...
    JID to = p.getStanzaTo();

    if (to != null) {
        ...
        conn = getResourceConnection(to);
    } else {

        ...
    }    // end of else

    return conn;
}

 

 6、如同步驟3,此時packet作為一個以用戶B的名義將其作為傳入包進行處理。

然后packetTo被設置為組件ID(sess-man@llooper)

此時packet: packetFrom = sess-man@llooper,packetTo =sess-man@llooper。

之后packet又經walk(packet, conn)方法,匹配處理器(授權),扔給amp處理。

 如同前面: 直到Message.process(),如下:
@Override
public void process(Packet packet, XMPPResourceConnection session,
        NonAuthUserRepository repo, Queue<Packet> results, Map<String, Object> settings) throws XMPPException {

    // For performance reasons it is better to do the check
    // before calling logging method.
    if (log.isLoggable(Level.FINEST)) {
        log.log(Level.FINEST, "Processing packet: {0}, for session: {1}", new Object[] {
                packet,
                session });
    }

    // You may want to skip processing completely if the user is offline.
    if (session == null) {
        processOfflineUser( packet, results );
        return;
    }    // end of if (session == null)
    try {

        // Remember to cut the resource part off before comparing JIDs
        BareJID id = (packet.getStanzaTo() != null)
                ? packet.getStanzaTo().getBareJID()
                : null;

        // Checking if this is a packet TO the owner of the session
        if (session.isUserId(id)) {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "Message 'to' this user, packet: {0}, for session: {1}",
                        new Object[] { packet,
                        session });
            }

            if (packet.getStanzaFrom() != null && session.isUserId(packet.getStanzaFrom().getBareJID())) {
                JID connectionId = session.getConnectionId();
                if (connectionId.equals(packet.getPacketFrom())) {
                    results.offer(packet.copyElementOnly());
                    // this would cause message packet to be stored in offline storage and will not
                    // send recipient-unavailable error but it will behave the same as a message to
                    // unavailable resources from other sessions or servers
                    return;
                }
            }

            // Yes this is message to 'this' client
            List<XMPPResourceConnection> conns = new ArrayList<XMPPResourceConnection>(5);

            // This is where and how we set the address of the component
            // which should rceive the result packet for the final delivery
            // to the end-user. In most cases this is a c2s or Bosh component
            // which keep the user connection.
            String resource = packet.getStanzaTo().getResource();

            if (resource == null) {

                // If the message is sent to BareJID then the message is delivered to
                // all resources
                conns.addAll(getConnectionsForMessageDelivery(session));
            } else {

                // Otherwise only to the given resource or sent back as error.
                XMPPResourceConnection con = session.getParentSession().getResourceForResource(
                        resource);

                if (con != null) {
                    conns.add(con);
                }
            }

            // MessageCarbons: message cloned to all resources? why? it should be copied only
            // to resources with non negative priority!!

            if (conns.size() > 0) {
                for (XMPPResourceConnection con : conns) {
                    Packet result = packet.copyElementOnly();

                    result.setPacketTo(con.getConnectionId());

                    // In most cases this might be skept, however if there is a
                    // problem during packet delivery an error might be sent back
                    result.setPacketFrom(packet.getTo());

                    // Don't forget to add the packet to the results queue or it
                    // will be lost.
                    results.offer(result);
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Delivering message, packet: {0}, to session: {1}",
                                new Object[] { packet,
                                con });
                    }
                }
            } else {
                // if there are no user connections we should process packet
                // the same as with missing session (i.e. should be stored if
                // has type 'chat'
                processOfflineUser( packet, results );
            }

            return;
        }    // end of else

        // Remember to cut the resource part off before comparing JIDs
        id = (packet.getStanzaFrom() != null)
                ? packet.getStanzaFrom().getBareJID()
                : null;

        // Checking if this is maybe packet FROM the client
        if (session.isUserId(id)) {

            // This is a packet FROM this client, the simplest action is
            // to forward it to is't destination:
            // Simple clone the XML element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyElementOnly());

            return;
        }

        // Can we really reach this place here?
        // Yes, some packets don't even have from or to address.
        // The best example is IQ packet which is usually a request to
        // the server for some data. Such packets may not have any addresses
        // And they usually require more complex processing
        // This is how you check whether this is a packet FROM the user
        // who is owner of the session:
        JID jid = packet.getFrom();

        // This test is in most cases equal to checking getElemFrom()
        if (session.getConnectionId().equals(jid)) {

            // Do some packet specific processing here, but we are dealing
            // with messages here which normally need just forwarding
            Element el_result = packet.getElement().clone();

            // If we are here it means FROM address was missing from the
            // packet, it is a place to set it here:
            el_result.setAttribute("from", session.getJID().toString());

            Packet result = Packet.packetInstance(el_result, session.getJID(), packet
                    .getStanzaTo());

            // ... putting it to results queue is enough
            results.offer(result);
        }
    } catch (NotAuthorizedException e) {
        log.log(Level.FINE, "NotAuthorizedException for packet: " + packet + " for session: " + session, e);
        results.offer(Authorization.NOT_AUTHORIZED.getResponseMessage(packet,
                "You must authorize session first.", true));
    }    // end of try-catch
}

檢查stanzaTo與session匹配通過后,根據session拿到接收方所有的連接(可能多端登陸),然后Packet result = packet.copyElementOnly()生成新的packet(原packet丟棄了),並將packetTo設置為接收方連接的ConnectionId(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通過addOutPacket()方法塞到out_queue隊列。
此時packet:packetFrom = sess-man@llooper,packetTo =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。

7、 如同前面步驟2,不同的是根據packetTo匹配到組件 c2s@llooper

8、 組件 c2s@llooper 從queue中取出packet,分發到目的地

public void processPacket(final Packet packet) {
    ...
    if (packet.isCommand() && (packet.getCommand() != Command.OTHER)) {
        ...
    } else {
        // 把packet 發送給客戶端
        if (!writePacketToSocket(packet)) {

            ...
            
        }
    }    // end of else
}

 

后續有時間會不斷更新,歡迎加入QQ群 310790965 更多的交流

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM