1.从原理剖析带你理解Stream
2.Flink深入浅出:JDBC Connector源码分析
3.Reactive Spring实战 -- 理解Reactor的源码设计与实现
4.Flux和Mono的常用API源码分析
5.Java Stream流与Optional流浅析
6.Flink Sink的反压优化(Sink异步化)
从原理剖析带你理解Stream
Stream是Java 8提供的新特性,它允许我们以声明式的详解方式处理数据集合,简化了集合操作的源码代码结构。在项目中,详解集合是源码最常用的数据存储结构,当我们需要对集合内的详解iapp如何导入源码元素进行过滤或其他操作时,传统的源码做法是使用for循环。Stream操作分为中间操作与结束操作两大类。详解中间操作仅进行记录,源码直到结束操作才会触发实际计算,详解这种特性称为懒加载,源码使得Stream在处理大规模对象迭代计算时非常高效。详解中间操作又分为有状态与无状态操作,源码有状态操作需要在处理所有元素后才能进行,详解无状态操作则不受之前元素的源码影响。
Stream结构分析揭示了其内部实现机制。每一次中间操作都会生成新的Stream对象,无状态操作的实现类为StatelessOp,有状态操作的实现类为StatefulOp。通过继承关系,我们可以观察到Stream结构的层次性。核心Sink概念在Stream API内部实现中扮演关键角色,Stream API通过重载Sink的接口方法实现了其功能。以filter或map方法为例,源码返回的StatelessOp或StatefulOp对象构成了一个复杂的结构,最终与Sink相关联。Sink对象在Stream执行流程中扮演关键角色,其作用在collect方法中得以体现,通过匿名内部类ReducingSink对象实现元素的收集与处理。动画理解Stream执行流程可以帮助我们更直观地了解其运行机制,从而深入掌握其高效处理数据集合的方法。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的网络邮箱源码信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
Reactive Spring实战 -- 理解Reactor的设计与实现
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。它提供了可组合的异步序列API,包括用于多个元素的商城订单源码Flux和用于零到一个元素的Mono。
Reactor Netty项目还支持非阻塞式网络通信,非常适合微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。本文将通过实例展示和源码阅读,深入分析Reactor的核心设计与实现机制。
Reactor源码基于版本3.3。
响应式编程是一个专注于数据流和变化传递的异步编程范式,允许使用编程语言表示静态或动态数据流。
Reactor中,发布者(Publisher)负责生产数据,订阅者(Subscriber)负责处理和消费数据。创建发布者和订阅者后,通过建立订阅关系,发布者开始生产数据并传递给订阅者。
Flux和Mono是两种发布者类型,分别用于生产多个数据元素和单个数据元素。例如,Flux.range和fromArray等静态方法会返回Flux子类。
Reactor中关键方法包括Publisher#subscribe和Flux#subscribe。订阅者在onSubscribe方法中接收订阅关系,然后通过Subscription#request方法向发布者请求数据。
RangeSubscription#request、Subscriber#onNext和CoreSubscriber的内部逻辑展示了数据流转的过程。Flux子类的subscribe方法创建Subscription,将操作符逻辑转移到Subscriber端。
操作符方法,如skip、distinct、sort和filter,是Reactor的核心,用于处理和组合数据流。例如,myHandler作为订阅者,可以处理生成的Flux子类序列。
Reactor支持push和pull模式。pull模式通过Flux#generate和Sink缓存数据,而push模式则通过Flux#create,允许多线程同时推送数据。
Reactor提供线程与调度器支持,例如parallel、single、boundedElastic和parallel。这些调度器允许在不同线程环境下执行操作。
Reactor中的publishOn和subscribeOn操作符方法用于切换操作上下文,分别影响后续操作和整个链路的线程执行环境。
流量控制是响应式编程中的重要概念,FluxSink.OverflowStrategy定义了在数据生产速度超过消费速度时的策略,如忽略、错误或缓存数据。
Reactor通过实例和源码展示了响应式编程的概念和实现机制,以及如何在实际应用中使用。通过WebFlux和AsyncRestTemplate的比较,将揭示响应式编程带来的优势。
Flux和Mono的常用API源码分析
Flux是一个响应式流,能够生成零个、ninja android源码一个、多个或无限个元素。Flux的产生元素机制主要体现在Flux.just和Flux.empty两个方法上。Flux.just返回的FluxArray内部存储了一个数组,用来保存1个或多个数据,通过ArraySubscription传递给消费者。Flux.empty则返回了一个FluxEmpty实例,当收到消费者注册信号时,会调用Operators的complete方法,消费者会收到一个complete信号,除此之外没有任何操作。
重复流通过创建一个FluxRepeatPredicate对象实现,这个对象在结束时会重新订阅Publisher,从而产生无限数量的流。doOnSignal方法提供了在框架中不消费数据或转变数据的机制,实际上是操作符FluxPeekFuseable,其peek onNext代码逻辑能大致理解其原理。
Mono表示要么有一个元素,要么产生完成或错误信号的Publisher。其then方法有五个重载版本,实际上创建了一个MonoIgnorePublisher,通过源码可以发现,MonoIgnorePublisher将真正的监听者封装为IgnoreElementsSubscriber,然后将事件源监听。Mono和Flux都有Create方法,用于创建对应的序列,Mono的create方法创建了MonoCreate对象,里面包含了MonoSink和一个消费者。Mono的then方法会忽略前面的onNext数据,只会传递给下游完成和错误的信号。then(Mono other)则创建了一个ThenIgnoreMain,并在所有操作完成之后开始下一个流的消费。
Mono和Flux的Create方法创建的对象为MonoCreate和FluxCreate,其中包含了MonoSink或FluxSink和一个消费者。使用using方法可以实现try-with-resource机制,用于包装阻塞API。
在响应式编程中,我们需要处理各种异常情况,确保异常能够传播到需要接收的地方。Publisher分为冷发布者和热发布者,冷发布者在没有订阅者时不会生成数据,而热发布者不论是否有订阅者都会生成数据。冷热发布者可以相互转换,例如使用defer将热操作符转换为冷操作符,或者使用ConnectableFlux将冷操作符转换为热操作符。在多播流中,一个Publisher可以同时给多个消费者提供数据,但只会收到一次的订阅。
FluxPublish对象在publish方法中创建,传入参数包括缓存大小和被包装的队列,这表示了publish方法创建了一个FluxPublish对象。在subscribe阶段,FluxPublish内部的PublishSubscriber会添加到父容器中。在connect方法中,真正订阅数据源,随后PublishSubscriber的新macd源码onSubscribe方法会执行,根据参数拉取数据,onNext方法处理接收到的数据。
本文通过解析Flux和Mono的常用API,揭示了它们在响应式编程中的应用和原理,旨在帮助读者更好地理解并运用这些流式操作符。正确处理异常、理解冷热发布者之间的转换以及掌握多播流的特性,对于构建高效、灵活的数据流处理系统至关重要。
Java Stream流与Optional流浅析
Stream流
1. 操作类型
Stream API中的操作类型主要分为两大类:中间操作和终止操作。中间操作仅作为标记,实际计算会在触发终止操作时进行。
2. Stream的操作过程
首先,我们准备了一些示例代码。在TestStream类中,我们定义了一些测试lambda函数的方法。在main方法中,我们执行了一个相关的流操作,在控制台中并没有看到任何输出。这说明Stream并没有真正执行到对应的方法中,因为我们没有写入终止操作。由此可见,在终止操作之前,Stream并没有真正去执行每个中间操作,而是将中间操作记录了下来。在执行终止操作这一行代码时,再去执行中间操作。
2.1 记录过程
进入源码后,可以看到Collection的Stream方法调用了StreamSupport.stream()方法。在该方法中,返回了一个ReferencePipeline.Head对象,这是记录管道操作的头节点对象。这个Head对象继承了ReferencePipeline对象,所以后续的map、filter等方法实际上是ReferencePipeline对象的方法。在构造方法中,也调用了父类AbstractPipeline类的构造方法。
在Stream中,每一步操作都被定义为一个Stage。在构造方法中,定义了previousStage和sourceStage,即上一个节点和头节点。在类中还有一个nextStage对象。
Stream实际上构建了一个双向链表来记录每一步操作。接下来,我们看一下list.map()方法。
在该方法中,创建了一个StatelessOp对象,它代表无状态的中间操作。这个对象同样继承了ReferencePipeline。在该对象的构造方法中,将调用该初始化方法的节点定义为上一个节点,并且对应的深度depth也进行了+1操作。
我们总结一下,stream()方法得到的是HeadStage,之后每一个操作(Operation)都会创建一个新的Stage,并以双向链表的形式结合在一起。每个Stage都记录了本身的操作。Stream就以此方式实现了对操作的记录。注意,结束操作不算depth的深度,它也不属于stage。但是我们的示例语句中没有写结束操作的代码,所以在这里提一下Stream的Lazy机制。它的特点是:Stream直到调用终止操作时才会开始计算,没有终止操作的Stream将是一个静默的无操作指令。
Stage相关类如下
2.2 执行过程
在了解执行过程之前,我们应该先了解另一个接口Sink,它继承了Consumer接口。在调用map、filter等无状态操作中返回的StatelessOp对象中,覆盖了opWrapSink方法,返回了一个Sink对象,并且将参数中的Sink对象作为构造方法中的参数传入进去。
走进构造方法后,可以看到在该对象中定义了一个downstream,该对象也是一个Sink类型的对象,并且在定义Sink对象时,覆盖了Consumer接口中的accept方法。
不难看出,在执行accept方法时,就是将当前节点的操作结果传入给downstream继续执行,而这个downstream则是通过onWrapSink方法中传入过来的。
了解了以上这些概念,我们可以走进结束操作.collect(Collectors.toList());方法。在该方法中,通过Collectors定义了一个另一个ArrayList收集器,并且传入了collect方法中。
我们暂时只看非并行的部分。在这一行通过ReduceOps定义了一个ReduceOp对象。
在makeRef方法中,返回了一个ReduceOp对象,该对象覆盖了makeSink()方法,返回了一个ReducingSink对象。我们继续往下走,走进evaluate方法中。
可以看出,wrapsink方法中,是查找链表的头节点,并且调用每个节点的onWrapSink方法,在该方法中传入当前节点的sink对象,并且将传入的对象定义成自己的下游,形成一个从头节点到尾部节点的Sink单向链表。
在wrapSink中,通过一层层的前置包装,返回头节点的Sink类传入copyInto方法中。
在该方法中,先调用了wrappedSink.begin()方法,该方法默认实现为调用downstream的begin方法。相当于触发全部Sink的begin方法,做好运行前的准备。
具体循环的执行则是在spliterator.forEachRemaining(wrappedSink);方法中,操作如下
在forEachRemaining方法中,调用了accept方法,也就是在定义onWrapSink方法中初始化Sink对象后定义的accept方法,将自己的执行结果传入downstream继续执行,也就是说,在调用结束操作后才实际执行每个方法。在实际执行过后,在执行end方法进行结束操作。Stream整体的流操作大概就是如此。了解了大概过程后可以找一些常用的case来分析一下。
2.3 具体分析
一般情况下都会选择list作为排序容器,大部分情况下都是不知道容器大小的,于是采用RefSortingSink类作为当前节点处理类,该类代码如下。
可以看到该Sink中的accept方法中,并没有执行下游的accept方法,而是将所有的数据装入了一个ArrayList,在end方法利用arrayList进行排序,并且继续开启后续的循环操作。
3. 代码建议
Flink Sink的反压优化(Sink异步化)
在Flink项目中,我们面临一个场景,即从阿里SLS接收监控指标并进行清洗,然后写入TSDB。起初运行平稳,但在指标数量增加后,发现SLS消费存在延迟问题。因此,我们着手优化Sink的异步处理。
问题的起因和定位涉及到了Sink的同步写入策略。原设计中,每接收到一条数据,Sink就立即同步调用TSDB接口,导致性能受限。为提升效率,我们需要将Sink的处理逻辑转变为异步模式。
异步优化的关键在于引入一个比喻,就像组织会议:首先确定参会者,只有当所有人都到位(即await()方法调用完成)时,会议才能开始。在Flink中,我们通过设置一个栅栏计数器来模拟这个过程,当处理任务(SinkTaskProcessor)完成一个数据写入请求,计数器减一,直到所有任务完成,数据才会被真正写入TSDB。
SinkTaskProcessor是用户必须实现的接口,负责处理数据写入。而AbstractAsyncRichSinkFunction作为抽象类,继承了RichSinkFunction并实现了CheckpointedFunction。AsyncSinkTaskRunnable则是提交到线程池的任务,它负责从数据缓存队列中取出数据,并交给SinkTaskProcessor处理,同时设置了ms的超时防止阻塞。
源代码位于cn.sh.flink.learning.sink.async包下的SlowlyRickSinkTestFunction,这是一个模拟处理耗时任务的类,真正的数据处理工作由SinkTaskProcessor负责。我们鼓励大家试用并提供反馈,如果发现任何问题或有改进意见,欢迎通过私信或issue进行交流。
Miracast技术详解(四):Sink源码解析
Miracast Sink端源码最早出现在Android 4.2.2版本中,可通过android.googlesource.com查看。然而,在Android 4.3版本之后,Google移除了这部分源码,详细移除记录可在android.googlesource.com上查阅。尽管Sink端代码被移除,但Source端源码依然存在。通过使用Android手机的投射功能,仍可实现Miracast投屏发送端的功能。
为了查看源码,推荐使用Android Studio,以便利用IDE的代码提示和类/方法跳转功能。首先新建一个Native Project,将libstagefright相关源码拷贝至cpp目录,并导入必要的include头文件。在CMakeLists.txt中添加这部分源码后,同步环境,以此引用相关类与头文件,提升查看源码的效率。
Sink端核心类主要包括:WifiDisplaySink.cpp、RTPSink.cpp、TunnelRenderer.cpp。通过分析可得知,初始化操作主要在wfd.cpp中的main()方法内完成,重点关注sink->start()方法启动WifiDisplaySink,进而使用ip和端口参数执行相关操作。
RTSP通讯涉及关键步骤,包括创建RTSP TCP连接、处理连接状态与数据异步通知。当连接建立后,开始进行RTSP协商与会话建立,处理RTSP M1-M7指令。请求与响应流程需参考前面的RTSP协议分析文章,这里不详细展开。
处理RTSP消息时,首先判断消息类型,是Request还是Response。对于Request,主要处理Source端M1请求,并响应M2确认。对于Source端M3请求,处理相关属性及能力,如RTP端口号、支持的音频和视频编解码格式等。M4与M5请求则分别进行常规的响应处理。
在发送完Setup M6请求后,注册onReceiveSetupResponse()回调,用于完成RTSP最后一步,即发送PLAY M7请求。此时,Source端会按照Sink指定的UDP端口发送RTP数据包,包含音视频数据。
RTSP协商与会话建立完成后,数据流通过RTPSink处理,建立UDP连接并解析RTP数据包。在TunnelRenderer中接收并播放音视频流。流程包括消息处理、环境初始化、TS包解析、音视频裸流解码与播放等。
源码解析过程中,关键步骤包括初始化RTPSink、建立UDP连接、处理RTP与RTCP数据、解析TS包并获取音视频裸流等。移植Native Sink端难点在于隔离与处理Native相关依赖,如异步消息机制、网络连接实现等。建议在应用层实现RTSP连接、音视频解码与渲染功能,然后移植底层解析代码,以减少依赖,提高移植效率。
超详细!spdlog源码解析(下)
回顾spdlog的组成,包含logger、sink、formatter以及registry四个关键部分。在前两篇中,我们深入探讨了logger、sink和formatter的基本功能与使用方法。这三者协同工作,能够实现日志的记录功能。然而,registry作为管理器角色,主要负责协调和配置这些组件,确保日志系统的一致性和高效性。尽管registry并非必须依赖的组件,它的存在能够提供更加便捷的管理方式,例如统一设置日志等级、创建具有默认配置的logger等。
在默认logger和默认sink的实现中,registry扮演着关键角色。当使用spdlog::info方法时,实际上调用了registry中的default_logger_成员变量,获取默认logger的指针。通过静态方法registry::instance()获取registry对象,最终registry::registry()方法创建默认logger,并选择ansicolor_stdout_sink_mt作为sink,实现控制台彩色输出。这种设计使得用户无需深入了解内部细节,即可直接使用默认配置进行日志输出,简化了用户上手过程。
registry的功能不仅限于管理默认logger,它还提供了创建logger的便利接口。通过一系列预设的logger创建函数,spdlog实现了与不同sink的无缝集成,隐藏了sink的概念,使得用户仅需关注日志输出的目的地,而无需深入理解底层实现。例如,stdout_logger创建函数通过调用Factory::create方法,自动将创建的logger注册到registry中,实现日志输出格式的统一化和全局管理。对于异步环境,async_factory::create方法同样完成了类似功能,但需额外处理线程池的创建。
通过反思registry的实现,我们可以发现,其核心功能在于管理logger,而这一过程包含了将logger注册到registry中的关键步骤。通过提供Factory(如synchronous_factory或async_factory)的create方法,spdlog确保在创建logger后将其自动注册,这一设计与设计模式中的工厂方法原理相契合。实现这一目标的关键在于注册操作,而非创建logger本身,这突显了registry在spdlog系统中的核心作用。
在介绍spdlog的宏定义使用时,我们探讨了其支持的两种编译版本:header-only version和compiled version。header-only version通过将声明与实现分开,提供了轻量级的集成方式。要实现compiled version,只需复制header-only version的代码,并按照特定规则组织文件结构。在async.cpp文件中,通过SPDLOG_COMPILED_LIB宏定义判断编译方式,相应地include声明与实现文件,实现代码的高效复用。同时,SPDLOG_HEADER_ONLY宏定义控制了代码的包含行为,确保了不同编译方式下的代码正确性。
在多平台支持方面,spdlog通过os.h和os-inl.h文件封装了针对不同平台差异的处理逻辑,使得上层业务无需关注底层实现的细节。通过宏定义和条件编译,spdlog能够提供一致的接口,适应不同操作系统和环境的需求,确保跨平台兼容性和稳定性。
至此,spdlog源码解析系列告一段落。通过深入分析spdlog的架构设计、功能实现以及跨平台支持,我们不仅了解了如何高效地使用spdlog进行日志管理,还洞悉了其设计背后的巧妙逻辑和实践细节。希望本系列解析能够为开发者提供宝贵的参考,助力构建更加稳定、高效和易于维护的日志系统。