Java 控制台程序實現類似廣播功能
服務器端代碼
添加 maven 依賴
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
服務器端代碼
package com.seliote.web.http;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* 每次有 WebSocket 連接請求都會創建一個該類的實例
*/
@ServerEndpoint(value = "/broadcast")
public class SocketServer {
private static final List<Session> onlinePeople = new ArrayList<>();
@OnOpen
public void onOpen(Session aSession) {
System.out.println(System.currentTimeMillis() + ": OnOpen:::" + onlinePeople.size() + 1);
if (!onlinePeople.contains(aSession)) {
onlinePeople.add(aSession);
}
}
// 該方法是用於被動接收信息的
@OnMessage
public void onMessage(Session aSession, String aS) throws IOException {
System.out.println(System.currentTimeMillis() + ": OnMessage:::" + aS);
for (Session session : onlinePeople) {
session.getBasicRemote().sendText(aS);
}
}
// OnMessage 可以有多個不同簽名的
@OnMessage
public void onMessage(Session aSession, InputStream aInputStream) {
System.out.println(System.currentTimeMillis() + ": OnMessage");
// TODO
}
/**
* 每次有客戶端異常關閉該方法也會調用
* @param aSession
* @param aCloseReason
*/
@OnClose
public void onClose(Session aSession, CloseReason aCloseReason) {
System.out.println(System.currentTimeMillis() + ": OnClose:::" + aCloseReason.getReasonPhrase());
if (onlinePeople.contains(aSession)) {
onlinePeople.remove(aSession);
}
}
@OnError
public void onError(Session aSession, Throwable aThrowable) {
System.out.println(System.currentTimeMillis() + ": OnError");
aThrowable.printStackTrace();
}
}
如果連接時需要攜帶客戶端信息,那么可以在路徑中加入參數,如客戶端路徑加入用戶 Token 變為 127.0.0.1/broadcast/123456,服務器端的標注就可改為 @ServerEndpoint(value = "/broadcast/{token}")
,之后的 @OnOpen
方法中就可以有一個 @PathParam("token") String aToken
代表客戶端傳入的 Token
客戶端代碼
添加 maven 依賴,注意這里使用的是 tyrus-standalone-client
而非 javax.websocket-client-api
后者會報錯
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.3.3</version>
<scope>compile</scope>
</dependency>
客戶端代碼
package com.seliote;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;
@ClientEndpoint
public class Demo {
private static Session sSession;
public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
// https 協議對應使用 wss
URI uri = new URI("ws", "127.0.0.1:8080", "/broadcast", null, null);
// 通過 ContainerProvider 的 static 方法 getWebSocketContainer() 獲得 WebSocketContainer
sSession = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
try (Scanner scanner = new Scanner(System.in)) {
String broadcastMsg = "";
while (true) {
broadcastMsg = scanner.nextLine();
// 通過 Session 對象主動發送信息
sSession.getBasicRemote().sendText(broadcastMsg);
//sSession.getBasicRemote().getSendStream().write(....);
}
}
}
@OnOpen
public void onOpen() {
System.out.println(System.currentTimeMillis() + ": OnOpen ");
}
// 該方法是用於被動接收信息的
@OnMessage
public void onMessage(String aS) {
System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
}
}
在一個客戶端輸入信息后服務器會及時收到信息並廣播給所有在線的客戶端
------------------------------------------2019.01.09 更新
如果需要支持相應的實體類型,WebSocket 服務器端大概長 這樣,而客戶端配置如下
Maven 依賴(這里用了 JSONObject 而不是服務器端的 Jackson)
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.3.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180813</version>
<scope>compile</scope>
</dependency>
package com.seliote.demo;
/**
* @author seliote
* @date 2019-01-09
* @description WebSocket 信息實體
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class BroadcastMsg {
private String mSessionId;
private String mTimestamp;
private String mMsg;
public BroadcastMsg() {}
public BroadcastMsg(String aSessionId, String aTimestamp, String aMsg) {
mSessionId = aSessionId;
mTimestamp = aTimestamp;
mMsg = aMsg;
}
public String getSessionId() {
return mSessionId;
}
public void setSessionId(String aSessionId) {
mSessionId = aSessionId;
}
public String getTimestamp() {
return mTimestamp;
}
public void setTimestamp(String aTimestamp) {
mTimestamp = aTimestamp;
}
public String getMsg() {
return mMsg;
}
public void setMsg(String aMsg) {
mMsg = aMsg;
}
@Override
public String toString() {
return mSessionId + " - " + mTimestamp + " - " + mMsg;
}
}
package com.seliote.demo;
import org.json.JSONObject;
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
/**
* @author seliote
* @date 2019-01-09
* @description BroadcastMsg 用於 WebSocket 的編碼與解碼器
*/
public class BroadcastMsgCoder implements Encoder.BinaryStream<BroadcastMsg>, Decoder.BinaryStream<BroadcastMsg> {
@Override
public void init(EndpointConfig aEndpointConfig) {
}
@Override
public void destroy() {
}
@Override
public void encode(BroadcastMsg aBroadcastMsg, OutputStream aOutputStream) throws IOException {
aOutputStream.write(new JSONObject(aBroadcastMsg).toString().getBytes(StandardCharsets.UTF_8));
}
@Override
public BroadcastMsg decode(InputStream aInputStream) throws IOException {
byte[] buffer = new byte[1024];
int length;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
while ((length = aInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, length);
}
String json = new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
JSONObject jsonObject = new JSONObject(json);
return new BroadcastMsg(
jsonObject.getString("sessionId"),
jsonObject.getString("timestamp"),
jsonObject.getString("msg")
);
}
}
package com.seliote.demo;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.EncodeException;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;
@ClientEndpoint(
encoders = BroadcastMsgCoder.class,
decoders = BroadcastMsgCoder.class
)
public class Demo {
public static void main(String... args) throws URISyntaxException, DeploymentException, IOException {
// https 協議對應使用 wss
URI uri = new URI("ws", "127.0.0.1:8080", "/time/1", null, null);
// 通過 ContainerProvider 的 static 方法 getWebSocketContainer() 獲得 WebSocketContainer
Session session = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri);
try (Scanner scanner = new Scanner(System.in)) {
//noinspection InfiniteLoopStatement
while (true) {
// 通過 Session 對象主動發送信息
try {
String msg = scanner.nextLine();
BroadcastMsg broadcastMsg = new BroadcastMsg(
session.getId(),
System.currentTimeMillis() + "",
msg
);
session.getBasicRemote().sendObject(broadcastMsg);
} catch (EncodeException exp) {
exp.printStackTrace();
}
//sSession.getBasicRemote().getSendStream().write(....);
}
}
}
@OnOpen
public void onOpen() {
System.out.println(System.currentTimeMillis() + ": OnOpen ");
}
@OnMessage
public void onMessage(String aS) {
System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS);
}
@OnMessage
public void onMessage(BroadcastMsg aBroadcastMsg) {
System.out.println(aBroadcastMsg);
}
}