Java消息機制


在長期的Java客戶端開發過程中,一個常用的機制就是消息傳送。無論是同步消息傳送還是異步消息傳送,應該說是建立在Observer設計模式基礎上的。在Java中提供了基於這種模式的Observable/Observer事件框架,分別由java.util.Observable類和java.util.Observer接口組成,其中,Observer是觀察者角色,Observable是被觀察目標(subject)角色。

我們先簡單的看一下這兩個類(接口):Observable是一個封裝了基本功能的類,比如注冊observer(attach功能),注銷observer(detatch功能)等。我們一般只需從Observalbe派生我們自己的觀察者。應該注意的是,Observable必須是“有變化”才觸發通知observer這一任務。即如果我們不主動設置changed屬性為true,將不會有任何變化,也就是說不會有“通知”。因此,設置changed屬性的值是我們應用jdk observer 設計模式的關鍵所在。Observable提供了setChange()來設置changed屬性,符合了“只有observalbe才能直接或間接通知observer”(observable設計模式的)要求。

當然我們的實現中也不一定完全按Observer設計模式來做,也許我們通常會

1. 定義封裝的消息類,作為消息數據的承載體,

2. 定義監聽器,其中定義消息處理方法。

3. 定義消息發送類,增加注冊和通知發送實現

當調用者實現監聽器,並注冊到消息發送類中,就可接收到消息了。這也就是Java的事件發送機制。示例如下:

public class MyObservable extends Observable {
    private String data;

    public void changeValue(String fValue) {
        data = fValue;
        setChanged();
    }
}

public class ObserverTest {
    public static void main(String[] args) {
        MyObservable myOservable = new MyObservable();
        myOservable.addObserver(new Observer() {
            //注冊匿名內部類Observer,當數據改變時將通知該類的update方法
            public void update(Observable o, Object arg) {
                System.out.println("This value has been changed to " + (String) arg);
            }
        });
        String sValue = "Hello Msg";
        myOservable.changeValue(sValue);
        myOservable.notifyObservers(sValue + "!");//數據的改變由observable主動通知給Observer。
    }
}

作為CS結構的消息發送而言,我們還需要再擴充Java的事件發送機制,以實現C和S間的消息傳送。當然C和S之間的消息傳送還需要一些基礎設施。這里我們利用Java RMI結合回調機制來完成C和S之間的消息傳送。我們知道一般RPC間的通訊是同步的,所以我們還可結合采用線程來實現異步調用。

簡單介紹一下RMI,RMI supports two classes that implement the same interface. The first class is the implementation of the behavior, and it runs on the server. The second class acts as a proxy for the remote service and it runs on the client.

The Transport Layer makes the connection between JVMs. All connections are stream-based network connections that use TCP/IP.

On the client side, the RMI Registry is accessed through the static class Naming. It provides the method lookup() that a client uses to query a registry. The method lookup() accepts a URL that specifies the server host name and the name of the desired service. The method returns a remote reference to the service object. The URL takes the form:

 rmi://<host_name> [:<name_service_port>]/<service_name>

再來看一下回調機制,回調實現了被調用一端在接口被調用時也會調用對端的接口。即客戶端C調用服務端S中的某一函數fo,然后S又在某個時候反過來再調用C中的函數fn,對於C來說,這個fn就叫做回調函數。在CS間消息傳送過程中,C端使用回調用實現注冊到服務端。當服務端消息到達時,就會主動通知客戶端接收消息。在CS結構中,按我們以上描述消息傳送就需要經過RMI來實現,一個示例如下(本例參考了網上代碼以說明消息傳送的回調方式):

 首先,定義Client與Server間交互接口:

public interface RemoteIntf extends Remote {
    public void regist(DispTimeIntf client, int second) throws RemoteException;
    public void unregist(DispTimeIntf client) throws RemoteException;
}

實現這個接口,代碼如下:

public class RemoteService extends UnicastRemoteObject implements RemoteIntf {
    //保存客服端遠程對象
    Map<DispTimeIntf, Runnable> clients = new HashMap<DispTimeIntf, Runnable>();

    //必須顯式寫出構造方法並且拋出異常
    public RemoteService() throws RemoteException {
    }

    //服務器端需要持有客戶端的遠程對象(接口),以便回調
    @Override
    public void regist(final DispTimeIntf client, final int seconds) {
        Thread t;
        t = new Thread(new Runnable() {
            @Override
            public void run() {
                Timer t = new Timer();
                TimerTask callback_task = new TimerTask() {
                    public void run() {
                        try {
                            Date now = new Date();
                            DateFormat d = DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.LONG);
                            String time = d.format(now);
                            //調用客戶端應用
                            try {
                                client.dispTime(time);
                            } catch (Exception e) {
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                t.schedule(callback_task, 0, seconds * 1000);
            }
        });
        t.run();
        clients.put(client, t);
    }

    @Override
    public void unregist(DispTimeIntf client) {
        Thread t = (Thread) clients.remove(client);
        t.interrupt();
    }   
}

然后將這個接口實現綁定到端口上,如下:

public class Server {
    public static void main(String[] args) {
        try {
            //創建接受對9001端口的遠程調用的注冊表
            LocateRegistry.createRegistry(9001);
            //創建遠程對象的實例(向客戶端提供的服務)
            final RemoteIntf service = new RemoteService();
            //將提供給客戶端遠端調用的遠程對象根據指定名稱加入遠程調用注冊表
            Naming.bind("//localhost:9001/server", service);
        } catch (final Exception e1) {
            e1.printStackTrace();
        }
    }
}

以上過程是符合RMI開發的一般過程,但注意的是遠程接口中定義的參數DispTimeIntf,這是同樣是一個遠程接口定義,用於服務端調用。定義如下:

public interface DispTimeIntf extends Remote {
    //提供給Server端調用的回調接口
    public String dispTime(String time) throws RemoteException;
}

這個遠程接口的實現,如下:

class DispTimeImpl extends UnicastRemoteObject implements DispTimeIntf {
    protected DispTimeImpl() throws RemoteException {
        super();
    }

    public String dispTime(final String time) throws RemoteException {
        System.err.println("get from Server: " + time);
        return time;
    }
}

最后看客戶端如何調用服務端的RMI服務,如下:

public class Client {
    public static void main(String[] args) {
        try {
            String ip = "127.0.0.1";
            //在指定地址查找遠程對象實例
            RemoteIntf service = (RemoteIntf) Naming.lookup("//" + ip + ":9001/server");
            DispTimeImpl dispTime = new DispTimeImpl();
            service.regist(dispTime, 1);
        } catch (RemoteException e1) {
            e1.printStackTrace();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

這也是一個符合RMI客戶端調用的過程,不同的是在調用服務端服務時,將一個用於回調的遠程接口作為參數傳遞給服務器,服務端將調用回調接口進行消息處理。在RMI環境中,用於遠程回調的接口必須是一個遠程接口。在部署時,其stub和skeleton的部署方向正好相反。

 這只是示例了一個簡單的遠程消息傳送機制,而更復雜的實現就是實現一個消息服務。當我們嘗試實現一個簡單的Java消息服務機制時,還要考慮到一些基本的要素:消息數據類型、消息傳送管道、消息接入點、Topic和filter、消息框架管理。

消息數據類型要定義遠程傳送過程中的消息包的數據結構,包括消息頭和消息體。消息頭主要定義消息的Topic、消息序列、消息類型(定義消息意圖,比如一如命令消息、狀態改變消息、告警警告等)。

消息傳送管道則說明消息傳送機制:一對一或是一對多消息傳送、失效消息或無法傳送到目的地的消息如何進行處理、消息傳送管道故障后如何處理消息等等。

消息接入點要說明應用如何連接到消息服務中來發送或接收消息。消息接入點最關心的是接收消息的流量控制。在消息接收上分為推模型或拉模型,它們是使用輪詢或事件驅動方式來進入消息接入。對消息服務而言,還需考慮消息消費模式:消息分派還是消息獲取,消息的訂閱與過濾機制、消息接入應用是同步處理還是異步處理。

消息框架管理要提供一些簡單的debug或跟蹤手段,進行部署前的測試與調試。

當前有不少消息服務的開源實現,當進行選擇時,考慮這些基本要素,作出合適消息服務選擇去做合適的事情尤為重要。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM