教你如何把openfire的muc聊天室改造為群


openfire群聊與QQ群對比

應該是去年的時候開始接觸openfire,當時在分析后發現基於xmpp協議的openfire已經具備了群聊的功能。也就沒太當回事,覺得加點功能就可以做成類似於QQ群的那種模式。后來仔細了解后才發現並不是這么簡單:

  • muc其實聊天室的形式,房間創建后可以加入聊天,用戶離開就退出聊天室了,並沒有一個用戶固化的功能,所以要單獨為這部分開發
  • muc因為沒有固化的成員關系,所以並沒有1對1聊天的那種離線消息。而且考慮到消息量是群發的原因,所以服務器對於加入聊天室的成員只會推送一定量的消息,當然這個可以通過策略來配置為全部推送。事實上考慮到群聊天的特性,推送指定條數可能是更靠譜的。
  • 還有一些QQ特有的功能,比如邀請進群需要管理員審核之類的管理功能就更少了,這塊都需要擴展實現

改造Openfire群聊天室為群

實際上對於openfire的muc改造來說,持久化成員是第一個重要的工作。我們期望的是這個房間里的人都是固定的成員,這些成員可以離開聊天室,但下次可以進來繼續聊天。其實實現起來也挺簡單的:

基於openfire的實現

  1. 建立數據表,用於保存成員列表
    在openfire里已經有一系列的表用於保存muc相關的數據:
  • ofMucRoom-這個是房間表,保存了聊天室的信息
  • ofMucAffiliation-這個是保存房間里管理員角色人員的表(owner(10)、admin(20)、outcast(40))
  • ofMucMember-這個是房間里成員的列表(對應的是member(30))

這里ofMucAffiliation+ofMucMember保存的數據其實是用於記錄的是用戶權限的,當然會發現其實這已經對應上我們需求上的群成員咯?確實是這樣的。

只不過有一個問題,就是ofMucAffiliation+ofMucMember里只能知道用戶的jid,但是群的話可能每個用戶會修改自己的昵稱,對不對?所以還要加一個表用於保存這種用戶的個性化數據。當然這個表也挺簡單的就不細寫了。

  1. 通過openfire的插件體系增加一個插件,在服務端實現加群、退群等功能
    畢竟xmpp協議里是沒有獲得群列表和房間成員的功能的,以及一些加群、退群的管理功能都沒有,所以要自己開發。這里可以通過openfire的插件體系來做,這樣比較獨立,不影響openfire內核功能。

這塊涉及到寫插件的技術,網上有很多,我就不多說了。

  1. 自己定義一套協議來完成客戶端與服務端的通訊
    因為要走openfire,所以還是要定義xmpp協議,我用的是IQ。考慮到我使用的是smack做的,所以這部分就不再寫了。有興趣或者需要的網上找找IQ協議的寫法就行了。

其他方式

其實這些功能無非就是增刪改查,而且我們添加的功能完成可以獨立於openfire之外,所以自己寫一套也是可以的。比如用web的方式實現也是可以的。

特別是可以設計成rest api,這樣對於端來說是比較友好通用的,兼顧PC、移動端就簡單多了,特別是移動端走http協議總比走長鏈接方便吧。

分析openfire muc群聊歷史消息的實現

簡單的介紹了群的實現,另外一個比較頭痛的問題就是muc離線消息。在openfire里是有類似的支持的,這里就做一些簡單的分析吧。

歷史消息策略HistoryStrategy

因為在openfire里歷史消息推送策略是這樣的,我們看一下它的策略類HistoryStrategy,里面設定了一個枚舉:

/**
 * Strategy type.
 */
public enum Type {
    defaulType, none, all, number;
}

可以看出,其實就是三種:none(不顯示歷史記錄)、all(顯示整個歷史記錄)、number(指定條數記錄)。默認的是number。

策略類會維護一個內存列表,用於給新加入的用戶發送歷史記錄用:

private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();

實際上自己也可以實現一套策略來代替它,比如將消息存在redis之類。只不過Openfire並沒有提供擴展,只能是修改openfire代碼來實現咯。

歷史消息的保存與維護

歷史消息的保存是在openfire里的MultiUserChatServiceImpl里實現的,它會啟動一個TimerTask,定時的將消息保存到歷史消息表里。下面是定時任務的實現

/**
 * Logs the conversation of the rooms that have this feature enabled.
 */
private class LogConversationTask extends TimerTask {
    @Override
	public void run() {
        try {
            logConversation();
        }
        catch (Throwable e) {
            Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
        }
    }
}

private void logConversation() {
    ConversationLogEntry entry;
    boolean success;
    for (int index = 0; index <= log_batch_size && !logQueue.isEmpty(); index++) {
        entry = logQueue.poll();
        if (entry != null) {
            success = MUCPersistenceManager.saveConversationLogEntry(entry);
            if (!success) {
                logQueue.add(entry);
            }
        }
    }
}

這是具體的保存聊天歷史的代碼,可以看到消息是放在logQueue里的,然后定時任務從里面取一定的條數保存到數據庫存中。MUCPersistenceManager就是數據庫的訪問類。

在start方法里啟動

@Override
public void start() {
    XMPPServer.getInstance().addServerListener( this );

    // Run through the users every 5 minutes after a 5 minutes server startup delay (default
    // values)
    userTimeoutTask = new UserTimeoutTask();
    TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout);
    // Log the room conversations every 5 minutes after a 5 minutes server startup delay
    // (default values)
    logConversationTask = new LogConversationTask();
    TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout);
    // Remove unused rooms from memory
    cleanupTask = new CleanupTask();
    TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY);

    // Set us up to answer disco item requests
    XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
    XMPPServer.getInstance().getIQDiscoInfoHandler().setServerNodeInfoProvider(this.getServiceDomain(), this);
    XMPPServer.getInstance().getServerItemsProviders().add(this);

    ArrayList<String> params = new ArrayList<>();
    params.clear();
    params.add(getServiceDomain());
    Log.info(LocaleUtils.getLocalizedString("startup.starting.muc", params));
    // Load all the persistent rooms to memory
    for (LocalMUCRoom room : MUCPersistenceManager.loadRoomsFromDB(this, this.getCleanupDate(), router)) {
        rooms.put(room.getName().toLowerCase(), room);
    }
}

這里是聊天室服務啟動的過程,它會啟動LogConversationTask用於定期將聊天記錄保存到庫里。而且這里最后幾句會發現啟動時會從庫里讀取數據(MUCPersistenceManager.loadRoomsFromDB),loadRoomsFromDB實現了讀取Hitory數據到historyStrategy里。

具體的數據保存在ofMucConversationLog表中。

如何推送歷史消息給客戶端

有了歷史消息推送策略和數據,那么怎么樣推送給客戶端呢?這里有一個history協議,在LocalMUCUser處理packet的時候,如果這個packet是Presence,並且帶有history節說明是客戶端發來要歷史記錄的。

在LocalMUCUser.process(Presence packet)里有history消息節的處理代碼,因為代碼太多,就截取關鍵的部分:

// Get or create the room
MUCRoom room = server.getChatRoom(group, packet.getFrom());
// User must support MUC in order to create a room
HistoryRequest historyRequest = null;
String password = null;
// Check for password & requested history if client supports MUC
if (mucInfo != null) {
    password = mucInfo.elementTextTrim("password");
    if (mucInfo.element("history") != null) {
        historyRequest = new HistoryRequest(mucInfo);
    }
}
// The user joins the room
role = room.joinRoom(recipient.getResource().trim(),
        password,
        historyRequest,
        this,
        packet.createCopy());

這里可以看到,先獲取到historyRequest節的信息,然后調用room.joinRoom方法。這里的room.joinRoom就是用戶加入聊天室的關鍵部分。在joinRoom里會發送歷史消息給這個用戶:

 if (historyRequest == null) {
    Iterator<Message> history = roomHistory.getMessageHistory();
    while (history.hasNext()) {
        joinRole.send(history.next());
    }
}
else {
    historyRequest.sendHistory(joinRole, roomHistory);
}

這里會發現有兩種情況,1種是historyRequest為空的情況時,服務端默認按照策略的設置向用戶發送歷史消息。如果不為空,則根據客戶端的請求參數發送。那么這里我們看看historyRequest的實現:

public class HistoryRequest {

	private static final Logger Log = LoggerFactory.getLogger(HistoryRequest.class);
	private static final XMPPDateTimeFormat xmppDateTime = new XMPPDateTimeFormat();

    private int maxChars = -1;
    private int maxStanzas = -1;
    private int seconds = -1;
    private Date since;

    public HistoryRequest(Element userFragment) {
        Element history = userFragment.element("history");
        if (history != null) {
            if (history.attribute("maxchars") != null) {
                this.maxChars = Integer.parseInt(history.attributeValue("maxchars"));
            }
            if (history.attribute("maxstanzas") != null) {
                this.maxStanzas = Integer.parseInt(history.attributeValue("maxstanzas"));
            }
            if (history.attribute("seconds") != null) {
                this.seconds = Integer.parseInt(history.attributeValue("seconds"));
            }
            if (history.attribute("since") != null) {
                try {
                    // parse since String into Date
                    this.since = xmppDateTime.parseString(history.attributeValue("since"));
                }
                catch(ParseException pe) {
                    Log.error("Error parsing date from history management", pe);
                    this.since = null;
                }
            }
        }
    }
    
    /**
     * Returns the total number of characters to receive in the history.
     * 
     * @return total number of characters to receive in the history.
     */
    public int getMaxChars() {
        return maxChars;
    }

    /**
     * Returns the total number of messages to receive in the history.
     * 
     * @return the total number of messages to receive in the history.
     */
    public int getMaxStanzas() {
        return maxStanzas;
    }

    /**
     * Returns the number of seconds to use to filter the messages received during that time. 
     * In other words, only the messages received in the last "X" seconds will be included in 
     * the history.
     * 
     * @return the number of seconds to use to filter the messages received during that time.
     */
    public int getSeconds() {
        return seconds;
    }

    /**
     * Returns the since date to use to filter the messages received during that time. 
     * In other words, only the messages received since the datetime specified will be 
     * included in the history.
     * 
     * @return the since date to use to filter the messages received during that time.
     */
    public Date getSince() {
        return since;
    }

    /**
     * Returns true if the history has been configured with some values.
     * 
     * @return true if the history has been configured with some values.
     */
    private boolean isConfigured() {
        return maxChars > -1 || maxStanzas > -1 || seconds > -1 || since != null;
    }

    /**
     * Sends the smallest amount of traffic that meets any combination of the requested criteria.
     * 
     * @param joinRole the user that will receive the history.
     * @param roomHistory the history of the room.
     */
    public void sendHistory(LocalMUCRole joinRole, MUCRoomHistory roomHistory) {
        if (!isConfigured()) {
            Iterator<Message> history = roomHistory.getMessageHistory();
            while (history.hasNext()) {
                joinRole.send(history.next());
            }
        }
        else {
            Message changedSubject = roomHistory.getChangedSubject();
            boolean addChangedSubject = (changedSubject != null) ? true : false;
            if (getMaxChars() == 0) {
                // The user requested to receive no history
                if (addChangedSubject) {
                    joinRole.send(changedSubject);
                }
                return;
            }
            int accumulatedChars = 0;
            int accumulatedStanzas = 0;
            Element delayInformation;
            LinkedList<Message> historyToSend = new LinkedList<>();
            ListIterator<Message> iterator = roomHistory.getReverseMessageHistory();
            while (iterator.hasPrevious()) {
                Message message = iterator.previous();
                // Update number of characters to send
                String text = message.getBody() == null ? message.getSubject() : message.getBody();
                if (text == null) {
                    // Skip this message since it has no body and no subject  
                    continue;
                }
                accumulatedChars += text.length();
                if (getMaxChars() > -1 && accumulatedChars > getMaxChars()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }
                // Update number of messages to send
                accumulatedStanzas ++;
                if (getMaxStanzas() > -1 && accumulatedStanzas > getMaxStanzas()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }

                if (getSeconds() > -1 || getSince() != null) {
                    delayInformation = message.getChildElement("delay", "urn:xmpp:delay");
                    try {
                        // Get the date when the historic message was sent
                        Date delayedDate = xmppDateTime.parseString(delayInformation.attributeValue("stamp"));
                        if (getSince() != null && delayedDate != null && delayedDate.before(getSince())) {
                            // Stop collecting history since we have exceded a limit
                            break;
                        }
                        if (getSeconds() > -1) {
                            Date current = new Date();
                            long diff = (current.getTime() - delayedDate.getTime()) / 1000;
                            if (getSeconds() <= diff) {
                                // Stop collecting history since we have exceded a limit
                                break;
                            }
                        }
                    }
                    catch (Exception e) {
                        Log.error("Error parsing date from historic message", e);
                    }

                }

                // Don't add the latest subject change if it's already in the history.
                if (addChangedSubject) {
                    if (changedSubject != null && changedSubject.equals(message)) {
                        addChangedSubject = false;
                    }
                }

                historyToSend.addFirst(message);
            }
            // Check if we should add the latest subject change.
            if (addChangedSubject) {
                historyToSend.addFirst(changedSubject);
            }
            // Send the smallest amount of traffic to the user
            for (Object aHistoryToSend : historyToSend) {
                joinRole.send((Message) aHistoryToSend);
            }
        }
    }
}

這里面主要是用於約定發送歷史消息的一些參數:

private int maxChars = -1;
private int maxStanzas = -1;
private int seconds = -1;
private Date since;

這是可以設定的幾個參數,具體的對應關系如下面的表格所示

歷史管理屬性

屬性 數據類型 含義
maxchars int 限制歷史中的字符總數為"X" (這里的字符數量是全部 XML 節的字符數, 不只是它們的 XML 字符數據).
maxstanzas int 制歷史中的消息總數為"X".
seconds int 僅發送最后 "X" 秒收到的消息.
since datetime 僅發送從指定日期時間 datetime 之后收到的消息 (這個datatime必須 MUST 符合XMPP Date and Time Profiles 13 定義的DateTime 規則,).

還有sendHistory

當然這里還實現了一個sendHistory方法,也就是針對客戶端提交了查詢要求時的歷史消息發送方法。具體的實現上面的代碼吧。也就是根據歷史管理屬性里設定的幾個參數進行針對性的發送。

但是這里有個關鍵點就是since屬性,它表示客戶端可以設定一個時間戳,服務端根據發送這個時間戳之后的增量數據給客戶端。這個對於客戶端而已還是很有作用的。

實現群離線消息的方法

那么看完了openfire的歷史消息的實現,再來實現離線消息是不是就簡單的多了。群聊天歷史消息有幾個問題:

  • 問題1:群人員龐大歷史消息巨大服務端如何緩存這些歷史數據?比如一個群1000人,一人一天發10條,就有10000條/天,一個月就是30萬,這還只是一個聊天群的,100個群就是3000萬。
  • 問題2:對於群成員而言,可能一個月未登錄,那么可能就要接收這一個月的離線消息,客戶端基本就崩了,網絡流量也很巨大,怎么處理?

利用HistoryStrategy限制服務端推送條數

所以不用舉太多問題,就這兩個就夠了,那么我覺得openfire的這種歷史消息策略中使用number(條數)是很重要的。比如服務器只緩存最近1000條聊天歷史,這樣整體的服務器緩存量就低了。這就解決了第一個問題。

如果群用戶需要查詢歷史上的數據,應該是另開一個服務接口專門用於查詢歷史數據,這樣就不用在剛上線進入群時接收一堆的離線消息。

利用HistoryRequest來獲取增量數據

前面分析HistoryRequest時提到了它可以設置一個時間戳參數,這個是告訴服務端從這個參數之后的歷史消息推送過來。

比如,用戶A昨天晚20:00下的線(最后消息時間戳是2017-06-07 20:00:00),今天早上8:00上線。在用戶A離線的時間里有100條離心線消息記錄。

那么用戶A上線,客戶端發送HistoryRequest(since=2017-06-07 20:00:00),服務器則只發送2017-06-07 20:00:00之后的聊天記錄100條。這樣就實現了增量的消息,對於服務端和客戶端都是友好的。

當然,這里能發多少消息最終還是要看服務端緩存了多少消息用於發送給客戶端,畢竟就像問題2中提出的那樣,用戶可能一個月都不上線,這期間的歷史消息要是都推送那肯定崩了。所以上線時的歷史消息推送這個功能僅適合推送少量的數據。這個在具體的系統設計時應該根據實際情況來設計。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM