Spring Boot WebSocket 示例
这是一个简单的 Spring Boot WebSocket 示例,展示了如何在 Spring Boot 中创建一个基本的 WebSocket 服务器。
示例代码
添加 websocket 依赖
除了 spring-boot-starter-web 之外,还需要添加 spring-boot-starter-websocket 依赖。
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
创建 WebSocket 处理器类
这里负责具体的接收和发送消息处理。
java
package me.liujiajia.example.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
private static final Logger log = LoggerFactory.getLogger(MyWebSocketHandler.class);
private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
log.info("New session added, session id: {}", session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("Received message: {} from session: {}", message.getPayload(), session.getId());
// 广播消息给所有连接的客户端
for (WebSocketSession webSocketSession : sessions) {
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(new TextMessage("Echo: " + message.getPayload()));
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
log.info("Session closed, session id: {}", session.getId());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
sessions.remove(session);
log.error("Transport error for session: {}, error: {}", session.getId(), exception.getMessage());
}
public void broadcast(String message) {
sessions.forEach(session -> {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("Failed to send message to session: {}, error: {}", session.getId(), e.getMessage());
}
});
}
}
注册 WebSocket 处理器
暴露 WebSocket 端点,配置允许的源。
java
package me.liujiajia.example.websocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private MyWebSocketHandler myWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler, "/ws")
.setAllowedOrigins("*");
}
}
创建一个向所有 WebSocket 连接发送消息的定时任务
这是用来展示向客户端发送消息的定时任务示例:每 5 秒钟会向所有连接的客户端发送当前的服务器时间。
java
package me.liujiajia.example.websocket;
import jakarta.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class BroadcastSeverTimeScheduler {
@Resource
private MyWebSocketHandler myWebSocketHandler;
@Scheduled(fixedRate = 5000)
public void checkSessions() {
myWebSocketHandler.broadcast("Server time: " + System.currentTimeMillis());
}
}
SpringBoot 启动类
因为前面使用了定时任务,所以需要添加 @EnableScheduling
注解。
java
package me.liujiajia.example.websocket;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class WebsocketApplication {
public static void main(String[] args) {
SpringApplication.run(WebsocketApplication.class, args);
}
}
前端测试页面
在 resources/static 目录下创建一个 index.html 文件,并在其中添加以下内容。
本页面会在创建连接后发送一条测试消息,然后在页面中显示接收到的消息。
html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket 测试页面</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
line-height: 1.6;
}
h1, h2 {
color: #333;
}
pre {
background-color: #f4f4f4;
padding: 10px;
border-radius: 5px;
height: 80px;
overflow: scroll;
}
</style>
</head>
<body>
<h1>WebSocket 测试页面</h1>
<h2>说明</h2>
<p>
这个页面展示了如何使用 WebSocket 进行简单的通信,并提供如何在浏览器开发者工具中测试 WebSocket 的步骤。
</p>
<h2>WebSocket 示例</h2>
<pre id="log"></pre>
<script>
const logElement = document.getElementById('log');
// 创建 WebSocket 连接
// 本地启动的 WebSocket 服务器由于没有证书需要使用 ws 协议,生产环境需要使用 wss 协议
const ws = new WebSocket('ws://localhost:8080/ws');
// 连接打开事件
ws.onopen = () => {
log('连接已打开');
// 发送一个测试消息
ws.send('Hello, WebSocket!');
};
// 接收消息事件
ws.onmessage = (event) => {
log(`收到消息: ${event.data}`);
};
// 连接关闭事件
ws.onclose = () => {
log('连接已关闭');
};
// 连接错误事件
ws.onerror = (error) => {
log(`发生错误: ${error.message}`);
};
// 日志函数
function log(message) {
const time = new Date().toLocaleTimeString();
const logEntry = `${time} - ${message}\n`;
logElement.textContent += logEntry;
logElement.scrollTop = logElement.scrollHeight; // 自动滚动到底部
}
</script>
<h2>在浏览器开发者工具中查看 WebSocket 请求</h2>
<ol>
<li>
打开浏览器(例如 Chrome、Firefox 等)。
</li>
<li>
按 <kbd>F12</kbd> 或右键点击页面然后选择“检查”以打开开发者工具。
</li>
<li>
导航到“网络”(Network)标签页。
</li>
<li>
在过滤器中选择“WebSocket”。
</li>
<li>
刷新此页面,你应该能看到与 `ws://localhost:8080/ws` 的 WebSocket 连接。
</li>
<li>
点击该连接以查看详细信息,包括发送和接收的消息。
</li>
</ol>
</body>
</html>
创建 WebSocket 连接的过程
本段内容由文心一言提供。
TCP 三次握手:
- 在建立 WebSocket 连接之前,客户端和服务器之间首先需要进行 TCP 的三次握手,以确保双方之间的网络连接是可靠的。
客户端发送 WebSocket 握手请求:
- 握手请求是一个 HTTP 请求,但包含了特殊的头信息以指示这是一个 WebSocket 连接请求。
- 请求方式必须是 GET,且 HTTP 的版本必须是 1.1。
- 请求头中必须包含以下字段:
Host
:指定服务器的主机名。Connection
:其值为Upgrade
,表示这是一个升级请求。Upgrade
:其值为websocket
,指定要升级到的协议是 WebSocket。Sec-WebSocket-Key
:一个随机生成的 Base64 编码字符串,用于帮助服务器确认连接请求的合法性,并防止恶意攻击。Sec-WebSocket-Version
:表示客户端支持的 WebSocket 版本,常见的是 13(目前的标准版本)。服务器响应 WebSocket 握手请求:
- 如果服务器支持 WebSocket 协议,并且能够处理客户端的连接请求,它会返回一个包含 101 状态码的 HTTP 响应。
- 响应头中包含必要的字段来确认升级协议,如:
Upgrade
:其值为websocket
,确认协议被升级到 WebSocket。Connection
:其值为Upgrade
,指示当前连接已经从 HTTP 协议升级到 WebSocket。Sec-WebSocket-Accept
:这是一个通过特定算法(使用Sec-WebSocket-Key
和一个预定义的 GUID 值)计算出来的响应值,用于确保客户端和服务器之间的握手是有效且安全的。握手完成,WebSocket 连接建立:
- 一旦握手完成,客户端和服务器之间的连接就已经升级为 WebSocket 协议,此时双方可以开始通过 WebSocket 协议进行双向通信,发送和接收消息。
超时时间
调试跟踪的过程中还发现,Upgrade
成功后,读取和写入的超时时间会设置为 -1。
java
public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, UpgradeToken upgradeToken,
UpgradeGroupInfo upgradeGroupInfo) {
super(upgradeToken);
this.internalHttpUpgradeHandler = (InternalHttpUpgradeHandler) upgradeToken.getHttpUpgradeHandler();
/*
* Leave timeouts in the hands of the upgraded protocol.
*/
wrapper.setReadTimeout(INFINITE_TIMEOUT);
wrapper.setWriteTimeout(INFINITE_TIMEOUT);
internalHttpUpgradeHandler.setSocketWrapper(wrapper);
// HTTP/2 uses RequestInfo objects so does not provide upgradeInfo
UpgradeInfo upgradeInfo = internalHttpUpgradeHandler.getUpgradeInfo();
if (upgradeInfo != null && upgradeGroupInfo != null) {
upgradeInfo.setGroupInfo(upgradeGroupInfo);
}
}
之后发送消息时会将写入的超时时间设置为 20s(WsRemoteEndpointImplBase.sendMessageBlock
)。
java
void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
long timeout = getBlockingSendTimeout();
boolean isDone = false;
while (!isDone) {
encoderBuffer.clear();
CoderResult cr = encoder.encode(part, encoderBuffer, true);
if (cr.isError()) {
throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.encoderError", cr));
}
isDone = !cr.isOverflow();
encoderBuffer.flip();
sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeout);
}
stateMachine.complete(last);
}
private long getBlockingSendTimeout() {
Object obj = wsSession.getUserProperties().get(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY);
Long userTimeout = null;
if (obj instanceof Long) {
userTimeout = (Long) obj;
}
if (userTimeout == null) {
return Constants.DEFAULT_BLOCKING_SEND_TIMEOUT;
} else {
return userTimeout.longValue();
}
}
// Configuration for blocking sends
public static final String BLOCKING_SEND_TIMEOUT_PROPERTY = "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";
另外如果发布到生产环境时,服务和客户端之间部署了反向代理(如 Nginx),还需要配置反向代理的超时时间。
nginx
http {
...
upstream backend_websocket_servers {
server 10.0.0.1:8080;
server 10.0.0.2:8080;
}
server {
...
location /ws {
proxy_pass http://backend_websocket_servers;
# WebSocket特有的配置,确保升级请求被正确处理
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
# 设置WebSocket请求的超时时间
proxy_read_timeout 3600s; # 读取超时时间设置为3600秒(1小时)
proxy_send_timeout 3600s; # 发送超时时间设置为3600秒(1小时)
}
}
}
Nginx Ingress 的配置可以参考如下清单 [1]:
yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
annotations:
kubernetes.io/ingress.class: nginx-controller-jiajia
nginx.org/websocket-services: "svc-jiajia-websocket"
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
name: ingress-www-liujiajia-me-websocket
namespace: jiajia-test
spec:
rules:
- host: www.liujiajia.me
http:
paths:
- backend:
serviceName: svc-jiajia-websocket
servicePort: 80
path: /ws/
pathType: ImplementationSpecific
tls:
- hosts:
- www.liujiajia.me
secretName: tls-www-liujiajia-me