1.UE4源码剖析——异步与并行 中篇 之 Thread
2.ListenableFuture源码解析
3.Ray 源码解析(一):任务的任务任务状态转移和组织形式
4.如何实现定时任务- Java Timer/TimerTask 源码解析
5.深度解析sync WaitGroup源码
6.Nacos源码之配置管理 三TaskManager 任务管理的使用
UE4源码剖析——异步与并行 中篇 之 Thread
我们知道UE中的异步框架分为TaskGraph与Thread两种,上篇教程我们学习了TaskGraph,程序程序它擅长处理有依赖关系的源码源码短任务;本篇教程我们将学习Thread,它与TaskGraph相反,任务任务它更擅长于处理长任务。程序程序而下一篇文章,源码源码秀可可 源码我们则会承接Thread,任务任务去学习一下引擎中一些重要的程序程序线程。
Thread擅长处理长任务,源码源码从长任务生命周期这个层面来看,任务任务我们可以先把长任务分为两类:常驻型长任务与非常驻型长任务。程序程序
常驻型长任务侧重于并行,源码源码通常用于监听式服务,任务任务例如网络传输,程序程序使用单独的源码源码线程对网络进行监听,每当有网络数据包到达时,线程接收并处理后,不会立即结束,而是重置部分状态,继续监听,等待下一轮数据包。
非常驻型长任务侧重于异步,通常用于数据处理,例如主线程为了提高性能,避免卡顿,会将一些重负载的运算任务分发给分线程处理,可能分批给多条分线程,主线程继续运行其他逻辑。任务处理完成后,将结果返回给主线程,分线程可销毁。
接下来,我们通过两个例子学习Thread的使用。
计算由N到M(N和M为大数字)所有数字的和。使用Thread异步调用,将计算操作交由分线程执行,计算完成后再通知主线程结果,代码实现如下:
逻辑分为两部分:启动分线程计算数字和,使用Async函数,微信商城源码.net参数为EAsyncExecution::Thread,创建新线程执行。学习Async函数用法,该函数返回TFuture对象,代表未来状态,当前无法获取结果,但在未来某个时刻状态变为Ready,此时可通过TFuture获取结果。
主线程注册回调,等待分线程计算完成,使用TFuture的Then函数,完成时触发注册的回调,也可使用Wait系列函数等待计算完成。
接下来学习常驻型任务使用。
定义玩家血量上限点,当前点,当血量未满时,每0.2秒恢复1点血量。代码实现分为创建生命治疗仪FRunnable对象、重写Run函数、创建FRunnableThread线程、测试恢复功能和释放线程资源。
生命治疗仪创建与测试完整代码如下,可验证生命恢复功能和暂停与恢复。
UE4中的FRunnable与FRunnableThread提供创建常驻型任务所需接口。无论是常驻型还是非常驻型,底层实现相同,都是使用FRunnableThread线程。
FRunnableThread线程结构包含标识符、逻辑功能、效率与性能、辅助调试字段。线程创建与生命周期分为创建FRunnable类对象、创建FRunnableThread对象两步,通过FRunnable的生命周期管理实现线程运行与停止。
UE4线程管理流程包括继承并创建FRunnable类对象、创建FRunnableThread对象,生命治疗仪线程创建代码。ecshop商创版源码
UE4中的几种异步方式底层使用线程实现,学习了线程类型、创建、生命周期、销毁方法,为下篇学习引擎特殊线程打下基础。
ListenableFuture源码解析
ListenableFuture 是 spring 中对 JDK Future 接口的扩展,主要应用于解决在提交线程池的任务拿到 Future 后在 get 方法调用时会阻塞的问题。通过使用 ListenableFuture,可以向其注册回调函数(监听器),当任务完成时,触发回调。Promise 在 Netty 中也实现了类似的功能,用于处理类似 Future 的场景。
实现 ListenableFuture 的关键在于 FutureTask 的源码解析。FutureTask 是实现 Future 接口的基础类,ListenableFutureTask 在其基础上做了扩展。其主要功能是在任务提交后,当调用 get 方法时能够阻塞当前业务线程,直到任务完成时唤醒。
FutureTask 通过在内部实现一个轻量级的 Treiber stack 数据结构来管理等待任务完成的线程。这个数据结构由 WaitNode 节点组成,每个节点代表一个等待的线程。当业务线程调用 get 方法时,会将自己插入到 WaitNode 栈中,并且在插入的同时让当前线程进入等待状态。在任务执行完成后,会遍历 WaitNode 栈,唤醒等待的线程。
为了确保并发安全,FutureTask 使用 CAS(Compare and Swap)操作来管理 WaitNode 栈。每个新插入的节点都会使用 CAS 操作与栈顶节点进行比较,并在满足条件时更新栈顶。这一过程保证了插入操作的原子性,防止了并发条件下的数据混乱。同时,插入操作与栈顶节点的更新操作相互交织,确保了数据的wap浏览器 源码一致性和完整性。
在 FutureTask 中,还利用了 LockSupport 类提供的 park 和 unpark 方法来实现线程的等待和唤醒。当线程插入到 WaitNode 栈中后,通过 park 方法将线程阻塞;任务执行完成后,通过 unpark 方法唤醒线程,完成等待与唤醒的流程。
综上所述,ListenableFuture 通过扩展 FutureTask 的功能,实现了任务执行与线程等待的高效管理。通过注册监听器并利用 CAS 操作与 LockSupport 方法,实现了在任务完成时通知回调,解决了异步任务执行时的线程阻塞问题,提高了程序的并发处理能力。
Ray 源码解析(一):任务的状态转移和组织形式
Ray源码解析系列的第一篇着重于任务的状态管理和组织形式。Ray的核心设计在于其细粒度、高吞吐的任务调度,依赖于共享内存的Plasma存储输入和输出,以及Redis的GCS来管理所有状态,实现去中心化的调度。任务分为无状态的Task和有状态的Actor Method,后者包括Actor的构造函数和成员函数。
Ray支持显式指定任务的资源约束,通过ResourcesSet量化节点资源,用于分配和回收。在调度时,需找到满足任务资源要求的节点。由于Task输入在分布式存储中,调度后需要传输依赖。对于Actor Method,其与Actor绑定,会直接调度到对应的节点。
状态变化如任务状态转移、资源依赖等信息,都存储在GCS中。任务状态更改需更新GCS,失联或宕机时,根据GCS中的状态信息重试任务。通过GCS事件订阅驱动任务状态变化。最简单的网站源码
文章主要讲述了任务状态的组织方式,如任务队列(TaskQueue)和调度队列(SchedulingQueue)的运作,以及状态转移图和状态枚举类的定义。例如,TaskQueue负责任务的增删查改,其中ReadyQueue通过资源映射优化调度决策。此外,文中还解释了一些关键概念,如Task Required Resources、Task argument、Object、Object Store、Node/Machine等。
后续文章将深入探讨调度策略和资源管理。让我们期待下篇的精彩内容。
如何实现定时任务- Java Timer/TimerTask 源码解析
日常实现各种服务端系统时,我们一定会有一些定时任务的需求。比如会议提前半小时自动提醒,异步任务定时/周期执行等。那么如何去实现这样的一个定时任务系统呢? Java JDK提供的Timer类就是一个很好的工具,通过简单的API调用,我们就可以实现定时任务。
现在就来看一下java.util.Timer是如何实现这样的定时功能的。
首先,我们来看一下一个使用demo
基本的使用方法:
加入任务的API如下:
可以看到API方法内部都是调用sched方法,其中time参数下一次任务执行时间点,是通过计算得到。period参数为0的话则表示为一次性任务。
那么我们来看一下Timer内部是如何实现调度的。
内部结构
先看一下Timer的组成部分:
Timer有3个重要的模块,分别是 TimerTask, TaskQueue, TimerThread
那么,在加入任务之后,整个Timer是怎么样运行的呢?可以看下面的示意图:
图中所示是简化的逻辑,多个任务加入到TaskQueue中,会自动排序,队首任务一定是当前执行时间最早的任务。TimerThread会有一个一直执行的循环,从TaskQueue取队首任务,判断当前时间是否已经到了任务执行时间点,如果是则执行任务。
工作线程
流程中加了一些锁,用来避免同时加入TimerTask的并发问题。可以看到sched方法的逻辑比较简单,task赋值之后入队,队列会自动按照nextExecutionTime排序(升序,排序的实现原理后面会提到)。
从mainLoop的源码中可以看出,基本的流程如下所示
当发现是周期任务时,会计算下一次任务执行的时间,这个时候有两种计算方式,即前面API中的
优先队列
当从队列中移除任务,或者是修改任务执行时间之后,队列会自动排序。始终保持执行时间最早的任务在队首。 那么这是如何实现的呢?
看一下TaskQueue的源码就清楚了
可以看到其实TaskQueue内部就是基于数组实现了一个最小堆 (balanced binary heap), 堆中元素根据 执行时间nextExecutionTime排序,执行时间最早的任务始终会排在堆顶。这样工作线程每次检查的任务就是当前最早需要执行的任务。堆的初始大小为,有简单的倍增扩容机制。
TimerTask 任务有四种状态:
Timer 还提供了cancel和purge方法
常见应用
Java的Timer广泛被用于实现异步任务系统,在一些开源项目中也很常见, 例如消息队列RocketMQ的 延时消息/消费重试 中的异步逻辑。
上面这段代码是RocketMQ的延时消息投递任务 ScheduleMessageService 的核心逻辑,就是使用了Timer实现的异步定时任务。
不管是实现简单的异步逻辑,还是构建复杂的任务系统,Java的Timer确实是一个方便实用,而且又稳定的工具类。从Timer的实现原理,我们也可以窥见定时系统的一个基础实现:线程循环 + 优先队列。这对于我们自己去设计相关的系统,也会有一定的启发。
深度解析sync WaitGroup源码
waitGroup
waitGroup 是 Go 语言中并发编程中常用的语法之一,主要用于解决并发和等待问题。它是 sync 包下的一个子组件,特别适用于需要协调多个goroutine执行任务的场景。
waitGroup 主要用于解决goroutine间的等待关系。例如,goroutineA需要在等待goroutineB和goroutineC这两个子goroutine执行完毕后,才能执行后续的业务逻辑。通过使用waitGroup,goroutineA在执行任务时,会在检查点等待其他goroutine完成,确保所有任务执行完毕后,goroutineA才能继续进行。
在实现上,waitGroup 通过三个方法来操作:Add、Done 和 Wait。Add方法用于增加计数,Done方法用于减少计数,Wait方法则用于在计数为零时阻塞等待。这些方法通过原子操作实现同步安全。
waitGroup的源码实现相对简洁,主要涉及数据结构设计和原子操作。数据结构包括了一个 noCopy 的辅助字段以及一个复合意义的 state1 字段。state1 字段的组成根据目标平台的不同(位或位)而有所不同。在位环境下,state1的第一个元素是等待线程数,第二个元素是 waitGroup 计数值,第三个元素是信号量。而在位环境下,如果 state1 的地址不是位对齐的,那么 state1 的第一个元素是信号量,后两个元素分别是等待线程数和计数值。
waitGroup 的核心方法 Add 和 Wait 的实现原理如下:
Add方法通过原子操作增加计数值。当执行 Add 方法时,首先将 delta 参数左移位,然后通过原子操作将其添加到计数值上。需要注意的是,delta 的值可正可负,用于在调用 Done 方法时减少计数值。
Done方法通过调用 Add(-1)来减少计数值。
Wait方法则持续检查 state 值。当计数值为零时,表示所有子goroutine已完成,调用者无需等待。如果计数值大于零,则调用者会变成等待者,加入等待队列,并阻塞自己,直到所有任务执行完毕。
通过使用waitGroup,开发者可以轻松地协调和同步并发任务的执行,确保所有任务按预期顺序完成。这在多goroutine协同工作时,尤其重要。掌握waitGroup的使用和源码实现,将有助于提高并发编程的效率和可维护性。
如果您对并发编程感兴趣,希望持续关注相关技术更新,请通过微信搜索「迈莫coding」,第一时间获取更多深度解析和实战指南。
Nacos源码之配置管理 三TaskManager 任务管理的使用
在Nacos的源码中,TaskManager是一个核心组件,它负责管理一系列必须成功执行的任务,以单线程的方式确保任务的执行。TaskManager内部包含待处理的AbstractTask集合和对应的TaskProcessor,后者是执行任务的接口,不同的任务类型需实现自己的执行逻辑。以配置中心的配置文件Dump为例,Nacos会定期将数据库中的数据备份到磁盘,这个操作通过定义的DumpTask和其对应的DumpProcessor来实现。
DumpTask定义了必要的属性,而DumpProcessor则是专门处理DumpTask的任务处理器,其核心功能是将配置文件保存到磁盘并计算MD5。类似地,DumpAllTask和DumpAllBetaTask也有对应的处理器,如DumpAllProcessor和DumpAllBetaProcessor。
DumpAllTask的任务触发和执行发生在DumpService类中,该服务负责初始化配置信息的备份。在初始化时,会创建一个DumpAllProcessor执行器,并启动一个线程,将默认执行器设置为这个处理器。此后,每隔十分钟,DumpService会向TaskManager添加一个新的DumpAllTask,由线程processingThread处理并执行。
硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理
深入剖析JUC线程池ThreadPoolExecutor的执行核心 早有计划详尽解读ThreadPoolExecutor的源码,因事务繁忙未能及时整理。在之前的文章中,我们曾提及Doug Lea设计的Executor接口,其顶层方法execute()是线程池扩展的基础。本文将重点关注ThreadPoolExecutor#execute()的实现,结合简化示例,逐步解析。 ThreadPoolExecutor的核心功能包括固定的核心线程、额外的非核心线程、任务队列和拒绝策略。它的设计巧妙地运用了JUC同步器框架AbstractQueuedSynchronizer(AQS),以及位操作和CAS技术。以核心线程为例,设计上允许它们在任务队列满时阻塞,或者在超时后轮询,而非核心线程则在必要时创建。 创建ThreadPoolExecutor时,我们需要指定核心线程数、最大线程数、任务队列类型等。当核心线程和任务队列满载时,会尝试添加额外线程处理新任务。线程池的状态控制至关重要,通过整型变量ctl进行管理和状态转换,如RUNNING、SHUTDOWN、STOP等,状态控制机制包括工作线程上限数量的位操作。 接下来,我们深入剖析execute()方法。首先,方法会检查线程池状态和工作线程数量,确保在需要时添加新线程。这里涉及一个疑惑:为何需要二次检查?这主要是为了处理任务队列变化和线程池状态切换。任务提交流程中,addWorker()方法负责创建工作线程,其内部逻辑复杂,包含线程中断和适配器Worker的创建。 Worker内部类是线程池核心,它继承自AQS,实现Runnable接口。Worker的构造和run()方法共同确保任务的执行,同时处理线程中断和生命周期的终结。getTask()方法是工作线程获取任务的关键,它会检查任务队列状态和线程池大小,确保资源的有效利用。 线程池关闭操作通过shutdown()、shutdownNow()和awaitTermination()方法实现,它们涉及线程中断、任务队列清理和状态更新等步骤,以确保线程池的有序退出。在这些方法中,可重入锁mainLock和条件变量termination起到了关键作用,保证了线程安全。 ThreadPoolExecutor还提供了钩子方法,允许开发者在特定时刻执行自定义操作。除此之外,它还包含了监控统计、任务队列操作等实用功能,每个功能的实现都是对execute()核心逻辑的扩展和优化。 总的来说,ThreadPoolExecutor的execute()方法是整个线程池的核心,它的实现原理复杂而精细。后续将陆续分析ExecutorService和ScheduledThreadPoolExecutor的源码,深入探讨线程池的扩展和调度机制。敬请关注,期待下文的详细解析。