1.如何实现定时任务- Java Timer/TimerTask 源码解析
2.Nacos源码之配置管理 三TaskManager 任务管理的锁定锁定使用
3.深入p-limit源码,如何限制并发数?
4.Chromium setTimeout/clearTimeout 源码分析
5.深度解析sync WaitGroup源码
如何实现定时任务- Java Timer/TimerTask 源码解析
日常实现各种服务端系统时,任务任务我们一定会有一些定时任务的源码源码需求。比如会议提前半小时自动提醒,解除异步任务定时/周期执行等。锁定锁定那么如何去实现这样的任务任务gamefi成品源码一个定时任务系统呢? Java JDK提供的Timer类就是一个很好的工具,通过简单的源码源码API调用,我们就可以实现定时任务。解除
现在就来看一下java.util.Timer是锁定锁定如何实现这样的定时功能的。
首先,任务任务我们来看一下一个使用demo
基本的源码源码使用方法:
加入任务的API如下:
可以看到API方法内部都是调用sched方法,其中time参数下一次任务执行时间点,解除是锁定锁定通过计算得到。period参数为0的任务任务话则表示为一次性任务。
那么我们来看一下Timer内部是源码源码如何实现调度的。
内部结构
先看一下Timer的组成部分:
Timer有3个重要的模块,分别是 TimerTask, TaskQueue, TimerThread
那么,在加入任务之后,整个Timer是怎么样运行的呢?可以看下面的示意图:
图中所示是简化的逻辑,多个任务加入到TaskQueue中,会自动排序,队首任务一定是当前执行时间最早的任务。TimerThread会有一个一直执行的循环,从TaskQueue取队首任务,判断当前时间是否已经到了任务执行时间点,如果是则执行任务。
工作线程
流程中加了一些锁,875源码用来避免同时加入TimerTask的并发问题。可以看到sched方法的逻辑比较简单,task赋值之后入队,队列会自动按照nextExecutionTime排序(升序,排序的实现原理后面会提到)。
从mainLoop的源码中可以看出,基本的流程如下所示
当发现是周期任务时,会计算下一次任务执行的时间,这个时候有两种计算方式,即前面API中的
优先队列
当从队列中移除任务,或者是修改任务执行时间之后,队列会自动排序。始终保持执行时间最早的任务在队首。 那么这是如何实现的呢?
看一下TaskQueue的源码就清楚了
可以看到其实TaskQueue内部就是基于数组实现了一个最小堆 (balanced binary heap), 堆中元素根据 执行时间nextExecutionTime排序,执行时间最早的任务始终会排在堆顶。这样工作线程每次检查的任务就是当前最早需要执行的任务。堆的初始大小为,有简单的倍增扩容机制。
TimerTask 任务有四种状态:
Timer 还提供了cancel和purge方法
常见应用
Java的Timer广泛被用于实现异步任务系统,在一些开源项目中也很常见, 例如消息队列RocketMQ的 延时消息/消费重试 中的异步逻辑。
上面这段代码是RocketMQ的延时消息投递任务 ScheduleMessageService 的核心逻辑,就是使用了Timer实现的异步定时任务。
不管是实现简单的异步逻辑,还是构建复杂的任务系统,Java的shib源码Timer确实是一个方便实用,而且又稳定的工具类。从Timer的实现原理,我们也可以窥见定时系统的一个基础实现:线程循环 + 优先队列。这对于我们自己去设计相关的系统,也会有一定的启发。
Nacos源码之配置管理 三TaskManager 任务管理的使用
在Nacos的源码中,TaskManager是一个核心组件,它负责管理一系列必须成功执行的任务,以单线程的方式确保任务的执行。TaskManager内部包含待处理的AbstractTask集合和对应的TaskProcessor,后者是执行任务的接口,不同的任务类型需实现自己的执行逻辑。以配置中心的配置文件Dump为例,Nacos会定期将数据库中的数据备份到磁盘,这个操作通过定义的DumpTask和其对应的DumpProcessor来实现。
DumpTask定义了必要的属性,而DumpProcessor则是专门处理DumpTask的任务处理器,其核心功能是将配置文件保存到磁盘并计算MD5。类似地,DumpAllTask和DumpAllBetaTask也有对应的处理器,如DumpAllProcessor和DumpAllBetaProcessor。
DumpAllTask的任务触发和执行发生在DumpService类中,该服务负责初始化配置信息的备份。在初始化时,会创建一个DumpAllProcessor执行器,并启动一个线程,BVH源码将默认执行器设置为这个处理器。此后,每隔十分钟,DumpService会向TaskManager添加一个新的DumpAllTask,由线程processingThread处理并执行。
深入p-limit源码,如何限制并发数?
并发处理在现代编程中扮演着至关重要的角色,尤其在异步操作和并行任务处理中。虽然JavaScript是单线程执行的,但它通过Promise.all等API实现了并发效果,允许同时处理多个异步操作。
Promise.all是Promise库中的一个关键函数,它接受一个Promise数组作为参数。此函数会等待所有给定的Promise实例全部完成或其中一个失败,然后返回一个新Promise的数组结果。如果所有Promise都成功,则返回所有成功结果的数组;如果一个或多个Promise被拒绝,则返回第一个拒绝的Promise的reason。
然而,有时并发操作需要被限制。过多的并发请求可能给服务器带来压力,影响性能。这时候,p-limit库就显得尤为重要,它允许我们为并发操作设置一个上限。
p-limit提供了pLimit函数来定义并发限制。浪子源码使用pLimit时,你可以传入一个数量参数,这个参数决定了同时可以执行的异步任务数量。函数返回一个新函数,该函数接收需要并发执行的异步任务。当执行队列中的任务数量达到上限时,新传入的任务会被加入队列,等待前面的任务释放资源后执行。
p-limit的实现中,核心在于初始化一个计数器和一个任务队列。队列采用了yocto-queue库实现,它提供了一个基于链表的队列结构。在并发处理过程中,p-limit通过enqueue函数将异步任务入队,并在队列中管理任务的执行顺序和限制。
enqueue函数负责将异步任务入队,同时对任务进行包装和控制,确保任务在队列中按顺序执行,且不会超过指定的并发限制。这通过使用async函数实现,以确保等待下一个微任务的到来,从而在异步更新的activeCount值上进行比较,以维持并发限制。
在实际执行时,每个任务的执行由run函数控制。此函数在内部管理并发计数,并在任务完成后执行下一个任务,确保并发限制被严格遵守。enqueue、run和next三个函数协同工作,构成了p-limit中一个动态、有限的异步任务执行流程。
此外,p-limit还包含了辅助函数用于管理任务状态,如获取当前执行任务数量(activeCount)、队列中等待任务数量(pendingCount)以及清空任务队列(clearQueue)。这些功能共同协作,确保并发处理既高效又可控。
通过p-limit库,开发人员能够轻松实现异步操作的并发控制,优化性能并防止服务器过载。了解其内部机制,能更好地利用并发处理技术,提升应用响应速度和用户体验。
Chromium setTimeout/clearTimeout 源码分析
Chromium版本.0..3中setTimeout函数的工作流程涉及大量源码,包括线程、消息循环、任务队列和操作系统定时器函数。本文仅分析setTimeout的关键步骤。
setTimeout函数通过创建包含回调函数和延时时间的action对象,调用DOMTimer::Install进行处理。DOMTimer::Install通过DOMTimerCoordinator::InstallNewTimeout向定时器哈希表timers_插入一个定时器对象,生成唯一timeout_id。
timeout_id由NextID生成,每次调用setTimeout返回递增的值,用于唯一标识每个定时器任务。timers_是一个哈希表,存放定时器对象,与任务一一对应。
创建定时器对象时,通过定时器的延时时间获取任务类型,并将回调函数与任务类型关联,最终通过web_task_runner_获取相应的任务运行器,并在TimerBase::SetNextFireTime调用web_task_runner_->PostDelayedTask提交延迟任务。
PostDelayedTask将延迟任务插入到延迟任务队列中,并更新当前线程的唤醒时间。延迟任务队列是优先队列,用于管理按延时时间排序的任务。
通过GetNextScheduledWakeUpImpl获取优先队列的队头任务,创建唤醒任务用于在线程唤醒时执行延迟任务。唤醒任务只包含延时时间,不包含回调函数。
UpdateDelayedWakeUpImpl根据新创建的唤醒任务更新唤醒任务队列。如果延迟任务队列中的任务延时时间较短,新任务可能无法立即进入唤醒任务队列。
调用操作系统定时器函数,如在Mac下调用CFRunLoopTimerSetNextFireDate,在Windows下调用SetTimer,在Android下调用timerfd_settime,在指定延时后唤醒线程。
线程睡眠后,唤醒线程执行已到期的延迟任务,将到期任务从延迟任务队列移出并加入工作队列。ThreadControllerWithMessagePumpImpl::DoWorkImpl找到并执行工作队列中的任务。
面试题:setTimeout延迟时间不准确的原因可能有:硬件层面的时间不准确、操作系统不保证定时器函数的精确性、CPU处理大量定时任务时可能出现部分任务延迟执行。
clearTimeout与clearInterval功能相同,DOMTimer::RemoveByID从timers_哈希表中移除指定timeout_id对应的定时器对象,将回调函数置空,视为任务取消。
深度解析sync WaitGroup源码
waitGroup
waitGroup 是 Go 语言中并发编程中常用的语法之一,主要用于解决并发和等待问题。它是 sync 包下的一个子组件,特别适用于需要协调多个goroutine执行任务的场景。
waitGroup 主要用于解决goroutine间的等待关系。例如,goroutineA需要在等待goroutineB和goroutineC这两个子goroutine执行完毕后,才能执行后续的业务逻辑。通过使用waitGroup,goroutineA在执行任务时,会在检查点等待其他goroutine完成,确保所有任务执行完毕后,goroutineA才能继续进行。
在实现上,waitGroup 通过三个方法来操作:Add、Done 和 Wait。Add方法用于增加计数,Done方法用于减少计数,Wait方法则用于在计数为零时阻塞等待。这些方法通过原子操作实现同步安全。
waitGroup的源码实现相对简洁,主要涉及数据结构设计和原子操作。数据结构包括了一个 noCopy 的辅助字段以及一个复合意义的 state1 字段。state1 字段的组成根据目标平台的不同(位或位)而有所不同。在位环境下,state1的第一个元素是等待线程数,第二个元素是 waitGroup 计数值,第三个元素是信号量。而在位环境下,如果 state1 的地址不是位对齐的,那么 state1 的第一个元素是信号量,后两个元素分别是等待线程数和计数值。
waitGroup 的核心方法 Add 和 Wait 的实现原理如下:
Add方法通过原子操作增加计数值。当执行 Add 方法时,首先将 delta 参数左移位,然后通过原子操作将其添加到计数值上。需要注意的是,delta 的值可正可负,用于在调用 Done 方法时减少计数值。
Done方法通过调用 Add(-1)来减少计数值。
Wait方法则持续检查 state 值。当计数值为零时,表示所有子goroutine已完成,调用者无需等待。如果计数值大于零,则调用者会变成等待者,加入等待队列,并阻塞自己,直到所有任务执行完毕。
通过使用waitGroup,开发者可以轻松地协调和同步并发任务的执行,确保所有任务按预期顺序完成。这在多goroutine协同工作时,尤其重要。掌握waitGroup的使用和源码实现,将有助于提高并发编程的效率和可维护性。
如果您对并发编程感兴趣,希望持续关注相关技术更新,请通过微信搜索「迈莫coding」,第一时间获取更多深度解析和实战指南。