WebSocket API 學習


https://abhirockzz.gitbooks.io/java-websocket-api-handbook/content/

◆WebSocket是什么
簡單來說,WebSocket是一個IETF的標准,RFC6455.具體來說,它是一個基於TCP的協議(就像HTTP)。
你可以把它認為是一個介於長輪詢和服務器推送消息之間的解決方案。WebSocket連接通過HTTP來做初始化握手。
重要:一旦建立了,TCP連接一直打開着。

◆為什么要用WebSocket
它的關鍵特性是
・雙向 服務器端和客戶端都可以發起連接
・全雙工 一旦WebSocket的session建立后,服務器端和客戶端可以同時收發信息。

上述特性使WebSocket非常適合低延遲和高頻度消息傳送需求的程序。比如,聊天,監控,多人在線游戲,廣播實時財務數據等。
其中一些優勢(與其他解決方案相比)包括
・減少冗余信息(相對於HTTP)
・更有效率(相對於長輪詢)
・更豐富的語義(相對於服務器消息推送)

◆WebSocket作為一個java標准
API與實現
對Java來說,這項技術被定義為JSR 356 ,一個在2013年5月發布第一版本的標准API
2014年8月發布了一個1.1版本。像其他JSR(java規范需求)定義的API,WebSocket的java api由一個規范
支持,使它可以有不同的實現而表現一致。
・Tyrus 一個開源項目,並且是Weblogic and GlassFish的參考實現。
・Undertow JBoss EAP and Wildfly使用的實現
・Tomcat 7以上版本 tomcat內部實現

Java EE Platform
JSR 356也是JavaEE7平台的一部分。任何符合JavaEE7的程序服務將包含一個默認的API實現。就像其他JAVAEE技術一樣,
比如EJB, CDI, Security等。

JSR356也被其他容器和框架支持比如Spring,Jetty等。

◆服務器和客戶端模式
・WebSocket Server endpoint
一個服務器端組件是
實現了一些業務邏輯
公開自己,是自己可以被客戶端發現(通過一個主機和端口)
當有客戶端連接到它時觸發

・WebSocket Client endpoint
客戶端組件
實現一些業務邏輯
連接到存在的WebSocket Server endpoint

Java WebSocket API 提供服務器和客戶端組件
Server javax.websocket.server包
Client javax.websocket包 里面也包含了服務器客戶端共通的內容

客戶端組件包括ClientEndpoint, ClientEndpointConfig, ClientEndpointConfig.Builder,
ClientEndpointConfig.Configurator, WebSocketContainer

支持的消息類型
WebSocket規范支持兩種在線數據格式 - 文本和二進制
Java WebSocket API支持這些(顯然),並增加了使用Java對象以及規范定義的健康檢查消息(乒乓)的功能

Text java.lang.String, primitives or their equivalent wrapper classes
Binary java.nio.ByteBuffer or a byte[] (byte array)
Java objects API使您可以在代碼中使用Java對象,並使用自定義變換器(編碼器/解碼器)
將其轉換為WebSocket協議允許的兼容在線格式(文本,二進制)
Ping, Pong javax.websocket.PongMessage是由WebSocket對響應健康檢查(ping)請求發送的確認

◆容器抽象
javax.websocket.WebSocketContainer
提供容器的高級視圖,允許客戶端端點激活和連接到現有(WebSocket)服務器,
設置對服務器和客戶端都適用的全局/公共屬性(空閑連接超時,消息大小,異步發送超時)

javax.websocket.server.ServerContainer
用於服務器端的容器類(繼承了WebSocketContainer) 增加了部署服務器端endpoint的功能

javax.websocket.ContainerProvider
提供了對WebSocketContainer的訪問功能

WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.setDefaultMaxSessionIdleTimeout(60000); //1 min. idle session timeout
container.connectToServer(ChatClient.class, URI.create("ws://letschat:8080")); //connecting to a websocket server

//fetching the ServerContainer instance (from within a Server endpoint)
ServerContainer container = (ServerContainer) session.getContainer();


◆RemoteEndpoint
WebSocket是一個通過兩個對等的個體(客戶端服務器端)交流的協議。
RemoteEndpoint接口是配對的另一端的抽象

Synchronous 阻塞API 用javax.websocket.RemoteEndpoint.Basic執行
Asynchronous 用javax.websocket.RemoteEndpoint.Async控制 調用者不會阻塞,它可以得到一個java.util.concurrent.Future返回
或者提供一個回調實現。

//Getting a handle to the remote endpoint

RemoteEndpoint.Basic basicPeerConnection = session.getBasicRemote();
RemoteEndpoint.Async asyncPeerConnection = session.getAsyncRemote();


◆Endpoint
javax.websocket.Endpoint代表WebSocket endpoint本身,服務器端或者客戶端。
API支持注解或者編程API。
這個類被設計用於擴展(因為它是抽象的),並且適合於程序化樣式優於注釋驅動(聲明性)實現的場景

//a bare bone implementation of a programmatic endpoint

public class ProgrammaticEndpointExample extends Endpoint {
private Session session;
@Override
public void onOpen(Session session, EndpointConfig config) {
this.session = session;
try {
//sends back the session ID to the peer
this.session.getBasicRemote().sendText("Session ID: " + this.session.getId());
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
}


RemoteEndpoint 和 Endpoint 代表不同的視角來看一個Endpoint
RemoteEndpoint也是一個Endpoint
從服務器端看 自身是EndPoint 客戶端是RemoteEndpoint
從客戶端看 反之

◆Session
WebSocket會話(由javax.websocket.Session表示)的概念與HTTP會話的概念沒有很大的不同:
它封裝了兩個端點(對等體)之間的交互,兩個端點之間的會話由連接初始化,消息交換,連接終止,錯誤傳輸等事件組成。
・每個連接對應一個配對
・通過javax.websocket.RemoteEndpoint來允許服務器和客戶端之間交流

//some of the common methods in the Session interface

Set<Session> openSessions = session.getOpenSessions();
boolean isOpen = session.isOpen();
Map<String, String> pathParameters = session.getPathParameters();
Map<String, Object> userProperties = session.getUserProperties();
Principal userPrincipal = session.getUserPrincipal();


◆注解聲明與編程模式
注解聲明 @ServerEndpoint 和 @ClientEndpoint
編程 繼承javax.websocket.Endpoint


◆編碼解碼器
將它們視為由WebSocket API提供的鈎子,它允許您插入您的自定義實現,
該自定義實現負責將Java對象(部分邏輯,域模型等)轉換為WebSocket支持的協議即文本和二進制

Encoder javax.websocket.Encoder Java object to text or binary
Decoder java.websocket.Decoder text or binary to Java object


◆Configuration
配置類只是一些屬性 發布路徑,編碼器,解碼器,擴展,副協議等。
javax.websocket.EndpointConfig
javax.websocket.server.ServerEndpointConfig
javax.websocket.ClientEndpointConfig


◆異步構造
在配對間發消息不一定是一個阻塞調用。可以選擇異步方式。

SendHandler 和 SendResult(javax.websocket) 提供異步消息功能

SendHandler 用於定義消息發送完成或失敗后要做的行為的接口
SendResult 提供消息發送結果的訪問

◆WebSocket Extension
javax.websocket.Extension 代表一個擴展 有名字和參數List屬性
javax.websocket.Extension.Parameter 代表一個擴展參數 有名字和值

◆Exceptions
DeploymentException 在部署服務器端或者客戶端建立連接時發生的異常
DecodeException 在text/binary轉換java對象時發生的異常
EncodeException 在java對象轉換text/binary時發生的異常
SessionException 表示特定的session發生了異常

◆其他
・Path parameters
javax.webocket.PathParam
注入path上的參數到方法參數的注解
//Using @PathParam

@ServerEndpoint(value = "/letschat/{login-id}")
public class WebChatEndpoint {
@OnOpen
public void connected(Session session, @PathParam("login-id") String loggedInUser){
//save the logged in user id
session.getUserProperties().put("USERNAME", loggedInUser);
}
....
}

 

・WebSocket connection handshake
HandshakeRequest和HandshakeResponse的實例(來自javax.websocket pacakge)
允許訪問WebSocket連接的建立(握手)期間的通信狀態


·連接終止的詳細信息
javax.websocket.CloseReason 連接終止的詳細信息
javax.websocket.CloseReason.CloseCode 結束code
javax.websocket.CloseReason.CloseCodes 可以使用的的結束code

//Why did the connection close?

@OnClose
public void disconnected(Session session, CloseReason reason){
String peer = (String) session.getUserProperties().get("USERNAME");
CloseReason.CloseCode closeReasonCode = reason.getCloseCode();
String closeReasonMsg = reason.getReasonPhrase();
System.out.println("User "+ peer + " disconnected. Code: "+ closeReasonCode + ", Message: "+ closeReasonMsg);
}


◆編程模式
・注解
・繼承

注解方式
@ServerEndpoint 定義服務器端 指定uri
@ClientEndpoint 定義客戶端
@OnOpen 定義在連接建立時調用的方法
@OnMessage 定義一個endpoint接收到消息時調用的方法
@OnError 定義出現異常時調用的方法
@OnClose 定義連接關閉時容器調用的方法


//annotated endpoint

@ServerEndpoint("/test/")
public class AnnotatedEndpoint {
@OnOpen
public void onOpenCallback(Session s, EndpointConfig ec){
...
}
@OnMessage
public void onMessageCallback(String messageFromClient){
...
}
@OnClose
public void onCloseCallback(Session s, CloseReason cr){
...
}
@OnError
public void onErrorCallback(Session s, Throwable t){
...
}
}


繼承方式
Endpoint 端點接口
MessageHandler 接收消息時的處理實現接口
MessageHandler.Whole<T> 整條消息接口
MessageHandler.Partial<T> 分段消息接口

//programmatic endpoint

public class ProgrammaticEndpoint extends Endpoint {
@Override
public onOpen(Session session, EndpointConfig config) {
session.addMessageHandler((String s) -> System.out.println("got msg "+ s));
....
}
}


◆發送消息
消息的數據類型可以是 1.文本 2.字節 3.自定義java對象 4.ping/pong消息
消息的發送模式可以分 1.異步 2.同步 3.分段 4.流

同步 Synchronous : 發送消息的客戶端被阻塞直到消息發送過程完成或者異常發生
異步 Asynchronous : 客戶端發出消息立刻被釋放,通過Future對象或者回掉函數來追蹤發送的進程
分段 Partial : 消息被分成幾部分,客戶端必須保持對他們的追蹤並告訴API當所有的部分都發送完了
流 Streaming : 用java的字符流發送消息

・發送文本消息
  同步
   這是最容易理解的方式,使用RemoteEndpoint.Basic 中的public void sendText(String msg)
//synchronous delivery

....
@OnMessage
public void onReceipt(String msg, Session session){
session.getBasicRemote().sendText("got your message ");
}


  異步
   使用RemoteEndpoint.Async
   1.Future對象返回 public Future<Void> sendText(String msg)
   2.提供回調函數 public void sendText(String msg, SendHandler handler)
  
//asynchronous text message delivery

....
@OnMessage
public void onReceipt(String msg, Session session){
Future<Void> deliveryTracker = session.getAsyncRemote().sendText("got your message ");
deliveryTracker.isDone(); //blocks
}

//asynchronous text message delivery using a callback

....
@OnMessage
public void onMsg(String msg, Session session){
session.getAsyncRemote().sendText("got your message ", new SendHandler() {
@Override
public void onResult(SendResult result) {
pushToDB(session.getID(), msg, result.isOK());
}
});
}
....

//Java 8 lambda style

....
session.getAsyncRemote()
.sendText("got your message ",
(SendResult result) -> {pushToDB(session.getId(),msg, result.isOK());}
);
....

  分段
   通過在RemoteEndpoint.Basic接口中使用sendText方法的重載版本可以部分發送消息。 這個過程本質上是同步的

//partial message delivery

....
String partialData = fetch(request);
try {
session.getBasicRemote().sendText(partialData, false); // boolean isLast
} catch (IOException ex) {
throw new RuntimeException(ex);
}
...


  流
   可以將文本(字符)數據流傳輸到RemoteEndpoint.Basic中的public void getSendWriter()提供的java.io.Writer。
   可以使用Writer中任何重載的寫入方法
   流模式是同步的

//streaming strings

....
private Session session;

public void broadcast(String msg){
session.getBasicRemote().getSendWriter().write(msg);
}
....

Text message總結
Synchronous public void sendText(String msg)
Asynchronous public Future<Void> sendText(String msg), void sendText(String msg, SendHandler callback)
Partial public void sendText(String part, boolean isLast)
Streaming public void getSendWriter().write(String msg)

・發送字節消息
  處理(發送)就API而言,二進制數據類似於String。唯一明顯的區別是數據類型-字節是ByteBuffer而文本是String。
  支持的發送模式也相同。
  
  同步
//synchronous delivery of an image

....
public void syncImage(byte[] image, Session session){
ByteBuffer img = ByteBuffer.wrap(image);
session.getBasicRemote().sendBinary(img);
}
....
  
  異步
//asynchronous delivery of an image

....
public void syncLargeImage(byte[] image, Session session){
ByteBuffer img = ByteBuffer.wrap(image);
Future<Void> deliveryProgress = session.getAsyncRemote().sendBinary(img);
boolean delivered = deliveryProgress.isDone(); //blocks until completion or failure
}
....

  分段
//partial delivery of binary data

....
ByteBuffer partialData = fetch(request);
try {
session.getBasicRemote().sendBinary(partialData, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
...
  
  流
   在RemoteEndpoint.Basic上使用getSendStream()方法獲取一個OutputStream,並使用任何重載的寫入方法來傳輸二進制數據
//binary data - streaming style

....
ByteBuffer data = fetch(request);
try {
session.getBasicRemote().getSendStream().write(data.array());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
...

發送字節的總結
Synchronous public void sendBinary(ByteBuffer data)
Asynchronous public Future<Void> sendBinary(ByteBuffer data) ,
public void sendBinary(ByteBuffer data, SendHandler callback)
Partial public void sendBinary(ByteBuffer part, boolean isLast)
Streaming public void getSendStream().write(byte[] data)


・發送java對象
  Synchronous public void sendObject(Object obj)
  Asynchronous public Future<Void> sendObject(Object obj),
   public void sendObject(Object obj, SendHandler callback)
  
....
private Session client;

public void broadcast(String msg) {
Set<String> subscriptions = (Set<String>) client.getUserProperties().get("TICKER_SUBSCRIPTIONS");
StockQuote quote = null; //the Java object
for (String subscription : subscriptions) {
try {
quote = fetchQuote(subscription);
//sending stock quotes with a Java 8 lambda style callback
peer.getAsyncRemote().sendObject(quote,
(SendResult result) -> {audit(session.getId(),quote, result.isOK());}
);
}
catch (Exception e) {
//log and continue...
}
}
}
....

Encoder 上層接口
Encoder.Text<T> java對象T轉換成文本 java.lang.String
Encoder.Binary<T> java對象T轉換成字節 java.nio.ByteBuffer

//encoding a 'StockQuote' Java object to a JSON string

public class StockQuoteJSONEncoder implements Encoder.Text<StockQuote> {
@Override
public void init(EndpointConfig config) {
//for custom initialization logic (details omitted)
}
@Override
public String encode(StockQuote stockQuoteObject) throws EncodeException {
//using the JSON processing API (JSR 353)
return Json.createObjectBuilder()
.add("quote", stockQuoteObject.getQuote())
.add("ticker", stockQuoteObject.getTicker())
.toString();
}
@Override
public void destroy() {
//close resources (details omitted)
}
}

將java對象轉換成流
Encoder.TextStream<T> 轉換java對象並用字符流傳輸 java.io.Writer
Encoder.BinaryStream<T> 轉換java對象並用字節流傳輸 java.io.OutputStream

//sending Java objects as character stream

public class StockQuoteJSONEncoder implements Encoder.TextStream<StockQuote> {
@Override
public void init(EndpointConfig config) {
//for custom initialization logic (details omitted)
}
@Override
public void encode(StockQuote stockQuoteObject, Writer writer) throws EncodeException {
//using the JSON processing API (JSR 353)
String jsonStockQuote = Json.createObjectBuilder()
.add("quote", stockQuoteObject.getQuote())
.add("ticker", stockQuoteObject.getTicker())
.toString();
writer.write(jsonStockQuote);
}
@Override
public void destroy() {
//close resources (details omitted)
}
}


所以如果你要用字節或者字符流發送java對象,你需要為這個java類實現並注冊一個合適的Encoder,
websocket運行時會自動調用。

關於基本類型,websocket實現了默認的轉換,也可以自己寫一個來覆蓋它。


・ping-pong
  ping是請求消息 沒有特定類 byte buffer
  pong是響應消息 javax.websocket.PongMessage 它也可以用作單向心跳消息(不需要ping消息)
  
  他們只是ByteBuffer的字節數據
  不能大於125bytes。只能用來檢測狀態,不應用與業務數據傳輸。
  
  發送方法定義在javax.websocket.RemoteEndpoint中 同步異步相同
Synchronous & Asynchronous
void sendPing(ByteBuffer ping)
void sendPong(ByteBuffer pong)

//sending a ping (health check request)
.....
private Session s;

public void healthCheck(){
s.getBasicRemote().sendPing(ByteBuffer.wrap("health-check".getBytes()));
}
.....

注意點
Ping消息只能發送(不能接收)而Pong可以發送和接收
不需要寫邏輯來明確地返回一個乒乓消息來響應一個ping - Java WebSocket API的實現會為你自動的處理
Pong消息也可以用作自發心跳消息(不僅僅是響應ping)

//sending a pong (as a one-way heart beat)

s.getBasicRemote().sendPong(ByteBuffer.wrap("health-check".getBytes()));


◆異步超時
首先RemoteEndpoint.Async setSendTimeout
其次失敗結果在Future對象或者SendResult中

用SendResult取失敗信息時使用 SendResult.getException()
//bail out if the message is not sent in 1 second

....
public void broadcast(Session s, String msg){
RemoteEndpoint asyncHandle = s.getRemoteAsync();
asyncHandle.setSendTimeout(1000); //1 second
asyncHandle.sendText(msg,
new SendHandler(){
@Override
public void onResult(SendResult result) {
if(!result.isOK()){
System.out.println("Async send failure: "+ result.getException());
}
}
}); //will timeout after 2 seconds
}
....


如果用Future來追蹤完成情況 調用get方法會拋出一個 java.util.concurrent.ExecutionException
//bail out if the message is not sent in 2 seconds

....
public void broadcast(Session s, String msg){
RemoteEndpoint asyncHandle = s.getRemoteAsync();
asyncHandle.setSendTimeout(2000); //2000 ms
Future<Void> tracker = asyncHandle.sendText(msg); //will timeout after 2 seconds
tracker.get(); //will throw java.util.ExecutionException if the process had timed out
}
....


Sending style Text Binary Java object Pong Ping
Synchronous y y y y y
Asynchronous y y y y y
Partial y y n n n
Streaming y y y n n


◆接收消息
從API角度來看,發送消息比接收簡單多了,因為結構簡單。比如javax.websocket.RemoteEndpoint接口
接收消息也有兩種寫法,注解和繼承
・注解 在於傳合適的參數到@OnMessage的方法
・繼承 為javax.websocket.MessageHandler接口做合適的實現來處理接收消息的邏輯

接收模式
・完整 整條消息接收
・分段 這和發送端的分段發送配套使用。如果發送端分段發送,則分段接收。最后一段的標志位last為true
・流 用Java Readers 或 InputStreams形式接收消息


·接收文本消息
·注解
·完整
...
@OnMessage
public void handleChatMsg(String chat) {
System.out.println("Got message - " + chat);
}
...

·分段
...
@OnMessage
public void pushChunk(String partMsg, boolean last) {
String chunkSeq = last ? "intermediate" : "last" ;
System.out.println("Got " + chunkSeq + " chunk - "+ partMsg);
}
...

·流
...
@OnMessage
public void handleChatMsg(Reader charStream) {
System.out.println("reading char stream");
}
...

·繼承
·完整
public class WholeTextMsgHandler extends MessageHandler.Whole<String> {
@Override
public void onMessage(String chat) {
System.out.println("Got message - " + chat);
}
}

·分段
public class PartialTextMsgHandler extends MessageHandler.Partial<String> {
@Override
public void onMessage(String partMsg, boolean last) {
String chunkSeq = last ? "intermediate" : "last" ;
System.out.println("Got " + chunkSeq + " chunk - "+ partMsg);
}
}

·流
public class WholeStreamingTextMsgHandler extends MessageHandler.Whole<Reader> {
@Override
public void onMessage(Reader charStream) {
System.out.println("Got stream message - " + charStream);
}
}


·接收字節
字節數據支持 byte[] (array), java.nio.ByteBuffer 和 java.io.InputStream

·注解
·完整
...
@OnMessage
public void handleImage(ByteBuffer img) {
System.out.println("Got message - " + chat);
}
...

·分段
...
@OnMessage
public void pushChunk(byte[] audioPart, boolean last) {
String chunkSeq = last ? "intermediate" : "last" ;
System.out.println("Got " + chunkSeq + " clip");
}
...

·流
...
@OnMessage
public void handleChatMsg(InputStream binaryStream) {
System.out.println("reading binary stream");
}
...

·繼承
·完整
public class WholeBinaryMsgHandler extends MessageHandler.Whole<byte[]> {
@Override
public void onMessage(byte[] image) {
System.out.println("Got image - " + image.length);
}
}

·分段
public class PartialBinaryMsgHandler extends MessageHandler.Partial<ByteBuffer> {
@Override
public void onMessage(ByteBuffer clip, boolean last) {
String chunkSeq = last ? "intermediate" : "last" ;
System.out.println("Got " + chunkSeq + " chunk");
}
}

·流
public class WholeStreamingBinaryMsgHandler extends MessageHandler.Whole<InputStream> {
@Override
public void onMessage(InputStream binaryStream) {
System.out.println("Got stream binary message");
}
}


◆用java對象接收消息
使用Decoders 將text/binary數據轉換為java對象

Decoder 頂層接口
Decoder.Text<T> 定義將String轉換成T類型java對象的方法
Decoder.Binary<T> 定義將java.nio.ByteBuffer轉換成T類型java對象的方法

public class StockSubscriptionDecoder implements Decoder.Text<Subscription> {

@Override
public Subscription decode(String subscription){
//client sends comma seperated list of subscription e.g. appl,goog,orcl
return new Subscription(Arrarys.asList(subscription.split(",")));
}
@Override
public void willDecode(String subscription){
return subscription!=null && subscription.split(",").length > 0;
}

}

・將Stream轉換為java對象
Decoder.TextStream<T> java.io.Reader 轉換為 java對象
Decoder.BinaryStream<T> java.io.InputStream 轉換為 java對象

public class ConversationDecoder implements Decoder.TextStream<Conversation> {

@Override
//handles new-line delimited content
public Conversation decode(Reader content) {
Conversation conversation = new Conversation();
try(LineNumberReader lineByLineReader = new LineNumberReader(content)){
String line = lineByLineReader.readLine();
while(line != null) {
conversation.add(line);
line = lineByLineReader.readLine();
}
}
return conversation;
}
}

 

◆處理Pong數據
注解
//annotated Pong handler
...
@OnMessage
public void healthCheckCallback(PongMessage pong) {
System.out.println("Pong for Ping! "+ new String(pong.getApplicationData().array());
}
...

繼承
//programmatic Pong handler
public class PongMsgHandler extends MessageHandler.Whole<PongMessage> {
@Override
public void onMessage(PongMessage pong) {
System.out.println("Pong for Ping! "+ new String(pong.getApplicationData().array());
}
}


◆使用MessageHandler
第一步,實現針對特定對象(text/binary whole/partial)的MessageHandler接口
第二步,用Session#addMessageHandler方法注冊到websocket 可以注冊多個實現

//attaching message handlers

public class ProgrammaticEndpoint extends Endpoint {

@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(new WholeBinaryMsgHandler()); //basic
session.addMessageHandler(String.class, new WholeTextMsgHandler()); //specify class type for Whole message handler
session.addMessageHandler(ByteBuffer.class, new PartialBinaryMsgHandler()); //specify class type for Partial message handler
}
}

◆@OnMessage方法的其他可用參數
除了消息內容本身,@OnMessage方法還能接收以下參數,它們將在運行時自動注入
・帶@javax.websocket.PathParam注解的String參數 個數0到多個
・Session對象
・EndpointConfig對象 (服務器端或客戶端)

public void onMsgCallback(String theMsg, @PathParam("user") String username, Session peer, EndpointConfig condfig){
System.out.println("I have everything I could possibly receive from the WebSocket implementation !");
}

◆java基礎類型
基本類型的decoder有默認實現,也可自定義覆蓋

◆客戶端API
・注解 使用@ClientEndpoint
//annotated client endpoint in action

@ClientEndpoint
public class AnnotatedChatClient {

private ClientEndpointConfig clientConfig;
private String user;
@OnOpen
public void connected(Session session, EndpointConfig clientConfig){
this.clientConfig = (ClientEndpointConfig) clientConfig;
this.user = session.getUserPrincipal().getName();
System.out.println("User " + user + " connected to Chat room");
}
@OnMessage
public void connected(String msg){
System.out.println("Message from chat server: " + msg);
}
@OnClose
public void disconnected(Session session, CloseReason reason){
System.out.println("User "+ user + " disconnected as a result of "+ reason.getReasonPhrase());
}
@OnError
public void disconnected(Session session, Throwable error){
System.out.println("Error communicating with server: " + error.getMessage());
}
}

・繼承 繼承java.websocket.Endpoint類
//a bare bone implementation of a programmatic endpoint

public class WeatherClient extends Endpoint {
private Session session;
@Override
public void onOpen(Session session, EndpointConfig config) {
this.session = session;
try {
//sends back the session ID to the peer
this.session.getBasicRemote().sendText("Session ID: " + this.session.getId());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}

◆用客戶端API連接服務器端
使用javax.websocket.WebSocketContainer接口中的方法
・客戶端類使用注解寫法時
connectToServer(Class<?> annotatedEndpointClass, URI path)
connectToServer(Object annotatedEndpointInstance, URI path) //annotatedEndpointInstance不支持注入
・客戶端類使用繼承寫法時
connectToServer(Class<? extends Endpoint> endpointClass, ClientEndpointConfig cec, URI path)
connectToServer(Endpoint endpointInstance, ClientEndpointConfig cec, URI path) //endpointInstance不支持注入

・注解
用class類型
WebSockerContainer.connectToServer(AnnotatedChatClient.class, URI.create("ws://javaee-chat.com"));

用客戶端實例
WebSockerContainer.connectToServer(new AnnotatedChatClient(), URI.create("ws://javaee-chat.com"));


・繼承
比注解多需要一個ClientEndpointConfig參數
用class類型
WebSockerContainer.connectToServer(WeatherClient.class,
ClientEndpointConfig.Builder.create().build(), //fluent API
URI.create("ws://weather-tracker.com"));

用客戶端實例
WebSockerContainer.connectToServer(new WeatherClient(),
ClientEndpointConfig.Builder.create().build(), //fluent API
URI.create("ws://weather-tracker.com"));

◆配置器 Configuration
EndpointConfig接口的實現 EndpointConfig被ServerEndpointConfig 和 ClientEndpointConfig繼承

◆服務器端配置
ServerEndpointConfig
ServerEndpointConfig.Builder
ServerEndpointConfig.Configurator

・注解寫法的服務器端配置
通過@ServerEndpoint的屬性設置
//annotated server endpoint with all its configuration elements

@ServerEnpdoint(
value = "/chat/",
configurator = ChatEndpointConfigurator.class, //discussed later
decoders = JSONToChatObjectDecoder.class,
encoders = ChatObjectToJSONEncoder.class,
subprotocols = {"chat"}
)
public class ChatServer {
//business logic...
}

EndpointConfig 實例會自動注入到@OnOpen方法的參數
//server endpoint configuration in action

@OnOpen
public void onOpenCallback(Session session, EndpointConfig epConfig){
ServerEndpointConfig serverConfig = (ServerEndpointConfig) epConfig;
Map<String, Object> globalPropertiesMap = serverConfig.getUserProperties();
......
}


・繼承寫法的服務器端配置
需要顯示的編碼
ServerEndpointConfig serverConfig = ServerEndpointConfig.Builder
.create(StockTrackerEndpoint.class , "/pop-stocks/").
.configurator(StockTrackerConfigurator.getInstance()) //discussed later
.decoders(JSONToStockTickerObject.class)
.encoders(StockTickerObjectToJSON.class)
.build();

javax.websocket.Endpoint接口的onOpen方法有ServerEndpointConfig實例參數
public class ProgrammaticChatClient extends Endpoint {
@Override
public void onOpen(Session session, EndpointConfig config){
ServerEndpointConfig serverConfig = (ServerEndpointConfig) epConfig;
.....
}
}

◆客戶端配置
ClientEndPointConfig
ClientEndpointConfig.Builder
ClientEndpointConfig.Configurator

・注解寫法的客戶端配置
@ClientEndpoint(
configurator = ChatClientEndpointConfigurator.class, //discussed later
decoders = JSONToChatObjectDecoder.class,
encoders = ChatObjectToJSONEncoder.class,
subprotocols = {"chat"}
)
public class ChatClient {
//business logic...
}

//server endpoint configuration in action

@OnOpen
public void onOpenCallback(Session session, EndpointConfig epConfig){
ClientEndpointConfig clientConfig = (ClientEndpointConfig) epConfig;
......
}

・繼承寫法的客戶端配置
ClientEndpointConfig cec = ClientEndpointConfig.Builder
.configurator(ChatClientConfigurator.getInstance()) //discussed later
.decoders(JSONToStockTickerObject.class)
.encoders(StockTickerObjectToJSON.class)
.build();

・服務器端配置和客戶端配置區別不大 除了服務端多了path或URI
・EndpointConfig 可以保存實例間共通的屬性 通過getUserProperties()方法 它返回一個map


服務器端配置的設定與取得對應關系
@ServerEndpoint設定屬性 ServerEndpointConfig取得方法 ServerEndpointConfig.Builder設定方法
value getPath() create(Class<?> endpointClass, String path)
configurator getConfigurator() configurator(ServerEndpointConfig.Configurator sec)
decoders getDecoders() decoders(List<Class<? extends Decoder>> decoders)
encoders getEncoders() encoders(List<Class<? extends Encoder>> encoders)
subprotocols getSubprotocols() subprotocols(List<String> subprotocols)
客戶端配置的設定與取得對應關系
@ClientEndpoint 設定屬性 ClientEndpointConfig取得方法 ClientEndpointConfig.Builder設定方法
configurator getConfigurator() configurator(ClientEndpointConfig.Configurator cec)
decoders getDecoders() decoders(List<Class<? extends Decoder>> decoders)
encoders getEncoders() encoders(List<Class<? extends Encoder>> encoders)
subprotocols getPreferredSubprotocols() preferredSubprotocols(List<String> preferredSubprotocols)

◆配置器
可以用於服務器端和客戶端。可以攔截連接的握手階段。它可以做以下的事
1.自定義WebSocket握手過程
2.自定義端點實例的創建過程
3.實現綁定了這個配置器的所有端點的共通邏輯

如果不自定義,會有一個默認的配置器

・服務器端配置器
ServerEndpointConfig.Configurator

Handshake void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response)

endpoint創建 <T> T getEndpointInstance(Class<T> endpointClass)

來源check boolean checkOrigin(String originHeaderValue) // 檢查握手過程中的HTTP Origin header 來做安全確認

子協議匹配 List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested)

擴展匹配 String getNegotiatedSubprotocol(List<String> supported, List<String> requested)

//custom configurator

public class CustomServerEndpointConfigurator extends ServerEndpointConfig.Configurator {

@Override
public <T> T getEndpointInstance(Class<T> endpointClass){
//override the default behavior by providing a 'Singleton'
return (T) StockTickerEndpoint.getInstance();
}

@Override
public boolean checkOrigin(String originHeaderValue){
//just audit this
audit(originHeaderValue);
return true;
}

private String user;

@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response){
//introspect the request headers
System.out.println(request);

//the authenticated user
this.user = request.getUserPrincipal().getName();

}

@Override
public List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested){
//invoke default implementation
return super.getNegotiatedExtensions(installed, requested);
}

@Override
public String getNegotiatedSubprotocol(List<String> supported, List<String> requested){
//invoke default implementation
return super.getNegotiatedSubprotocol(supported, requested);
}
}

//declaring the custom configuration

@ServerEndpoint(value = "/letschat" , configurator = CustomServerEndpointConfigurator.class)
public class AnnotatedServerEndpointExample {
//call back life cycle method(s) implementation...
}

・客戶端配置器
攔截Handshake void afterResponse(HandshakeResponse hr)
void beforeRequest(Map<String,List<String>> headers)
//custom configurator

public class CustomClientEndpointConfigurator extends ClientEndpointConfig.Configurator {

@Override
public void beforeRequest(Map<String,List<String>> headers){
//mutate the header
String token = ...;
headers.put("X-token" , Arrays.asList(token));
}


@Override
public void afterResponse(HandshakeResponse hr){
//introspect the handshake response
System.out.println(hr.getHeaders());
}
}

//declaring the client configuration

@ClientEndpoint(configurator = CustomClientEndpointConfigurator.class)
public class AnnotatedClientEndpointExample {
//call back life cycle method(s) implementation...
}

◆發布
・注解方式的服務器端會自動發布

・繼承方式的服務器端需要用javax.websocket.server.ServerApplicationConfig類編寫代碼
//A 'chat' club using programmatic web socket endpoint style

public class ChatClub extends Endpoint {
....
@Override
public void onOpen(Session joinee, EndpointConfig config) {
System.out.println("Peer " + joinee.getId() + " connected");
joinee.getRemoteBasic().sendText("Welcome to the Chat Club. The first rule of Chat Club is ..... don't talk, just type");
joinee.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
try {
joinee.getBasicRemote().sendText("You sent \n"+message+"\nRead the rulez.. again!");
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
});
}
....
}

//custom implementation to guide the deployment of our programmatic endpoint

public class CustomServerAppConfigProvider implements ServerApplicationConfig {

@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> endpointClasses) {
Set<ServerEndpointConfig> result = new HashSet<>();
for (Class epClass : endpointClasses) {
if (epClass.equals(ChatClub.class)) {
ServerEndpointConfig sec = ServerEndpointConfig.Builder.create(epClass, "/chatclub").build();
result.add(sec);
}
}
return result;
}

@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
// we do not have annotated endpoints. if we did, they will not be deployed !
return Collections.emptySet();
}
}

當容器發現有ServerApplicationConfig接口的實現,就會調用它

・要發布繼承方式的端點,必須使用ServerApplicationConfig
・getEndpointConfigs 是發布繼承方式的端點
・getAnnotatedEndpointClasses 是發布注解方式的端點
・上面代碼中getAnnotatedEndpointClasses返回Collections.emptySet()因為我們假設沒有注解方式的端點

只有全部使用注解方式時才不要實現getAnnotatedEndpointClasses

・同時發布注解和繼承方式的服務器端點
getAnnotatedEndpointClasses可以改成
//return ALL the auto-detected (scanned) annotated endpoints which the container will deploy

....
@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
return scanned;
}
....
這個方法可以控制發布的端點 並不是所有檢測到的端點類都需要發布

◆編程方式的發布API
javax.websocket.server.ServerContainer接口
void addEndpoint(Class<?> endpointClass) // 追加注解的
void addEndpoint(ServerEndpointConfig serverConfig) // 追加繼承的

ServerContainer的實例以不同的方式獲取,具體取決於應用程序是在Servlet(Web容器)還是獨立模式中執行

・Servlet容器中的用法
從javax.servlet.ServletContext的名為"javax.websocket.server.ServerContainer"的屬性來取得ServerContainer的實例
@WebListener
public class ServletCtxBasedDeploymentStrategy implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent sce){
//obtain the instance
ServerContainer sc = (ServerContainer) sce.getServletContext().getAttribute("javax.websocket.server.ServerContainer");

//trigger endpoint deployment
deployAnnotatedEndpoint(sc);
deployProgEndpoint(sc);
}

private void deployAnnotatedEndpoint(ServerContainer container) {
container.addEndpoint(StockTicker.class);
container.addEndpoint(WeatherTracker.class);
}

private void deployProgEndpoint(ServerContainer container) {
container.addEndpoint(ServerEndpointConfig.Builder.create(ChatClub.class, "/chatclub").build());
container.addEndpoint(ServerEndpointConfig.Builder.create(RealTimeLocationTracker.class, "/location").build());
}

@Override
public void contextDestroyed(ServletContextEvent sce){}
}

・獨立模式中的用法
ServerContainer的取得取決於特定的容器是如何取得實例的
不能與ServerApplicationConfig的方式混合使用
將忽略重復的端點(要各自的部署技術實現),這是規范要求的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM