- 引入相应webflux包
- 实现自定义的请求处理类WebSocketHandler
- 配置url映射关系及WebSocketHandlerAdapter
- 通过页面进行测试
Server
要创建WebSocket服务器,您可以先创建一个MyWebSocketHandler实现WebSocketHandler
。以下示例显示了如何执行此操作:
消息通过getPayloadAsText()
获取内容后,再次获取则为空
package com.example.webflux.handler;
import com.example.webflux.service.TokenService;
import org.eclipse.jetty.util.MultiMap;
import org.eclipse.jetty.util.UrlEncoded;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsConfigurationSource;
import org.springframework.web.reactive.socket.*;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MyWebSocketHandler implements WebSocketHandler, CorsConfigurationSource {
@Autowired
private TokenService tokenService;
/**
* 实现自定义的请求处理类WebSocketHandler
*/
@Override
public Mono<Void> handle(WebSocketSession session) {
// 在生产环境中,需对url中的参数进行检验,如token,不符合要求的连接的直接关闭
HandshakeInfo handshakeInfo = session.getHandshakeInfo();
if (handshakeInfo.getUri().getQuery() == null) {
return session.close(CloseStatus.REQUIRED_EXTENSION);
} else {
// 对参数进行解析,在些使用的是jetty-util包
MultiMap<String> values = new MultiMap<String>();
UrlEncoded.decodeTo(handshakeInfo.getUri().getQuery(), values, "UTF-8");
String token = values.getString("token");
boolean isValidate = tokenService.validate(token);
if (!isValidate) {
return session.close();
}
}
Flux<WebSocketMessage> output = session
.receive() //访问入站消息流。
.doOnNext(message ->{
//对每条消息执行一些操作
//对于嵌套的异步操作,您可能需要调用message.retain()使用池化数据缓冲区的基础服务器
WebSocketMessage retain = message.retain();
})
.concatMap(mapper -> {
//执行使用消息内容的嵌套异步操作
String msg = mapper.getPayloadAsText();
System.out.println("mapper: " + msg);
return Flux.just(msg);
})
.map(value -> {
// 创建出站消息,生成组合流
System.out.println("value: " + value);
return session.textMessage("Echo " + value);
});
return session.send(output);
}
/**
* 直接使自定义的WebSocketHandler实现CorsConfigurationSource接口,并返回一个CorsConfiguration
*/
@Override
public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
CorsConfiguration corsConfiguration = new CorsConfiguration();
corsConfiguration.addAllowedOrigin("*");
return corsConfiguration;
}
}
TokenService
: 校验参数,在生产环境需要对每个连接进行校验,符合要求的才允许连接
package com.example.webflux.service;
import org.springframework.stereotype.Service;
@Service
public class TokenService {
// demo演示,在引只对长度做校验
public boolean validate(String token) {
if (token.length() > 5) {
return true;
}
return false;
}
}
WebConfig
: 服务器配置
package com.example.webflux.config;
import com.example.webflux.handler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class WebConfig {
@Bean
public MyWebSocketHandler getMyWebsocketHandler() {
return new MyWebSocketHandler();
}
@Bean
public HandlerMapping handlerMapping() {
// 对相应的URL进行添加处理器
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/hello", getMyWebsocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
index.html
: 测试代码,内容如下:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<textarea id="msgBoxs"></textarea><br>
待发送消息:<input type="text" id="msg"><input type="button" id="sendBtn" onclick="send()" value="发送">
<script type="application/javascript">
var msgBoxs = document.getElementById("msgBoxs")
var msgBox = document.getElementById("msg")
document.cookie="token2=John Doe";
var ws = new WebSocket("ws://localhost:9000/hello?token=aabb123&demo=1&userType=0")
ws.onopen = function (evt) {
console.log("Connection open ...");
ws.send("Hello WebSocket!");
}
ws.onmessage = function (evt) {
console.log("Received Message: ", evt.data)
var msgs = msgBoxs.value
msgBoxs.innerText = msgs + "\n" + evt.data
msgBoxs.scrollTop = msgBoxs.scrollHeight;
}
ws.onclose = function (evt) {
console.log("Connect closed.");
}
function send() {
var msg = msgBox.value
ws.send(msg)
msgBox.value = ""
}
</script>
</body>
</html>
CORS
- 直接使自定义的WebSocketHandler实现
CorsConfigurationSource
接口,并返回一个CorsConfiguration
public class MyWebSocketHandler implements WebSocketHandler, CorsConfigurationSource {
/**
* 实现自定义的请求处理类WebSocketHandler
*/
@Override
public Mono<Void> handle(WebSocketSession session) {
...
}
/**
* 直接使自定义的WebSocketHandler实现CorsConfigurationSource接口,并返回一个CorsConfiguration
*/
@Override
public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
CorsConfiguration corsConfiguration = new CorsConfiguration();
corsConfiguration.addAllowedOrigin("*");
return corsConfiguration;
}
}
- 可以在
SimpleUrlHandler
上设置corsConfigurations属性