背景

在很多实时性需求比较高的场景,例如k线,盘口,用户余额这些场景,我们使用的一般就是两种方式获取数据,前端轮询查询接口或者使用websocket来进行数据获取。

引入依赖

这里就没有把依赖包管理贴出来了,版本自行选择

		<!-- 引入web依赖,排除tomcat -->
		 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 引入undertow容器 -->
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>
        <!-- 引入websocket依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
  • 为啥不使用tomcat呢,因为它对ws的支持不是很好,总会出现一些问题
  • 为啥不用jetty呢,如果我们要使用controller等,和tomcat的差别比较大,不易更改,如果我们同时需要使用websocket与http,那么建议还是使用undertow
    其实这些都不重要,主要就是undertow的ws支持比较好,编码和tomcat差别不大,容易上手

配置编写

1、自定义返回序列化的类

import org.springframework.context.annotation.Configuration;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

/**
* 这里的ApiResult是我自定义的响应体,可更换为自己的响应体
* @author : yyds
* @date : 2023/12/4 16:35
*/
@Configuration
public class WebSocketEncoder implements Encoder.Text<ApiResult> {
   @Override
   public String encode(ApiResult apiResult) throws EncodeException {
       // 定义自己的格式化方式
       return null;
   }

   @Override
   public void init(EndpointConfig endpointConfig) {

   }

   @Override
   public void destroy() {

   }
}

2、注入bean

随便在一个类里面注入即可,socket客户端

   @Bean
   public ServerEndpointExporter serverEndpointExporter() {
       return new ServerEndpointExporter();
   }
   
   // 解决spring中socketBean冲突,如果有冲突就加没有就不管
   @Bean
   @Nullable
   public TaskScheduler taskScheduler() {
       ThreadPoolTaskScheduler threadPoolScheduler = new ThreadPoolTaskScheduler();
//        threadPoolScheduler.setThreadNamePrefix("SockJS-");
//        threadPoolScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
//        threadPoolScheduler.setRemoveOnCancelPolicy(true);
       return threadPoolScheduler;
   }

3、实现客户端连接

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author yyds
 * @date 2023/11/15
 */
@Data
@Slf4j
@Component
@ServerEndpoint(value = "/ws/{userId}",encoders = {WebSocketEncoder.class})
public class WebSocketEndPoint {

    /**
     * 用户ID
     */
    private String userId;

    /**
     * 当前客户端的连接会话, 需要通过它来给客户端发送消息
     */
    private Session session;

    /**
     * WS 会话连接池
     */
    private static ConcurrentHashMap<String, Session> sessionPool = MapUtil.newConcurrentHashMap();

    public static ConcurrentHashMap<String, Session> getSessionPool() {
        return sessionPool;
    }

    private String getSessionId(String userId, Session session) {
        return userId + StrUtil.AT + session.getId();
    }
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        this.userId = userId;
        this.session = session;
        sessionPool.put(getSessionId(userId, session), session);
        log.info("【ws-connect】sessionId:{}, userId:{}, 连接总数:{}", session.getId(), userId, sessionPool.size());
        try {
            sendMessageToOne(userId, WebSocketMessageEnum.CONNECT_SUCCESS.getCode());
        } catch (Exception ex) {
            log.error("【ws】发送连接成功消息异常, userId=" + userId, ex);
        }
    }

    @OnClose
    public void onClose(CloseReason close) {
        sessionPool.remove(getSessionId(userId, session));
        CloseReason.CloseCode closeCode = close.getCloseCode();
        log.info("【ws-close】userId:{}断开,断开code: {}, 是否正常断开:{} 连接总数:{}", this.userId, closeCode, closeCode == CloseReason.CloseCodes.NORMAL_CLOSURE, sessionPool.size());
    }

    @OnMessage
    public void onMessage(String message) {
        if (StrUtil.equals(message, WebSocketMessageEnum.HEALTH_CHECK_PING.getCode())) {
            log.info("【ws-message】收到用户[{}]客户端心跳检测消息:{}", this.userId, message);
            sendMessageToOne(this.userId, WebSocketMessageEnum.HEALTH_CHECK_PONG.getCode());
        } else {
            log.info("【ws-message】收到用户[{}]客户端发来的消息:{}", this.userId, message);
        }
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        log.error("【ws消息】错误" + this.userId, throwable);
    }

    /**
     * 广播消息
     *
     * @param message 消息内容
     */
    public void sendMessageToAll(String message) {
        // log.info("【ws消息】广播消息, 广播人数:{}, message={}", sessionPool.size(), message);
        for (Map.Entry<String, Session> entry : sessionPool.entrySet()) {
            Session session = entry.getValue();
            if (ObjectUtil.isNotNull(session) && session.isOpen()) {
                try {
                    session.getAsyncRemote().sendText(message);
                } catch (Exception ex) {
                    log.error("【ws广播消息错误】", ex);
                }
            }
        }
    }

    /**
     * 单点消息(单人)
     *
     * @param userId  用户ID
     * @param message 消息内容
     */
    public void sendMessageToOne(String userId, String message) {
        log.info("【ws-message-one】userId:{}, message:{}", userId, message);
        Session session = sessionPool.get(userId);
        if (ObjectUtil.isNotNull(session) && session.isOpen()) {
            try {
                session.getAsyncRemote().sendText(message);
            } catch (Exception ex) {
                log.error("【ws单点消息错误】", ex);
            }
        }
    }

    /**
     * 获取在线人数
     */
    public Integer onlineUsers() {
        return sessionPool.size();
    }
}

至此,websocket就已经就可以连接了。

注意

  • 前端同时发送多笔消息的时候,可能会出现问题,建议是收到消息回复后在进行下一步,或者初始化信息的时候,调用接口进行处理
  • 建议采用字符串形式传送,并且添加开始和结束标识符比如 消息内容
  • 前后端都做心跳检测
  • 记得在报错或者其他情况下需要关闭连接,并且移出缓存

查看全部
点赞(190) 打赏

评论列表 共有 0 条评论

暂无评论

微信小程序

微信扫一扫体验

立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部