8

跟着源码一起学:手把手教你用WebSocket打造Web端IM聊天

 3 years ago
source link: http://www.blogjava.net/jb2011/archive/2021/04/06/435844.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Jack Jiang

我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
posts - 225, comments - 13, trackbacks - 0, articles - 0

本文作者芋艿,原题“芋道 Spring Boot WebSocket 入门”,本次有修订和改动。

WebSocket如今在Web端即时通讯技术应用里使用广泛,不仅用于传统PC端的网页里,也被很多移动端开发者用于基于HTML5的混合APP里。对于想要在基于Web的应用里添加IM、推送等实时通信功能,WebSocket几乎是必须要掌握的技术。

本文将基于Tomcat和Spring框架实现一个逻辑简单的入门级IM应用,对于即时通讯初学者来说,能找到一个简单直接且能顺利跑通的实例代码,显然意义更大,本文正是如此。希望能给你的IM开发和学习带来启发。

注:源码在本文第四、五节开头的附件处可下载。

学习交流:

- 即时通讯/推送技术开发交流5群:215477170 [推荐]

- 移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM

- 开源IM框架源码:https://github.com/JackJiang2011/MobileIMSDK

(本文同步发布于:http://www.52im.net/thread-3483-1-1.html

二、知识准备

如果你对Web端即时通讯知识一头雾水,务必先读:《新手入门贴:史上最全Web端即时通讯技术原理详解》、《Web端即时通讯技术盘点:短轮询、Comet、Websocket、SSE》。

限于篇幅,本文不会深究WebSocket技术理论,如有兴趣请从基础学习:

如果想要更硬核一点的,可以读读下面这几篇:

三、内容概述

相比 HTTP 协议来说,WebSocket 协议对大多数后端开发者是比较陌生的。

相对而言:WebSocket 协议重点是提供了服务端主动向客户端发送数据的能力,这样我们就可以完成实时性较高的需求。例如:聊天 IM 即使通讯功能、消息订阅服务、网页游戏等等。

同时:因为 WebSocket 使用 TCP 通信,可以避免重复创建连接,提升通信质量和效率。例如:美团的长连接服务,具体可以看看 《美团点评的移动端网络优化实践:大幅提升连接成功率、速度等》 。

友情提示:

这里有个误区,WebSocket 相比普通的 Socket 来说,仅仅是借助 HTTP 协议完成握手,创建连接。后续的所有通信,都和 HTTP 协议无关。

看到这里,大家一定以为又要开始哔哔 WebSocket 的概念。哈哈,我偏不~如果对这块不了的朋友,可以阅读本文“2、知识准备”这一章。

要想使用WebSocket,一般有如下几种解决方案可选:

目前笔者手头有个涉及到 IM 即使通讯的项目,采用的是方案三。

主要原因是:我们对 Netty 框架的实战、原理与源码,都相对熟悉一些,所以就考虑了它。并且,除了需要支持 WebSocket 协议,我们还想提供原生的 Socket 协议。

如果仅仅是仅仅提供 WebSocket 协议的支持,可以考虑采用方案一或者方案二,在使用上,两个方案是比较接近的。相比来说,方案一 Spring WebSocket 内置了对 STOMP 协议的支持。

不过:本文还是采用方案二“Tomcat WebSocket”来作为入门示例。咳咳咳,没有特殊的原因,主要是开始写本文之前,已经花了 2 小时使用它写了一个示例。实在是有点懒,不想改。如果能重来,我要选李白,哈哈哈哈~

当然,不要慌,方案一和方案二的实现代码,真心没啥差别。

在开始搭建 Tomcat WebSocket 入门示例之前,我们先来了解下 JSR-356 规范,定义了 Java 针对 WebSocket 的 API :即 Javax WebSocket 。规范是大哥,打死不会提供实现,所以 JSR-356 也是如此。目前,主流的 Web 容器都已经提供了 JSR-356 的实现,例如说 Tomcat、Jetty、Undertow 等等。

四、Tomcat WebSocket 实战入门

4.1、基本介绍

示例代码下载:

(因附件无法上传到此处,请从同步链接处下载:http://www.52im.net/thread-3483-1-1.html

代码目录内容是这样: 

在本小节中,我们会使用 Tomcat WebSocket 搭建一个 WebSocket 的示例。

提供如下消息的功能支持:

  • 1)身份认证请求;
  • 2)私聊消息;
  • 3)群聊消息。

考虑到让示例更加易懂,我们先做成全局有且仅有一个大的聊天室,即建立上 WebSocket 的连接,都自动动进入该聊天室。

下面,开始遨游 WebSocket 这个鱼塘...

4.2、引入依赖

在 pom.xml 文件中,引入相关依赖。

<?xml version="1.0"encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 [url=http://maven.apache.org/xsd/maven-4.0.0.xsd]http://maven.apache.org/xsd/maven-4.0.0.xsd[/url]">

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.1.10.RELEASE</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-25-01</artifactId>

    <dependencies>

        <!-- 实现对 WebSocket 相关依赖的引入,方便~ -->

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-websocket</artifactId>

        </dependency>

        <!-- 引入 Fastjson ,实现对 JSON 的序列化,因为后续我们会使用它解析消息 -->

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>1.2.62</version>

        </dependency>

    </dependencies>

</project>

具体每个依赖的作用,自己认真看下注释。

4.3、WebsocketServerEndpoint

在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路径下,创建 WebsocketServerEndpoint 类,定义 Websocket 服务的端点(EndPoint)。

代码如下:

// WebsocketServerEndpoint.java

@Controller

@ServerEndpoint("/")

public class WebsocketServerEndpoint {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @OnOpen

    public void onOpen(Session session, EndpointConfig config) {

        logger.info("[onOpen][session({}) 接入]", session);

    @OnMessage

    public void onMessage(Session session, String message) {

        logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别

    @OnClose

    public void onClose(Session session, CloseReason closeReason) {

        logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);

    @OnError

    public void onError(Session session, Throwable throwable) {

        logger.info("[onClose][session({}) 发生异常]", session, throwable);

如代码所示:

  • 1)在类上,添加 @Controller 注解,保证创建一个 WebsocketServerEndpoint Bean;
  • 2)在类上,添加 JSR-356 定义的 @ServerEndpoint 注解,标记这是一个 WebSocket EndPoint ,路径为 / ;
  • 3)WebSocket 一共有四个事件,分别对应使用 JSR-356 定义的 @OnOpen@OnMessage@OnClose@OnError 注解。

这是最简版的 WebsocketServerEndpoint 的代码。在下文,我们会慢慢把代码补全。

4.4、WebSocketConfiguration

在 cn.iocoder.springboot.lab24.springwebsocket.config 包路径下,创建 WebsocketServerEndpoint 配置类。

代码如下:

// WebSocketConfiguration.java

@Configuration

// @EnableWebSocket // 无需添加该注解,因为我们并不是使用 Spring WebSocket

public class WebSocketConfiguration {

    @Bean

    public ServerEndpointExporter serverEndpointExporter() {

        return new ServerEndpointExporter();

PS:在 #serverEndpointExporter() 方法中,创建 ServerEndpointExporter Bean 。该 Bean 的作用,是扫描添加有 @ServerEndpoint 注解的 Bean 。

4.5、Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。

代码如下:

// Application.java

@SpringBootApplication

public class Application {

    public static void main(String[] args) {

        SpringApplication.run(Application.class, args);

执行 Application 启动该示例项目。

考虑到大家可能不会或者不愿意写前端代码,所以我们直接使用 WebSocket在线测试工具,测试 WebSocket 连接。

如下图:

至此,最简单的一个 WebSocket 项目的骨架,我们已经搭建完成。下面,我们开始改造,把相应的逻辑补全。

4.6、消息

在 HTTP 协议中,是基于 Request/Response 请求响应的同步模型,进行交互。在 Websocket 协议中,是基于 Message 消息的异步模型,进行交互。这一点,是很大的不同的,等会看到具体的消息类,感受会更明显。

因为 WebSocket 协议,不像 HTTP 协议有 URI 可以区分不同的 API 请求操作,所以我们需要在 WebSocket 的 Message 里,增加能够标识消息类型,这里我们采用 type 字段。

所以在这个示例中,我们采用的 Message 采用 JSON 格式编码。

格式如下:

    type: "", // 消息类型

    body: {} // 消息体

解释一下:

  • 1)type 字段,消息类型。通过该字段,我们知道使用哪个 MessageHandler 消息处理器(关于 MessageHandler ,我们在下一节中,详细解析);
  • 2)body 字段,消息体。不同的消息类型,会有不同的消息体;
  • 3)Message 采用 JSON 格式编码,主要考虑便捷性,实际项目下,也可以考虑 Protobuf 等更加高效且节省流量的编码格式。

实际上:我们在该示例中,body 字段对应的 Message 相关的接口和类,实在想不到名字了。所有的 Message 们,我们都放在 cn.iocoder.springboot.lab25.springwebsocket.message 包路径下。

4.6.1 Message

创建 Message 接口,基础消息体,所有消息体都要实现该接口。

代码如下:

// Message.java

publicinterfaceMessage {

目前作为一个标记接口,未定义任何操作。

4.6.2 认证相关 Message

创建 AuthRequest 类,用户认证请求。

代码如下:

// AuthRequest.java

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

     * 认证 Token

    private String accessToken;

    // ... 省略 set/get 方法

解释一下:

  • 1)TYPE 静态属性,消息类型为 AUTH_REQUEST 。
  • 2)accessToken 属性,认证 Token 。

对于第2)点,在 WebSocket 协议中,我们也需要认证当前连接,用户身份是什么。一般情况下,我们采用用户调用 HTTP 登录接口,登录成功后返回的访问令牌 accessToken 。这里,我们先不拓展开讲,事后可以看看 《基于 Token 认证的 WebSocket 连接》 文章。

虽然说,WebSocket 协议是基于 Message 模型,进行交互。但是,这并不意味着它的操作,不需要响应结果。例如说,用户认证请求,是需要用户认证响应的。所以,我们创建 AuthResponse 类,作为用户认证响应。

代码如下:

// AuthResponse.java

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

     * 响应状态码

    private Integer code;

     * 响应提示

    private String message;

    // ... 省略 set/get 方法

解释一下:

  • 1)TYPE 静态属性,消息类型为 AUTH_REQUEST ;
  • 2)code 属性,响应状态码;
  • 3)message 属性,响应提示。

对于第1)点,实际上,我们在每个 Message 实现类上,都增加了 TYPE 静态属性,作为消息类型。下面,我们就不重复赘述了。

在本示例中,用户成功认证之后,会广播用户加入群聊的通知 Message ,使用 UserJoinNoticeRequest 。

代码如下:

// UserJoinNoticeRequest.java

public class UserJoinNoticeRequest implements Message {

    public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";

    private String nickname;

    // ... 省略 set/get 方法

实际上,我们可以在需要使用到 Request/Response 模型的地方,将 Message 进行拓展:

  • 1)Request 抽象类,增加 requestId 字段,表示请求编号;
  • 2)Response 抽象类,增加 requestId 字段,和每一个 Request 请求映射上(同时,里面统一定义 code 和 message 属性,表示响应状态码和响应提示)。

这样,在使用到同步模型的业务场景下,Message 实现类使用 Request/Reponse 作为后缀。例如说,用户认证请求、删除一个好友请求等等。

而在使用到异步模型能的业务场景下,Message 实现类还是继续 Message 作为后缀。例如说,发送一条消息,用户操作完后,无需阻塞等待结果

4.6.3 发送消息相关 Message

创建 SendToOneRequest 类,发送给指定人的私聊消息的 Message。

代码如下:

// SendToOneRequest.java

public class SendToOneRequest implements Message {

    public static final String TYPE = "SEND_TO_ONE_REQUEST";

     * 发送给的用户

    private String toUser;

     * 消息编号

    private String msgId;

    private String content;

    // ... 省略 set/get 方法

每个字段,自己看注释噢。

创建 SendToAllRequest 类,发送给所有人的群聊消息的 Message。

代码如下:

// SendToAllRequest.java

public class SendToAllRequest implements Message {

    public static final String TYPE = "SEND_TO_ALL_REQUEST";

     * 消息编号

    private String msgId;

    private String content;

    // ... 省略 set/get 方法

每个字段,自己看注释噢。

在服务端接收到发送消息的请求,需要异步响应发送是否成功。所以,创建 SendResponse 类,发送消息响应结果的 Message 。

代码如下:

// SendResponse.java

public class SendResponse implements Message {

    public static final String TYPE = "SEND_RESPONSE";

     * 消息编号

    private String msgId;

     * 响应状态码

    private Integer code;

     * 响应提示

    private String message;

    // ... 省略 set/get 方法

重点看 msgId 字段:即消息编号。客户端在发送消息,通过使用 UUID 算法,生成全局唯一消息编号(唯一ID的生成技术见:《从新手到专家:如何设计一套亿级消息量的分布式IM系统》的“5、唯一ID的技术方案”章节)。这样,服务端通过 SendResponse 消息响应,通过 msgId 做映射。

在服务端接收到发送消息的请求,需要转发消息给对应的人。所以,创建 SendToUserRequest 类,发送消息给一个用户的 Message 。

代码如下:

// SendResponse.java

public class SendToUserRequest implements Message {

    public static final String TYPE = "SEND_TO_USER_REQUEST";

     * 消息编号

    private String msgId;

    private String content;

    // ... 省略 set/get 方法

相比 SendToOneRequest 来说,少一个 toUser 字段。因为,我们可以通过 WebSocket 连接,已经知道发送给谁了。

4.7、消息处理器

每个客户端发起的 Message 消息类型,我们会声明对应的 MessageHandler 消息处理器。这个就类似在 SpringMVC 中,每个 API 接口对应一个 Controller 的 Method 方法。

所有的 MessageHandler 们,我们都放在 cn.iocoder.springboot.lab25.springwebsocket.handler 包路径下。

4.7.1 MessageHandler

创建 MessageHandler 接口,消息处理器接口。

代码如下:

// MessageHandler.java

public interface MessageHandler<T extends Message> {

     * 执行处理消息

     * @param session 会话

     * @param message 消息

    void execute(Session session, T message);

     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段

    String getType();

解释一下:

  • 1)定义了泛型 <T> ,需要是 Message 的实现类;
  • 2)定义的两个接口方法,自己看下注释哈。

4.7.2 AuthMessageHandler

创建 AuthMessageHandler 类,处理 AuthRequest 消息。

代码如下:

// AuthMessageHandler.java

@Component

public class AuthMessageHandler implements MessageHandler<AuthRequest> {

    @Override

    public void execute(Session session, AuthRequest message) {

        // 如果未传递 accessToken

        if(StringUtils.isEmpty(message.getAccessToken())) {

            WebSocketUtil.send(session, AuthResponse.TYPE,

                    new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入"));

            return;

        // 添加到 WebSocketUtil 中

        WebSocketUtil.addSession(session, message.getAccessToken()); // 考虑到代码简化,我们先直接使用 accessToken 作为 User

        // 判断是否认证成功。这里,假装直接成功

        WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));

        // 通知所有人,某个人加入了。这个是可选逻辑,仅仅是为了演示

        WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,

                newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考虑到代码简化,我们先直接使用 accessToken 作为 User

    @Override

    public String getType() {

        return AuthRequest.TYPE;

代码比较简单,跟着代码读读即可。

关于 WebSocketUtil 类,我们在「5.8、WebSocketUtil」一节中再来详细看看。

4.7.3 SendToOneRequest

创建 SendToOneHandler 类,处理 SendToOneRequest 消息。

代码如下:

// SendToOneRequest.java

@Component

public class SendToOneHandler implements MessageHandler<SendToOneRequest> {

    @Override

    public void execute(Session session, SendToOneRequest message) {

        // 这里,假装直接成功

        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);

        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);

        // 创建转发的消息

        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())

                .setContent(message.getContent());

        // 广播发送

        WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);

    @Override

    public String getType() {

        return SendToOneRequest.TYPE;

代码比较简单,跟着代码读读即可。

4.7.4 SendToAllHandler

创建 SendToAllHandler 类,处理 SendToAllRequest 消息。

代码如下:

// SendToAllRequest.java

@Component

public class SendToAllHandler implements MessageHandler<SendToAllRequest> {

    @Override

    public void execute(Session session, SendToAllRequest message) {

        // 这里,假装直接成功

        SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);

        WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);

        // 创建转发的消息

        SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())

                .setContent(message.getContent());

        // 广播发送

        WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);

    @Override

    public String getType() {

        return SendToAllRequest.TYPE;

代码比较简单,跟着代码读读即可。

4.8、WebSocketUtil

代码在 cn.iocoder.springboot.lab25.springwebsocket.util 包路径下。

创建 WebSocketUtil 工具类,主要提供两方面的功能:

  • 1)Session 会话的管理;
  • 2)多种发送消息的方式。

整体代码比较简单,自己瞅瞅哟。

代码在目录中的如下位置: 

4.9、完善 WebsocketServerEndpoint

在本小节,我们会修改 WebsocketServerEndpoint 的代码,完善其功能。

4.9.1 初始化 MessageHandler 集合

实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

代码如下:

// WebsocketServerEndpoint.java

 * 消息类型与 MessageHandler 的映射

 * 注意,这里设置成静态变量。虽然说 WebsocketServerEndpoint 是单例,但是 Spring Boot 还是会为每个 WebSocket 创建一个 WebsocketServerEndpoint Bean 。

private static final Map<String, MessageHandler> HANDLERS = newHashMap<>();

@Autowired

private ApplicationContext applicationContext;

@Override

public void afterPropertiesSet() throws Exception {

    // 通过 ApplicationContext 获得所有 MessageHandler Bean

    applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中

    logger.info("[afterPropertiesSet][消息处理器数量:{}]", HANDLERS.size());

通过这样的方式,可以避免手动配置 MessageHandler 与消息类型的映射。

4.9.2 onOpen

重新实现 #onOpen(Session session, EndpointConfig config) 方法,实现连接时,使用 accessToken 参数进行用户认证。

代码如下:

// WebsocketServerEndpoint.java

@OnOpen

public void onOpen(Session session, EndpointConfig config) {

    logger.info("[onOpen][session({}) 接入]", session);

    // <1> 解析 accessToken

    List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken");

    String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;

    // <2> 创建 AuthRequest 消息类型

    AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);

    // <3> 获得消息处理器

    MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);

    if(messageHandler == null) {

        logger.error("[onOpen][认证消息类型,不存在消息处理器]");

        return;

    messageHandler.execute(session, authRequest);

如代码所示:

  • <1> 处:解析 ws:// 地址上的 accessToken 的请求参。例如说:ws://127.0.0.1:8080?accessToken=999999;
  • <2> 处:创建 AuthRequest 消息类型,并设置 accessToken 属性;
  • <3> 处:获得 AuthRequest 消息类型对应的 MessageHandler 消息处理器,然后调用 MessageHandler#execute(session, message) 方法,执行处理用户认证请求。

打开三个浏览器创建,分别设置服务地址如下:

  • 1)ws://127.0.0.1:8080/?accessToken=芋艿;
  • 2)ws://127.0.0.1:8080/?accessToken=番茄;
  • 3)ws://127.0.0.1:8080/?accessToken=土豆。

然后,逐个点击「开启连接」按钮,进行 WebSocket 连接。

最终效果如下图:

如上图所示:

  • 1)在红圈中,可以看到 AuthResponse 的消息;
  • 2)在黄圈中,可以看到 UserJoinNoticeRequest 的消息。

4.9.3 onMessage

重新实现 #onMessage(Session session, String message) 方法,实现不同的消息,转发给不同的 MessageHandler 消息处理器。

代码如下:

// WebsocketServerEndpoint.java

@OnMessage

public void onMessage(Session session, String message) {

    logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别

        // <1> 获得消息类型

        JSONObject jsonMessage = JSON.parseObject(message);

        String messageType = jsonMessage.getString("type");

        // <2> 获得消息处理器

        MessageHandler messageHandler = HANDLERS.get(messageType);

        if(messageHandler == null) {

            logger.error("[onMessage][消息类型({}) 不存在消息处理器]", messageType);

            return;

        // <3> 解析消息

        Class<? extendsMessage> messageClass = this.getMessageClass(messageHandler);

        // <4> 处理消息

        Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass);

        messageHandler.execute(session, messageObj);

    } catch(Throwable throwable) {

        logger.info("[onMessage][session({}) message({}) 发生异常]", session, throwable);

代码中:

  • <1> 处,获得消息类型,从 "type" 字段中;
  • <2> 处,获得消息类型对应的 MessageHandler 消息处理器;
  • <3> 处,调用 #getMessageClass(MessageHandler handler) 方法,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。

代码如下:

// WebsocketServerEndpoint.java

private Class<? extends Message> getMessageClass(MessageHandler handler) {

    // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);

    // 获得接口的 Type 数组

    Type[] interfaces = targetClass.getGenericInterfaces();

    Class<?> superclass = targetClass.getSuperclass();

    while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准

        interfaces = superclass.getGenericInterfaces();

        superclass = targetClass.getSuperclass();

    if(Objects.nonNull(interfaces)) {

        // 遍历 interfaces 数组

        for(Type type : interfaces) {

            // 要求 type 是泛型参数

            if(type instanceof ParameterizedType) {

                ParameterizedType parameterizedType = (ParameterizedType) type;

                // 要求是 MessageHandler 接口

                if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {

                    Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();

                    // 取首个元素

                    if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {

                        return(Class<Message>) actualTypeArguments[0];

                    } else{

                        thrownewIllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));

    throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));

这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

如果大家对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

<4> 处,调用 MessageHandler#execute(session, message) 方法,执行处理请求。

另外:这里增加了 try-catch 代码,避免整个执行的过程中,发生异常。如果在 onMessage 事件的处理中,发生异常,该消息对应的 Session 会话会被自动关闭。显然,这个不符合我们的要求。例如说,在 MessageHandler 处理消息的过程中,发生一些异常是无法避免的。

继续基于上述创建的三个浏览器,我们先点击「清空消息」按钮,清空下消息,打扫下上次测试展示出来的接收得到的 Message 。当然,WebSocket 的连接,不需要去断开。

在第一个浏览器中,分别发送两种聊天消息。

一条 SendToOneRequest 私聊消息:

    type: "SEND_TO_ONE_REQUEST",

    body: {

        toUser: "番茄",

        msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe",

        content: "我是一条单聊消息"

一条 SendToAllHandler 群聊消息:

    type: "SEND_TO_ALL_REQUEST",

    body: {

        msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747",

        content: "我是一条群聊消息"

最终结果如下图:

如上图所示:

  • 1)在红圈中,可以看到一条 SendToUserRequest 的消息,仅有第二个浏览器(番茄)收到;
  • 2)在黄圈中,可以看到三条 SendToUserRequest 的消息,所有浏览器都收到。

4.9.4 onClose

重新实现 #onClose(Session session, CloseReason closeReason) 方法,实现移除关闭的 Session 。

代码如下:

// WebsocketServerEndpoint.java

@OnClose

public void onClose(Session session, CloseReason closeReason) {

    logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);

    WebSocketUtil.removeSession(session);

4.9.5 onError

#onError(Session session, Throwable throwable) 方法,保持不变。

代码如下:

// WebsocketServerEndpoint.java

@OnError

public void onError(Session session, Throwable throwable) {

    logger.info("[onClose][session({}) 发生异常]", session, throwable);

五、Spring WebSocket 实战入门

5.0、基础介绍

示例代码下载:

(因附件无法上传到此处,请从同步链接处下载:http://www.52im.net/thread-3483-1-1.html

仔细一个捉摸,虎躯一震,还是提供一个 Spring WebSocket 快速入门的示例。

在 上章「Tomcat WebSocket 实战入门」 的 lab-websocket-25-01 示例的基础上,我们复制出 lab-websocket-25-02 项目,进行改造。

改造的代码目录内容是这样:

5.1、WebSocketUtil

因为 Tomcat WebSocket 使用的是 Session 作为会话,而 Spring WebSocket 使用的是 WebSocketSession 作为会话,导致我们需要略微修改下 WebSocketUtil 工具类。改动非常略微,点击 WebSocketUtil.java 查看下,秒懂的噢。

主要有两点:

  • 1)将所有使用 Session 类的地方,调整成 WebSocketSession 类;
  • 2)将发送消息,从 Session 修改成 WebSocketSession 。

5.2、消息处理器

将 cn.iocoder.springboot.lab25.springwebsocket.handler 包路径下的消息处理器们,使用到 Session 类的地方,调整成 WebSocketSession 类。

5.3、DemoWebSocketShakeInterceptor

在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路径下,创建 DemoWebSocketShakeInterceptor 拦截器。因为 WebSocketSession 无法获得 ws 地址上的请求参数,所以只好通过该拦截器,获得 accessToken 请求参数,设置到 attributes 中。

代码如下:

// DemoWebSocketShakeInterceptor.java

public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor {

    @Override// 拦截 Handshake 事件

    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {

        // 获得 accessToken

        if(request instanceof ServletServerHttpRequest) {

            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;

            attributes.put("accessToken", serverRequest.getServletRequest().getParameter("accessToken"));

        // 调用父方法,继续执行逻辑

        return super.beforeHandshake(request, response, wsHandler, attributes);

5.4、DemoWebSocketHandler

在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路径下,创建 DemoWebSocketHandler 处理器。该处理器参考 「5.9、完善 WebsocketServerEndpoint」 小节,编写它的代码。

DemoWebSocketHandler.java代码位于如下目录处,具体内容就不贴出来了,自已去读一读:

代码极其相似,简单撸下即可。

5.5、WebSocketConfiguration

修改 WebSocketConfiguration 配置类,代码如下:

// WebSocketConfiguration.java

@Configuration

@EnableWebSocket// 开启 Spring WebSocket

public class WebSocketConfiguration implements WebSocketConfigurer {

    @Override

    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        registry.addHandler(this.webSocketHandler(), "/") // 配置处理器

                .addInterceptors(newDemoWebSocketShakeInterceptor()) // 配置拦截器

                .setAllowedOrigins("*"); // 解决跨域问题

    @Bean

    public DemoWebSocketHandler webSocketHandler() {

        return new DemoWebSocketHandler();

    @Bean

    public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() {

        return new DemoWebSocketShakeInterceptor();

解释一下:

  • 1)在类上,添加 @EnableWebSocket 注解,开启 Spring WebSocket 功能;
  • 2)实现 WebSocketConfigurer 接口,自定义 WebSocket 的配置(具体可以看看 #registerWebSocketHandlers(registry) 方法,配置 WebSocket 处理器、拦截器,以及允许跨域)。

至此,我们已经完成 Spring WebSocket 的示例。

后面,我们执行 Application 来启动项目。具体的测试,这里就不重复了,可以自己使用 WebSocket 在线测试工具 来测试下。

七、写在最后

虽然说,WebSocket 协议已经在主流的浏览器上,得到非常好的支持,但是总有一些“异类”,是不兼容的。所以就诞生了 SockJS、Socket.io这类库。关于它们的介绍与使用,可以看看 《SockJS 简单介绍》 、《Web端即时通讯技术的发展与WebSocket、Socket.io的技术实践》文章。

实际场景下,我们在使用 WebSocket 还是原生 Socket 也好,都需要考虑“如何保证消息一定送达给用户?”

大家肯定能够想到的是:如果用户不处于在线的时候,消息持久化到 MySQL、MongoDB 等等数据库中。这个是正确,且是必须要做的。

我们在一起考虑下边界场景:客户端网络环境较差,特别是在移动端场景下,出现网络闪断,可能会出现连接实际已经断开,而服务端以为客户端处于在线的情况。此时,服务端会将消息发给客户端,那么消息实际就发送到“空气”中,产生丢失的情况。

要解决这种情况下的问题,需要引入客户端的 ACK 消息机制。

目前,主流的有两种做法。

第一种:基于每一条消息编号 ACK

整体流程如下:

  • 1)无论客户端是否在线,服务端都先把接收到的消息持久化到数据库中。如果客户端此时在线,服务端将完整消息推送给客户端;
  • 2)客户端在接收到消息之后,发送 ACK 消息编号给服务端,告知已经收到该消息。服务端在收到 ACK 消息编号的时候,标记该消息已经发送成功;
  • 3)服务端定时轮询,在线的客户端,是否有超过 N 秒未 ACK 的消息。如果有,则重新发送消息给对应的客户端。

这种方案,因为客户端逐条 ACK 消息编号,所以会导致客户端和服务端交互次数过多。当然,客户端可以异步批量 ACK 多条消息,从而减少次数。

不过因为服务端仍然需要定时轮询,也会导致服务端压力较大。所以,这种方案基本已经不采用了。

第二种:基于滑动窗口 ACK

整体流程如下:

  • 1)无论客户端是否在线,服务端都先把接收到的消息持久化到数据库中。如果客户端此时在线,服务端将消息编号推送给客户端;
  • 2)客户端在接收到消息编号之后,和本地的消息编号进行比对。如果比本地的小,说明该消息已经收到,忽略不处理;如果比本地的大,使用本地的消息编号,向服务端拉取大于本地的消息编号的消息列表,即增量消息列表。拉取完成后,更新消息列表中最大的消息编号为新的本地的消息编号;
  • 3)服务端在收到客户端拉取增量的消息列表时,将请求的编号记录到数据库中,用于知道客户端此时本地的最新消息编号;
  • 4)考虑到服务端将消息编号推送给客户端,也会存在丢失的情况,所以客户端会每 N 秒定时向服务端拉取大于本地的消息编号的消息列表。

这种方式,在业务被称为推拉结合的方案,在分布式消息队列、配置中心、注册中心实现实时的数据同步,经常被采用。

并且,采用这种方案的情况下,客户端和服务端不一定需要使用长连接,也可以使用长轮询所替代。

做法比如,客户端发送带有消息版本号的 HTTP 请求到服务端:

  • 1)如果服务端已有比客户端新的消息编号,则直接返回增量的消息列表;
  • 2)如果服务端没有比客户端新的消息编号,则 HOLD 住请求,直到有新的消息列表可以返回,或者 HTTP 请求超时;
  • 3)客户端在收到 HTTP 请求超时时,立即又重新发起带有消息版本号的 HTTP 请求到服务端。如此反复循环,通过消息编号作为增量标识,达到实时获取消息的目的。

如果大家对消息可靠投递这块感兴趣,可以看看下面这几篇:

毕竟,本篇这里写的有点简略哈 ~

最后:如果你想系统的学习IM开发方面方面的知识,推荐详读:《新手入门一篇就够:从零开发移动端IM》。如果你自认为已经有点小牛x了,可以看看生产环境下的大用户量IM系统架构设计方面的知识:《从新手到专家:如何设计一套亿级消息量的分布式IM系统》。

限于篇幅,这里就不再继续展开了。

附录:更多IM开发动手实践文章

自已开发IM有那么难吗?手把手教你自撸一个Andriod版简易IM (有源码)

一种Android端IM智能心跳算法的设计与实现探讨(含样例代码)

手把手教你用Netty实现网络通信程序的心跳机制、断线重连机制

轻量级即时通讯框架MobileIMSDK的iOS源码(开源版)[附件下载]

开源IM工程“蘑菇街TeamTalk”2015年5月前未删减版完整代码 [附件下载]

NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示 [附件下载]

NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示 [附件下载]

NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]

NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战 [附件下载]

一个WebSocket实时聊天室Demo:基于node.js+socket.io [附件下载]

适合新手:从零开发一个IM服务端(基于Netty,有完整源码)

拿起键盘就是干:跟我一起徒手开发一套分布式IM系统

正确理解IM长连接的心跳及重连机制,并动手实现(有完整IM源码)

适合新手:手把手教你用Go快速搭建高性能、可扩展的IM系统(有源码)

跟着源码一起学:手把手教你用WebSocket打造Web端IM聊天

本文已同步发布于“即时通讯技术圈”公众号。

▲ 本文在公众号上的链接是:点此进入。同步发布链接是:http://www.52im.net/thread-3483-1-1.html 


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK