Skip to content
欢迎扫码关注公众号

Spring Boot WebSocket 示例

这是一个简单的 Spring Boot WebSocket 示例,展示了如何在 Spring Boot 中创建一个基本的 WebSocket 服务器。

示例代码

添加 websocket 依赖

除了 spring-boot-starter-web 之外,还需要添加 spring-boot-starter-websocket 依赖。

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

创建 WebSocket 处理器类

这里负责具体的接收和发送消息处理。

java
package me.liujiajia.example.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

    private static final Logger log = LoggerFactory.getLogger(MyWebSocketHandler.class);

    private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        log.info("New session added, session id: {}", session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("Received message: {} from session: {}", message.getPayload(), session.getId());
        // 广播消息给所有连接的客户端
        for (WebSocketSession webSocketSession : sessions) {
            if (webSocketSession.isOpen()) {
                webSocketSession.sendMessage(new TextMessage("Echo: " + message.getPayload()));
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        log.info("Session closed, session id: {}", session.getId());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        sessions.remove(session);
        log.error("Transport error for session: {}, error: {}", session.getId(), exception.getMessage());
    }

    public void broadcast(String message) {
        sessions.forEach(session -> {
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                log.error("Failed to send message to session: {}, error: {}", session.getId(), e.getMessage());
            }
        });
    }
}

注册 WebSocket 处理器

暴露 WebSocket 端点,配置允许的源。

java
package me.liujiajia.example.websocket;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private MyWebSocketHandler myWebSocketHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler, "/ws")
                .setAllowedOrigins("*");
    }

}

创建一个向所有 WebSocket 连接发送消息的定时任务

这是用来展示向客户端发送消息的定时任务示例:每 5 秒钟会向所有连接的客户端发送当前的服务器时间。

java
package me.liujiajia.example.websocket;

import jakarta.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class BroadcastSeverTimeScheduler {

    @Resource
    private MyWebSocketHandler myWebSocketHandler;

    @Scheduled(fixedRate = 5000)
    public void checkSessions() {
        myWebSocketHandler.broadcast("Server time: " + System.currentTimeMillis());
    }
}

SpringBoot 启动类

因为前面使用了定时任务,所以需要添加 @EnableScheduling 注解。

java
package me.liujiajia.example.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class WebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }

}

前端测试页面

resources/static 目录下创建一个 index.html 文件,并在其中添加以下内容。

本页面会在创建连接后发送一条测试消息,然后在页面中显示接收到的消息。

html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket 测试页面</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
            line-height: 1.6;
        }
        h1, h2 {
            color: #333;
        }
        pre {
            background-color: #f4f4f4;
            padding: 10px;
            border-radius: 5px;
            height: 80px;
            overflow: scroll;
        }
    </style>
</head>
<body>
<h1>WebSocket 测试页面</h1>
<h2>说明</h2>
<p>
    这个页面展示了如何使用 WebSocket 进行简单的通信,并提供如何在浏览器开发者工具中测试 WebSocket 的步骤。
</p>

<h2>WebSocket 示例</h2>
<pre id="log"></pre>

<script>
    const logElement = document.getElementById('log');

    // 创建 WebSocket 连接
    // 本地启动的 WebSocket 服务器由于没有证书需要使用 ws 协议,生产环境需要使用 wss 协议
    const ws = new WebSocket('ws://localhost:8080/ws');

    // 连接打开事件
    ws.onopen = () => {
        log('连接已打开');
        // 发送一个测试消息
        ws.send('Hello, WebSocket!');
    };

    // 接收消息事件
    ws.onmessage = (event) => {
        log(`收到消息: ${event.data}`);
    };

    // 连接关闭事件
    ws.onclose = () => {
        log('连接已关闭');
    };

    // 连接错误事件
    ws.onerror = (error) => {
        log(`发生错误: ${error.message}`);
    };

    // 日志函数
    function log(message) {
        const time = new Date().toLocaleTimeString();
        const logEntry = `${time} - ${message}\n`;
        logElement.textContent += logEntry;
        logElement.scrollTop = logElement.scrollHeight; // 自动滚动到底部
    }
</script>

<h2>在浏览器开发者工具中查看 WebSocket 请求</h2>
<ol>
    <li>
        打开浏览器(例如 Chrome、Firefox 等)。
    </li>
    <li>
        按 <kbd>F12</kbd> 或右键点击页面然后选择“检查”以打开开发者工具。
    </li>
    <li>
        导航到“网络”(Network)标签页。
    </li>
    <li>
        在过滤器中选择“WebSocket”。
    </li>
    <li>
        刷新此页面,你应该能看到与 `ws://localhost:8080/ws` 的 WebSocket 连接。
    </li>
    <li>
        点击该连接以查看详细信息,包括发送和接收的消息。
    </li>
</ol>

</body>
</html>

创建 WebSocket 连接的过程

本段内容由文心一言提供。

  1. TCP 三次握手

    • 在建立 WebSocket 连接之前,客户端和服务器之间首先需要进行 TCP 的三次握手,以确保双方之间的网络连接是可靠的。
  2. 客户端发送 WebSocket 握手请求

    • 握手请求是一个 HTTP 请求,但包含了特殊的头信息以指示这是一个 WebSocket 连接请求。
    • 请求方式必须是 GET,且 HTTP 的版本必须是 1.1。
    • 请求头中必须包含以下字段:
      • Host:指定服务器的主机名。
      • Connection:其值为Upgrade,表示这是一个升级请求。
      • Upgrade:其值为websocket,指定要升级到的协议是 WebSocket。
      • Sec-WebSocket-Key:一个随机生成的 Base64 编码字符串,用于帮助服务器确认连接请求的合法性,并防止恶意攻击。
      • Sec-WebSocket-Version:表示客户端支持的 WebSocket 版本,常见的是 13(目前的标准版本)。
  3. 服务器响应 WebSocket 握手请求

    • 如果服务器支持 WebSocket 协议,并且能够处理客户端的连接请求,它会返回一个包含 101 状态码的 HTTP 响应。
    • 响应头中包含必要的字段来确认升级协议,如:
      • Upgrade:其值为websocket,确认协议被升级到 WebSocket。
      • Connection:其值为Upgrade,指示当前连接已经从 HTTP 协议升级到 WebSocket。
      • Sec-WebSocket-Accept:这是一个通过特定算法(使用Sec-WebSocket-Key和一个预定义的 GUID 值)计算出来的响应值,用于确保客户端和服务器之间的握手是有效且安全的。
  4. 握手完成,WebSocket 连接建立

    • 一旦握手完成,客户端和服务器之间的连接就已经升级为 WebSocket 协议,此时双方可以开始通过 WebSocket 协议进行双向通信,发送和接收消息。

超时时间

调试跟踪的过程中还发现,Upgrade 成功后,读取和写入的超时时间会设置为 -1。

java
public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, UpgradeToken upgradeToken,
        UpgradeGroupInfo upgradeGroupInfo) {
    super(upgradeToken);
    this.internalHttpUpgradeHandler = (InternalHttpUpgradeHandler) upgradeToken.getHttpUpgradeHandler();
    /*
        * Leave timeouts in the hands of the upgraded protocol.
        */
    wrapper.setReadTimeout(INFINITE_TIMEOUT);
    wrapper.setWriteTimeout(INFINITE_TIMEOUT);

    internalHttpUpgradeHandler.setSocketWrapper(wrapper);

    // HTTP/2 uses RequestInfo objects so does not provide upgradeInfo
    UpgradeInfo upgradeInfo = internalHttpUpgradeHandler.getUpgradeInfo();
    if (upgradeInfo != null && upgradeGroupInfo != null) {
        upgradeInfo.setGroupInfo(upgradeGroupInfo);
    }
}

之后发送消息时会将写入的超时时间设置为 20s(WsRemoteEndpointImplBase.sendMessageBlock)。

java
void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
    long timeout = getBlockingSendTimeout();
    boolean isDone = false;
    while (!isDone) {
        encoderBuffer.clear();
        CoderResult cr = encoder.encode(part, encoderBuffer, true);
        if (cr.isError()) {
            throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.encoderError", cr));
        }
        isDone = !cr.isOverflow();
        encoderBuffer.flip();
        sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeout);
    }
    stateMachine.complete(last);
}

private long getBlockingSendTimeout() {
    Object obj = wsSession.getUserProperties().get(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY);
    Long userTimeout = null;
    if (obj instanceof Long) {
        userTimeout = (Long) obj;
    }
    if (userTimeout == null) {
        return Constants.DEFAULT_BLOCKING_SEND_TIMEOUT;
    } else {
        return userTimeout.longValue();
    }
}

// Configuration for blocking sends
public static final String BLOCKING_SEND_TIMEOUT_PROPERTY = "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";

另外如果发布到生产环境时,服务和客户端之间部署了反向代理(如 Nginx),还需要配置反向代理的超时时间。

nginx
http {
    ...
    upstream backend_websocket_servers {
        server 10.0.0.1:8080;
        server 10.0.0.2:8080;
    }

    server {
        ...
        location /ws {
            proxy_pass http://backend_websocket_servers;

            # WebSocket特有的配置,确保升级请求被正确处理
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
            proxy_cache_bypass $http_upgrade;

            # 设置WebSocket请求的超时时间
            proxy_read_timeout 3600s;  # 读取超时时间设置为3600秒(1小时)
            proxy_send_timeout 3600s;  # 发送超时时间设置为3600秒(1小时)
        }

    }

}

Nginx Ingress 的配置可以参考如下清单 [1]

yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: nginx-controller-jiajia
    nginx.org/websocket-services: "svc-jiajia-websocket"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
  name: ingress-www-liujiajia-me-websocket
  namespace: jiajia-test
spec:
  rules:
  - host: www.liujiajia.me
    http:
      paths:
      - backend:
          serviceName: svc-jiajia-websocket
          servicePort: 80
        path: /ws/
        pathType: ImplementationSpecific
  tls:
  - hosts:
    - www.liujiajia.me
    secretName: tls-www-liujiajia-me

  1. Nginx Ingress Controller x WebSocket ↩︎