Springboot中的一些Websocket实现

各种Websocket实现方式

Posted by Momoka7 on July 4, 2024

WebSocket

spring-boot-starter-websocket

spring 官方提供的 websocket 实现,注意无法直接与 socket.io 连接

构建连接端点

使用@ServerEndpoint(value = "/echo")注解来标注端点路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.io.IOException;
import java.time.Instant;

import jakarta.websocket.CloseReason;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ServerEndpoint(value = "/echo")
public class EchoChannel {

    private Session session;

    @OnMessage
    public void onMessage(String message) throws IOException {

        log.info("[websocket] 收到消息:id={},message={}", this.session.getId(), message);

        if (message.equalsIgnoreCase("bye")) {
            // 由服务器主动关闭连接。状态码为 NORMAL_CLOSURE(正常关闭)。
            this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Bye"));
            return;
        }

        this.session.getAsyncRemote().sendText("[" + Instant.now().toEpochMilli() + "] Hello " + message);
    }

    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig) {
        // 保存 session 到对象
        this.session = session;
        log.info("[websocket] 新的连接:id={}", this.session.getId());
    }

    // 连接关闭
    @OnClose
    public void onClose(CloseReason closeReason) {
        log.info("[websocket] 连接断开:id={},reason={}", this.session.getId(), closeReason);
    }

    // 连接异常
    @OnError
    public void onError(Throwable throwable) throws IOException {

        log.info("[websocket] 连接异常:id={},throwable={}", this.session.getId(), throwable.getMessage());

        // 关闭连接。状态码为 UNEXPECTED_CONDITION(意料之外的异常)
        this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
    }
}

注册端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WSConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {

        ServerEndpointExporter exporter = new ServerEndpointExporter();

        // 手动注册 WebSocket 端点
        exporter.setAnnotatedEndpointClasses(EchoChannel.class);

        return exporter;
    }
}

客户端连接

这里使用的是 ws 库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// const { WebSocket } = require("");
const path = require("path");
const { WebSocket } = require("ws");
console.log(path.dirname(__filename));

let websocket = new WebSocket("ws://localhost:8888/echo");

// 连接断开
websocket.onclose = (e) => {
  console.log(`连接关闭: code=${e.code}, reason=${e.reason}`);
};
// 收到消息
websocket.onmessage = (e) => {
  console.log(`收到消息:${e.data}`);
};
// 异常
websocket.onerror = (e) => {
  console.log("连接异常");
  console.error(e);
};
// 连接打开
websocket.onopen = (e) => {
  console.log("连接打开");

  // 创建连接后,往服务器连续写入3条消息
  websocket.send("sprigdoc.cn");
  websocket.send("sprigdoc.cn");
  websocket.send("sprigdoc.cn");

  // 最后发送 bye,由服务器断开连接
  websocket.send("bye");

  // 也可以由客户端主动断开
  // websocket.close();
};

Socket.io

并非完全标准的 websocket 实现,java 可使用 netty-socketio,js 使用 socket.io

注意版本非常重要!!!

配置 Controller

messageSendToUser消息类型,通过socketServer.addEventListener来添加监听器

可配置广播、房间等功能,待探究

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.example.demo;

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class SocketIOController {

    @Autowired
    private SocketIOServer socketServer;

    SocketIOController(SocketIOServer socketServer) {
        this.socketServer = socketServer;

        this.socketServer.addConnectListener(onUserConnectWithSocket);
        this.socketServer.addDisconnectListener(onUserDisconnectWithSocket);

        /**
         * Here we create only one event listener
         * but we can create any number of listener
         * messageSendToUser is socket end point after socket connection user have to
         * send message payload on messageSendToUser event
         */
        this.socketServer.addEventListener("messageSendToUser", String.class, onSendMessage);

    }

    public ConnectListener onUserConnectWithSocket = new ConnectListener() {
        @Override
        public void onConnect(SocketIOClient client) {
            log.info("Perform operation on user connect in controller");
        }
    };

    public DisconnectListener onUserDisconnectWithSocket = new DisconnectListener() {
        @Override
        public void onDisconnect(SocketIOClient client) {
            log.info("Perform operation on user disconnect in controller");
        }
    };

    public DataListener<String> onSendMessage = new DataListener<String>() {
        @Override
        public void onData(SocketIOClient client, String message, AckRequest acknowledge) throws Exception {

            /**
             * Sending message to target user
             * target user should subscribe the socket event with his/her name.
             * Send the same payload to user
             */

            log.info(" user send message to user " + message);
            // socketServer.getBroadcastOperations().sendEvent("messageSendToUser", client,
            // message);
            client.getNamespace().getBroadcastOperations().sendEvent("messageSendToUser", message);

            /**
             * After sending message to target user we can send acknowledge to sender
             */
            acknowledge.sendAckData("Message send to target user successfully");
        }
    };

}

配置服务器

使用这种方式配置的 socketio 服务器和 springboot 的 ws 不一样,需要额外设置主机和端口,一般为自机 ip 和指定的端口,在配置文件中设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.demo;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.CrossOrigin;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DisconnectListener;

import jakarta.annotation.PreDestroy;
import lombok.extern.log4j.Log4j2;

@CrossOrigin
@Log4j2
@org.springframework.context.annotation.Configuration
public class SocketIOConfig {

    @Value("${socket.host}")
    private String SOCKETHOST;

    @Value("${socket.port}")
    private int SOCKETPORT;
    private SocketIOServer server;

    @Bean
    public SocketIOServer socketIOServer() {
        Configuration config = new Configuration();
        config.setHostname(SOCKETHOST);
        config.setPort(SOCKETPORT);
        server = new SocketIOServer(config);
        server.start();
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                log.info("new user connected with socket " + client.getSessionId());
            }
        });

        server.addDisconnectListener(new DisconnectListener() {
            @Override
            public void onDisconnect(SocketIOClient client) {
                client.getNamespace().getAllClients().stream().forEach(data -> {
                    log.info("user disconnected " + data.getSessionId().toString());
                });
            }
        });
        return server;
    }

    @PreDestroy
    public void stopSocketIOServer() {
        this.server.stop();
    }
}

客户端

可通过消息类型来收发消息

1
2
3
4
5
6
7
8
9
10
11
var socket = io.connect("http://127.0.0.1:8889");

socket.on("connect", function () {
  console.log("connected");
});

socket.emit("messageSendToUser", "123"); //发送消息到服务端

socket.on("messageSendToUser", function (data) {
  console.log(data);
});

命名空间

服务端

通过SocketIOServer来添加命名空间

1
2
SocketIOServer socketServer;
this.chatNamespace = socketServer.addNamespace("/chat");

使用命名空间:一样具有添加监听器等方法

1
2
3
4
5
6
7
8
this.chatNamespace.addConnectListener((client) -> {
    log.info("new user connected to chat " + client.getSessionId());
});

this.chatNamespace.addEventListener("chat", String.class, (client, data, ackRequest) -> {
    log.info("user " + client.getSessionId() + " sent " + data);
    this.chatNamespace.getBroadcastOperations().sendEvent("chat", data);
});

客户端

在连接时指定连接命名空间,一个实例对应一个命名空间

默认都会连接到/命名空间

1
2
3
4
5
6
7
import { io } from "socket.io-client";

var socket = io.connect("http://127.0.0.1:8889/chat");

socket.on("connect", function () {
  console.log("connected");
});

广播/房间

广播

对于某个namspace来说,广播到命名空间里的客户端

1
this.chatNamespace.getBroadcastOperations().sendEvent("chat", data);

对于SocketIOServer实例来说,可广播到所有连接客户端

1
this.socketServer.getBroadcastOperations().sendEvent("chat", data);

房间

可再缩小广播的范围,需要加入房间的客户端才会收到消息

1
2
this.chatNamespace.getRoomOperations("chat")
  .sendEvent("chat", data);

由服务端控制客户端加入或离开房间

1
2
client.joinRoom("chat");
client.leaveRoom("chat");

数据传输

服务端

在监听器中指定接受数据类型

对于 Java 内置类型,可以直接直接使用Integer.class、String.class

1
addEventListener("data", String.class, (client, data, ackRequest)=>{}

对于复杂类型,须在服务端定义 entity,字段名和类型需要和客户端发来的 json 一致

在收到事件消息和数据后,数据会被自动封装

1
addEventListener("data", Person.class, (client, data, ackRequest)=>{})

客户端

接受到的数据若为复杂类型,则会以json格式的形式封装好,可以直接使用

1
2
3
4
5
6
socket.on("updatacount", (data) => {//这里data可以是json数据
  if (data.owner != selfCount.owner) {
    otherCount.count = data.count;
    otherText.text = otherCount.count;
  }
});

配置 ipv6 连接地址

1
2
3
4
5
server.port=8888
server.address=::

socket.host = ::
socket.port = 8889

在访问时使用[]将 ipv6 地址包含起来即可

1
http://[fe80::484b:afb2:19b7:3ac9]:8889/

诸如fe80::484b:afb2:19b7:3ac9%10这样的 ipv6 形式

网络接口标识符(Scope ID),用 % 分隔开。Scope ID 标识了 IPv6 地址所属的特定网络接口,这在特定的网络配置中可能会使用到。

在使用 Socket.IO 或其他网络通信库连接 IPv6 地址时,通常不需要在地址后面加上网络接口标识符(Scope ID)

STOMP

是什么

STOMP(Streaming Text Orientated Message Protocol)是应用层协议,可以基于 Websocket 或 TCP,可以像http协议一样定义协议格式(如 Header)。

客户端可以使用 SEND SUBSCRIBE 命令来发送或订阅消息,以及描述消息内容和谁应该收到它的 destination header。这就实现了一个简单的发布-订阅机制,你可以用它来通过 broker 向其他连接的客户端发送消息,或者向服务器发送消息以请求执行某些工作。

使用 Spring 的 STOMP 支持时,Spring WebSocket 应用程序充当客户的 STOMP broker。消息被路由到 @Controller 消息处理方法或简单的内存中 broker,该 broker 跟踪订阅并将消息广播给订阅用户。也可以将 Spring 配置为与专门的 STOMP broker(如 RabbitMQ、ActiveMQ 等)合作,进行消息的实际广播。

使用 STOMP 作为子协议可以让 Spring 框架和 Spring Security 提供更丰富的编程模型,而不是使用原始 WebSockets。

使用

Springboot中,spring-boot-starter-websocket模块原生支持了 Stomp 功能。

配置

消息代理Broker的配置,定义连接端口,前缀等

enableSimpleBroker(“/topic”):启用一个简单的内存消息代理来处理以/topic 为前缀的消息。客户端可以订阅这些消息。

setApplicationDestinationPrefixes(“/app”):设置应用程序目的地前缀。当客户端发送消息到服务器时,消息应以/app 为前缀,后面的部分将映射到控制器的@MessageMapping 注解的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }
}

消息处理器 Controller

@MessageMapping(“/hello”):映射客户端发送到/app/hello 的消息到这个方法。这里的/app 前缀是通过配置类中的 setApplicationDestinationPrefixes(“/app”)配置的。

@SendTo(“/topic/greetings”):将方法的返回值发送到订阅了/topic/greetings 的客户端。

1
2
3
4
5
6
7
8
9
10
@Controller
public class WebSocketController {

    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public String greeting(String message) {
        System.out.println("Received message: " + message);
        return "Hello, " + message + "!";
    }
}

前端

安装

1
2
npm install socket-client
npm install stompjs

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const SockJS = require("sockjs-client");
const Stomp = require("stompjs");

var stompClient = null;
var socket = new SockJS("http://localhost:8080/ws");
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
  console.log("Connected: " + frame);
  setInterval(function () {
    console.log("sending");
    stompClient.send("/app/hello", {}, { a: 1, b: 2 });
  }, 1000);
  stompClient.subscribe("/topic/greetings", function (message) {
    // showMessage(message.body);
    console.log("Received: " + message.body);
  });
});