1.Reactive Spring实战 -- 理解Reactor的码分设计与实现
2.使用Gateway作为SpringCloud网关
3.Reactor-Netty基本抽象类介绍
4.响应式编程入门之 Project Reactor
5.第6章 Reactor Netty中的消息处理逻辑 01
Reactive Spring实战 -- 理解Reactor的设计与实现
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。码分它提供了可组合的码分异步序列API,包括用于多个元素的码分Flux和用于零到一个元素的Mono。
Reactor Netty项目还支持非阻塞式网络通信,码分非常适合微服务架构,码分溯源码供货公司为HTTP(包括Websockets),码分TCP和UDP提供了响应式编程基础。码分本文将通过实例展示和源码阅读,码分深入分析Reactor的码分核心设计与实现机制。
Reactor源码基于版本3.3。码分
响应式编程是码分一个专注于数据流和变化传递的异步编程范式,允许使用编程语言表示静态或动态数据流。码分
Reactor中,码分发布者(Publisher)负责生产数据,码分订阅者(Subscriber)负责处理和消费数据。创建发布者和订阅者后,通过建立订阅关系,发布者开始生产数据并传递给订阅者。
Flux和Mono是两种发布者类型,分别用于生产多个数据元素和单个数据元素。例如,Flux.range和fromArray等静态方法会返回Flux子类。
Reactor中关键方法包括Publisher#subscribe和Flux#subscribe。订阅者在onSubscribe方法中接收订阅关系,然后通过Subscription#request方法向发布者请求数据。
RangeSubscription#request、Subscriber#onNext和CoreSubscriber的内部逻辑展示了数据流转的过程。Flux子类的subscribe方法创建Subscription,将操作符逻辑转移到Subscriber端。
操作符方法,如skip、distinct、sort和filter,先知者买卖指标源码是Reactor的核心,用于处理和组合数据流。例如,myHandler作为订阅者,可以处理生成的Flux子类序列。
Reactor支持push和pull模式。pull模式通过Flux#generate和Sink缓存数据,而push模式则通过Flux#create,允许多线程同时推送数据。
Reactor提供线程与调度器支持,例如parallel、single、boundedElastic和parallel。这些调度器允许在不同线程环境下执行操作。
Reactor中的publishOn和subscribeOn操作符方法用于切换操作上下文,分别影响后续操作和整个链路的线程执行环境。
流量控制是响应式编程中的重要概念,FluxSink.OverflowStrategy定义了在数据生产速度超过消费速度时的策略,如忽略、错误或缓存数据。
Reactor通过实例和源码展示了响应式编程的概念和实现机制,以及如何在实际应用中使用。通过WebFlux和AsyncRestTemplate的比较,将揭示响应式编程带来的优势。
使用Gateway作为SpringCloud网关
本着能用原生就用原生的原则,我们这里使用SpringGateway来作为云服务的网关
配置
从官网的介绍来看,spring网关拥有的功能有,路由(配置,过滤,重写等),熔断以及流量控制
首先引入包
动态路由
路由的配置比较简单,有两种方法:使用配置文件和代码注入,我们这里简单展示下两种方法
或者使用
路由配置中id、dz板块 源码在哪里uri、order、predicates.path/host没什么好说的,根据需求配置即可,filters相关参数,这里最好还是参考源码相关部分或者Spring Cloud Gateway比较全面,比如常用的前缀切割
这里我们以常用的两种filter,流量控制和熔断降级举例
流量控制
通常我们需要限流来保证服务的可用性,保护一些不太稳定的服务不会因为高并发的请求而挂掉,这里我们一般在网关层做流量控制,减少实际进入的请求达到平波峰的目的
计数器算法
如果某个服务会在请求中数量达到时候挂掉,请求平均时间为2s,我们给一段时间一个请求量的限制,比如2秒次,每次请求进入就减少计数,每2s开始时重新计数,这样就能保证服务请求中数量在以内。但是对于抢购类接口,可能前ms请求数量就用完了,后面所有请求都被拒绝,即请求突刺现象,这样的用户体验是非常差的所以我们需要尽可能在所有的时间内保证接口的可用性(计数器算法就像DRAM中的集中式刷新一样不太能被接受),而且短时间内大量请求运行在相同代码段是非常危险的,在设计不好的情况很可能会出现数据库死锁等等问题
漏桶算法
我们需要让请求尽可能地能进行来,就需要平波峰填波谷,就上例而言,2s内最大请求为,也就是每个请求占用的时间比例为ms,我们设计一个容量为的桶(队列)每ms向接口发一个请求,可以让服务中请求数量不超过的情况下,每ms都能接受一个新的请求,这样就缓解了请求突刺现象。但是php营销型网站源码这里还有一个问题,对于抢购类接口,个容量可能ms就用完了,在第ms可能还会有个请求抢1个位置,个请求会被取消,这样也是相对来说不能被接受的
令牌桶算法
令牌桶算法就是目前spring cloud gateway采用的算法,这里采用的用户时间换用户失败的策略,假设我们认为用户的平均忍耐时间为8秒,接口超过8秒一些用户就要骂街了,减去实际执行的2秒,也就是说我们的可以利用6秒的时间容纳更多的请求。依上文而言每ms去调用这个端口,那么也就是说桶的设计可以更大,在桶里放上令牌,每个请求需要在桶里面拿到令牌才能调用,这里的桶容量就是6s/ms为个。但是我们的执行速度是不变的,也就是结果是,在请求多的情况下用户的执行时间在8秒左右,而在请求少的情况下执行速度在2s左右,这样就缓解了短时间内大量请求导致大量失败的问题了。这里比较重要的参数有两个,第一个是桶请求容量 defaultBurstCapacity,第二个是每秒执行的请求速度(也就是桶的填充速率)defaultReplenishRate
在这个例子中defaultBurstCapacity=而defaultReplenishRate=,这两个参数我们会在下方配置
这里我们需要引用redis包,再说明一下,本站使用的是jdk的版本,其他版本的配置和引用可能会稍有变化,需要调整
覆写KeyResolver的实现类
流量控制,这里同样有代码实现和配置文件实现,由于目前idea对于复杂配置文件的支持不太好,如果使用配置文件方式会疯狂报红,但是如果全部使用代码的话会不方便实现动态路由,因为gateway是左下角导航 源码先加载配置再处理代码的。所以这里我们路由使用配置,filter之类复杂的使用代码实现,下面是简单示例
这样全服务层面的接口流量控制就完成了,具体的哪些服务使用流量控制,具体控制参数的配置,自行稍作修改即可
测试流量控制的话,可以将令牌回复量和令牌总容量调至比较低的水平,然后再浏览器直接curl接口,比如令牌回复量和容量为1,则单秒内curl即可触发浏览器提示,线上大令牌容量测试能需要多线程curl了,这里参考官方文档给的lua脚本
ip限流
如果我们需要对某个ip进行限流,比如防止脚本抢货,我们这里需要KeyResolver的实现不再使用exchange.getRequest().getURI().getPath() ,而是使用 exchange.getRequest().getRemoteAddress() 。但是这里还有一个问题,我们请求是经过层层转发的,nginx,docker等,所以我们可能并不能拿到原始的请求地址,所以这里我们需要在最外层,比如nginx中将原始地址存到header或者cookie当中,这里给出简单示例
当然还有其他类似X-Forwarded-For的字段不再本文主要探讨范围就不多拓展了,在nginx中配置记录初始远程地址到header后,我们这里需要在程序中取出来,如果你这里使用的标准的X-Real-IP的字段去存储,那么只需要
即可获取真实地址,如果你这里自定义了一个header的key那么需要在exchange.getRequest().getHeaders()里面自己找出来了
最后我们这里给出对同一个接口同时配置两种限流的示例
我在ip限流这里修改了返回的code由改为了,方便测试,这里我们将ip的限流参数设置为(2,2),将path的限流参数设置为(1,)然后不断请求接口就发现一开始返回错误,后续path令牌桶用完后返回错误,即设置成功
补充
如果这里你不希望返回,并且要求返回一个用户可读的带有json信息结果,那么比较好的业务处理方式是前端完成。如果是对外接口的话,那么我们这里就只能重写RateLimiter的实现了,不再使用RedisRateLimiter的类,而是自己去继承RateLimiter接口去实现,
参考 SpringCloudGateway限流后,默认返回的改造:改跳转或增加响应body,这篇文章已经很详细,这里就不赘述了
熔断降级
熔断降级,即某个接口调用失败时使用其他接口代替,来保证整体服务对外的可用性
首先需要引入熔断包
circuitbreaker-reactor-resilience4j 熔断的相关配置分为两个部分,熔断逻辑本身的配置以及在集成到gateway中时候,网关的配置,熔断的重要的配置有,触发熔断的接口,代替接口,熔断超时时间(当然还有其他的,比如自定义熔断HttpStatus等等,详细参数参考 Spring Cloud Circuit Breaker以及resilience4j官网)
这里熔断触发接口和代替接口配置位于gateway中,这里我们使用代码实现,位置参考前述
这里setName的目的是和熔断包中的配置产生对应关系,下方为熔断包的配置,这里定义默认超时时间(也就是没有匹配到name的超时时间)为s,your_breaker_id的超时时间为3s
最后
到这里网关的基本功能就差不多了,自定义的一些业务功能配置,比如header,cookie,以及调用方ip的处理逻辑等等其实都是在网关层处理的,可以参考 Spring Cloud Gateway WebFilter Factories以及Writing Custom Spring Cloud Gateway Filters,但是这种配置基本都没什么坑,这里就不谈了
网关由于不经常作为业务逻辑被重构,所以网络上的资料相对比较少,我这里使用的又是最新的版本还是蛮多和前版本不一样的地方,尤其是webflux的一些东西,很多问题需要看源码才能解决,非常的消耗意志力。这里建议小伙伴们如果是业务使用的这种资料相对较少的架构,最好还是不要使用最新版本的比较好,毕竟万一遇到坑,踩个一两天是很正常的事情,而这种在业务场景可能就没那么容易接受了
Reactor-Netty基本抽象类介绍
概述
之前已经把reactor3看的差不多了,在学会webflux之前还需要了解Reactor-Netty的相关知识,然后才能看懂webflux,然后才能看懂Gateway.
LoopResource首先先学习几个基本的类才能看懂Reactor-Netty在干什么.我们先来看LoopResource类.官方说这个类是一个EventLoopGroup 的 selector并且关联了 Channel的工厂
* An { @link EventLoopGroup} selector with associated* { @link io.netty.channel.Channel} factories.我们来看一下LoopResource提供的一些方法
static LoopResources create(String prefix) { if (Objects.requireNonNull(prefix, "prefix").isEmpty()) { throw new IllegalArgumentException("Cannot use empty prefix"); } return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true);}我们来看看DefaultLoopResource内部实现
其实内部就是缓存了一堆的EventLoopGroup
ChannelPipelineConfigurer这个类的作用就是Channel创建好之后,在读取数据之前的初始化工作,我们看几个实现类 HttpServerChannelInitializer
ChannelGroup官方解释: 一个线程安全的集合,里面装的是打开的Channel,并且提供了很多操作Channel的方法,关闭的Channel会自动被group剔除.一个Channel可以属于多个Group
先来看看唯一一个实现类DefaultChannelGroup的源码
可以看到内部就是两个Map维护服务端和客户端的Channel,然后还有一个监听器.接下来看看添加Channel的方法再来看看是如何自动把过期Channel移除的,channel关闭之后会出发listener,listener会调用remove方法
其实就是很简单的从map中移除数据的逻辑
ConnectionObserver从字面上看就是连接的观察者.是一个Connection的生命周期观察器.核心方法是 onStateChange.子类很多,等看源码的时候看到具体的再看源码.我们先来看ConnectionObserver定义的几个状态
TransportConfig一个配置的抽象类,里面保存了一些属性
我们上面介绍的那些类都被保存在了这个Config里面.来看看其中一些比较重要的子类
ServerTransportConfig可以看到这个子类里面提供了两个ConnectionObserver我们分别来看一看
ServerTransportDoOnconnectionServerTransportDoOn原文:/post/响应式编程入门之 Project Reactor
本文旨在为读者提供对响应式编程及其核心库——Reactor的入门理解。在介绍前,我们先回顾一下非阻塞IO编程的基础,理解为何在Spring MVC中引入了WebFlux以及Reactor。Reactor是基于Java 8函数式API,集成CompletableFuture、Stream和Duration,它提供了Flux和Mono等异步序列API,并实现了Reactive Streams规范,特别适合构建微服务架构中的响应式系统。
在非阻塞IO编程中,比如调用远程服务时,我们通常通过回调函数来处理数据可用情况。然而,当回调逻辑复杂时,代码往往难以阅读。响应式编程通过简化这种逻辑,提供了更简洁的实现方式。它将传统命令式编程抽象为一系列API,更适合非阻塞IO环境。尽管响应式编程在非阻塞IO框架中广泛应用,如Vertx和WebFlux,但这并不意味着非阻塞IO编程只能依赖响应式编程。
Reactor作为响应式编程的基础,实现了Java响应式编程规范,理解其内部工作原理有助于深入掌握其API。Reactor的核心接口展示了其运作机制,包括数据发布和订阅流程。在实际应用中,Publisher和Subscription共同作用,通过调用Subscriber的onNext、onComplete和onError方法来实现数据流转。
响应式编程思想可类比为一条流水线,Publisher定义了数据生产过程,Operators对数据进行解析、校验和转换等操作,最终流转到Subscriber。这种设计使得系统在未被订阅之前保持静默,直至实际使用时才启动。
Reactor中的Operator作为连接上下游的关键组件,实现了数据的转换和处理。例如,map操作符通过改变数据值来实现数据转换。实际实现虽然复杂且严谨,但遵循了相同的设计理念。
学习Reactor的关键在于理解核心接口以及实践API。首先理解响应式编程的基本概念和Reactor如何实现这些概念。接下来,深入阅读官方文档并进行代码实践。追踪源码时,关注subscribe方法和Subscription的作用,以及Subscriber中的onNext、onComplete和onError方法的实现。
总之,通过本文的学习,读者应能对响应式编程和Reactor有初步的了解,并掌握学习Reactor的方法和途径。尽管本文未详细探讨Reactor的每个细节,但它为深入探索提供了基础。欢迎读者通过实践和阅读源码进一步深入理解这一强大且灵活的编程范式。
第6章 Reactor Netty中的消息处理逻辑
在深入探讨Reactor Netty中的消息处理逻辑时,我们首先回顾了ChannelOperationsHandler作为WebFlux与Netty之间的重要桥梁,以及如何通过Channel生命周期的核心功能实现对Netty客户端与服务器端连接的封装。关注服务器端层面上的浏览器通过HTTP请求访问应用程序所涉及的Channel及其消息传递机制,这一章旨在逐步揭示其中的组织细节。
在理解Reactor Netty中如何利用ChannelOperationsHandler进行通信的基础上,我们重点分析了ConnectionObserver与channelOperation的相互配合。特别地,我们探讨了ConnectionObserver如何在服务器端通过其onStateChange方法,根据不同的ConnectionObserver.State状态对Channel执行相应操作。以PooledConnectionProvider#acquire的源码为例,展示了如何在获取Channel时预先配置ConnectionObserver和channelOperationFactory,进而根据应用场景进行灵活的定制与配置。
为了深入了解ConnectionObserver与channelOperation在服务器端的协同工作,我们首先通过回顾PooledConnectionProvider的配置机制,了解到在服务器端配置ConnectionObserver的途径。具体来说,利用reactor.netty.http.server.HttpServerOperator#tcpConfiguration这一概念,我们引入了HttpServerHandle这一设计思路,以实现针对TcpServer的配置操作。通过这一配置,我们可以将特定的业务逻辑包装在HttpServerHandle中,从而实现在服务器端与客户端之间建立连接时的定制化处理。
深入分析后,我们发现HttpServerHandle的实现不仅封装了ConnectionObserver,还通过其onStateChange方法,实现了对特定HTTP状态的响应处理。在服务器端的场景下,当BiFunction类型的handler执行完毕,通过调用HttpServerOperations的onComplete方法,我们可以控制后续的操作流程。这一过程与Channel生命周期做核心功能中的内容相呼应,展示了如何在Netty的操作逻辑与自定义管理方式之间进行平滑过渡。
进一步探索,我们揭示了NettyOutbound在消息传递过程中的重要作用,以及它在HTTP协议下响应式收尾包装的具体实现。NettyOutbound#then方法提供了链式操作的便利性,允许开发者在发送请求或响应信息时,通过此方法进行额外的逻辑封装。在客户端和服务器端,这一方法都被广泛使用,特别是在HTTP协议的上下文中,它帮助实现了信息的响应式管理与收尾。
在源码解析过程中,我们还深入研究了then操作的实现机制,通过ReactorNetty.OutboundThen的分析,我们了解到如何在标准接口层面上定义操作规则,并通过Mono#thenEmpty方法实现链式操作的特殊场景处理。这一机制允许开发者在特定条件下,基于Mono#thenEmpty的返回结果,进行更加灵活的逻辑扩展与实现。
综上所述,Reactor Netty中的消息处理逻辑通过ConnectionObserver与channelOperation的紧密配合,以及NettyOutbound和NettyOutbound#then方法的巧妙运用,构建了一套高效、灵活的消息传递机制。这一机制不仅实现了与Netty操作逻辑的平滑对接,还提供了丰富的自定义能力,使得开发者能够根据实际需求进行高度定制化的消息处理流程设计。