⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/m0_64360721/article/details/125125467 「代码峡谷孙膑」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

介绍

WebSocket大家应该是再熟悉不过了,如果是单体应用确实不会有什么问题,但是当我们的项目使用微服务架构时,就可能会存在问题

比如服务A有两个实例A1和A2,前端的WebSocket客户端C通过网关的负载均衡连到了A1,这个时候当A2触发消息发送的逻辑,需要将某个消息发送给所有的客户端时,C就接受不到消息

这个时候我们很快就能想到一种最简单的解决方案,就是把A2的消息转发给A1,A1再把消息发送给C,这样C就能收到A2发送的消息了

基于这个思路,我实现了一个库,一个配置注解搞定一切

用法

接下来让我们看看这个库的用法

首先我们需要在启动类上添加一个注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
public class AServiceApplication {

public static void main(String[] args) {
SpringApplication.run(AServiceApplication.class, args);
}
}

接着我们在需要发送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/ws")
public class WsController {

@Autowired
private WebSocketLoadBalanceConcept concept;

@RequestMapping("/send")
public void send(@RequestParam String msg) {
concept.send(msg);
}
}

是不是很简单,有没有觉得比自己集成单体应用的WebSocket还要简单!

当你的同事还在头疼要实现手动转发时你已经通过一个配置注解实现了功能并开始泡茶喝

你的同事肯定对你刮目相看啊(又能开始摸鱼了)

不知道大家看了之后是不是对具体实现已经有了一些思路呢

接下来我就来讲讲这个库的实现流程

抽象思路

其实我之前有专门针对WebSocket实现过类似功能的模块,只是当时的一些场景都是基于项目定死的,所以相对来说实现比较简单,但是过于定制化不好扩展

有一天在和我的一个前同事聊天的过程中得知,他们在考虑让设备和服务直连,并且服务要部署成多实例

设备和服务直连无非就是通过TCP这种长连接来实现,可以使用缓存来保存连接和服务地址的映射关系来实现点对点转发的功能需求

听到这里,是不是感觉似曾相识?当时就有一道光穿过我的脑瓜子,真相只有一个!这不就和WebSocket在集群模式下的问题一样么

于是我从原来针对WebSocket的思考,变成了对各种长连接的思考,最终我将这个问题抽象成了:长连接的集群方案

而不管是WebSocket还是TCP都是长连接的一种具体实现

所以我们可以抽象一个顶级接口Connection,然后实现WebSocketConnection或者是TCPConnection

其实从抽象的角度来说不仅仅是长连接,短连接也在我们的抽象范围之内,只不过类似HTTP等协议并不存在上述的问题,但是并不妨碍你实现一个HTTPConnection用于转发消息,所以大家不要被先入为主的思维束缚住了

转发思路

之前讲到,这个库的主要思路就是将消息转发给其他的服务实例来达到一个单播或广播的效果

所以消息转发的设计就非常重要了

首先消息转发需要凭借一些支持数据交互的技术手段

比如HTTP,MQ,TCP,WebSocket

说到这里。。。大家是不是。。。你TM原来自己就能搞定啊(掀桌)

长连接不就是用来交互数据的吗,所以完全可以自给自足啊

于是就有一个精妙的想法在我脑子里形成:

如果每个服务实例都把自己作为一个客户端,连接到其他服务上呢?

  • WebSocket的场景下,我们将当前服务实例作为一个WebSocket客户端去连接其他服务实例的WebSocket服务端
  • TCP的场景下,我们将当前服务实例作为一个TCP的客户端去连接其他服务实例的TCP服务端

这样其他服务实例就可以把消息发到这些伪装的客户端上,当服务实例上伪装的客户端接收到消息之后就可以再转发给自己管理的真正的客户端

撒花家人们,自闭(自我闭环)了属于是

所以我们首先需要先让服务实例之间相互连接上

连接流程

让我们来看看互相建立连接是怎么设计的

我定义了一个ConnectionSubscriber的接口,大家可以理解为我们的服务实例要去订阅监听其他服务发送的消息

同时提供了默认实现,就是基于自身的协议进行连接和消息的发送

当然也能够灵活的支持其他方式,只需要自定义一个ConnectionSubscriber就可以了,如果使用MQ的方式就可以实现一个MQConnectionSubscriber或者使用HTTP就可以实现一个HTTPConnectionSubscriber

只不过使用自身的协议就可以不用依赖其他的库或是中间件了,当然如果你对消息的丢失率有比较严格的要求也可以使用MQ作为消息转发的中介,而以我之前参与过的项目来说,一般普通的WebSocket场景基本上还是能忍受一定的丢失率的

获取服务实例信息

那么我们怎么知道要去连接哪些实例呢

我定义了一个ConnectionServerManager的接口用来管理服务信息

当然我们完全可以自己实现一个,比如通过配置文件来配置服务实例信息

不过我们有更方便的方式,那就是依赖Spring Cloud的服务发现组件了,不管是Eureka还是Nacos还是其他的注册中心相当于都支持了,这就是抽象的魅力啊

我们可以通过DiscoveryClient#getInstances(Registration.getServiceId())来获得所有的实例,排除掉自身就是需要连接的服务实例了

当我们的服务实例连接上其他的服务实例之后,发送一个自身实例信息的消息过去,其他的服务实例接收到对应的消息之后反过来连接我们的服务实例,保证一定的连接及时性,这样双方的连接就搭建起来了,可以互相转发消息了

同时我还添加了心跳检测和自动重连,当一段时间没有收到心跳回复后就会断开连接,并且每隔一段时间就会重新查询一遍实例信息,如果发现存在某个服务实例没有对应的连接,就会重新进行连接,这样就能在某些偶尔网络不好的情况下有一定的容错

到目前为止,我们基本的框架已经建立了,当我们启动服务之后,服务间就会自动建立连接

连接区分和管理

基于上述的思路,我们肯定需要区分真实的客户端和用来转发的客户端

于是我就把这些连接做了一个分类

类别 说明
Client 普通的连接
Subscriber 服务实例伪装的连接,用于接受需要转发的消息
Observable 服务实例伪装的连接,用于发送需要转发的消息

然后对于这些连接进行一个统一的管理

通过连接工厂ConnectionFactory我们可以将任意的连接适配成Connection对象,并实现各种连接间的消息转发

每个连接都会配置一个MessageEncoderMessageDecoder用于消息的编码和解码,而且不同类别的连接对应的编码器和解码器肯定是不一样的,比如转发的消息和发给真实客户端的消息很大程度上都是有区别的,所以额外定义了一个MessageCodecAdapter用来适配不同类型的编解码器,也能让大家在自定义时方便管理

消息发送

现在当我们发送某条消息之后,消息就会被转发到其他的服务实例,所有的客户端就都能收到了

不对啊,在有些情况下我们不想让所有客户端都收到啊,能不能我们想让谁收到就让谁收到啊

真麻烦,来,我把所有的连接都给你,你自己选吧

连接选择

我们需要在消息发送时确定发送给哪些连接

于是我就定义了一个连接选择器ConnectionSelector

每次要发送消息的时候,我都会匹配一个连接选择器,然后通过选择器来获得需要发送消息的连接,而我们可以通过自定义连接选择器来实现我们消息的精准发送

这里其实就是我为什么会取名WebSocketLoadBalanceConcept的原因,为什么要叫LoadBalance呢

Ribbon通过IRule来选择一个Server

我通过ConnectionSelector来选择一个Connection集合

是不是有异曲同工之妙

继续来说自定义选择器

准备工作:

  • 我们的Connection有一个metadata字段用于存放自定义属性
  • 我们的Message有一个headers字段用于存放消息头

给指定用户发送消息

很多场景下我们需要给指定的用户发送消息

首先当客户端连接上来时,可以通过参数或者主动发送一个消息将userId发给服务端,然后服务端将得到的userId存在Connectionmetadata

接着我们给需要发送的Message添加一个header,将对应的userId作为消息头

这样我们就可以自定义一个连接选择器通过判断Message是否包含userId消息头来作为匹配的条件,当Message的headers中存在userId时,对Connection中的metadata进行userId的匹配来筛选需要发送消息的连接

由于userId是唯一的,当我们自身服务连上来的客户端中已经匹配到就不需要再转发了,如果没有匹配到就通过其他服务实例的客户端进行消息转发

库中已经实现了对应的UserSelectorUserMessage,可以使用配置开启并通过在连接路径上添加userId参数来标记用户

当然我们也可以借用缓存来精确的判断需不需要转发或者是需要转发给哪几个服务,把userId和服务的instanceId等一些具有唯一性的数据缓存在Redis中,当给用户发送消息时,从Redis中获得用户对应的服务实例的instanceId或是具有唯一性的数据,如果经过匹配就是当前服务就可以直接下发,如果是其他服务就转发给那个对应的服务就行了

给指定路径发送消息

还有一种场景也比较常见就是类似主题订阅,如订阅设备状态更新的数据,就要给每一个对应路径的连接发送消息了

我们可以使用不同的路径来表示不同主题,然后自定义一个连接选择器来匹配连接的路径和消息头中指定的路径

当然库中也已经实现了对应的PathSelectorPathMessage,可以通过配置开启

结束

最后请允许我发表一点对于抽象的拙见

抽象其实就和 “道生一,一生二,二生三,三生万物” 一样,根据你的顶级接口(也就是核心功能)不断的向外展开,你的顶级接口就是道(狭义的来讲)

以这个库为例,ConnectionLoadBalanceConcept就是这个库的道,他的核心功能就是发送消息,至于怎么发,发给谁,不确定,像是一个混沌的状态

那么什么是一,二,三呢,我们发送消息需要载体于是就有了ConnectionMessage,我们需要对Connection进行管理于是就有了ConnectionRepository,我们需要转发消息于是就有了ConnectionSubscriber等等

而万物就像是具体的实现,是能落实的,基于Spring Cloud服务发现的连接管理器DiscoveryConnectionServerManager,基于路径的连接选择器PathSelector,基于Reactive的WebSocket连接ReactiveWebSocketConnection

就像是你创造的世界,不断的衍生出各种各样的规则,这些规则相辅相成,让你的世界平稳的运行

当然你的世界也有可能存在bug,手动狗头

文章目录
  1. 1. 介绍
  2. 2. 用法
    1. 2.1. 抽象思路
    2. 2.2. 转发思路
    3. 2.3. 连接流程
      1. 2.3.1. 获取服务实例信息
      2. 2.3.2. 连接区分和管理
    4. 2.4. 消息发送
      1. 2.4.1. 连接选择
  3. 3. 结束