皮皮网

【mes系统源码排行】【兵器软件源码】【神舟火箭源码】java 线程池 源码_java线程池源码深度解析

来源:lua源码 云风 时间:2025-01-19 07:50:23

1.Java线程池实现原理及其在美团业务中的线程a线实践
2.手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理
3.java 线程池 工作队列是池源程池如何工作的
4.InheritableThreadLocal源码剖析
5.通过transmittable-thread-local源码理解线程池线程本地变量传递的原理
6.Java并发必会,深入剖析Semaphore源码

java 线程池 源码_java线程池源码深度解析

Java线程池实现原理及其在美团业务中的源码实践

       随着计算机行业的飞速发展,摩尔定律逐渐失效,深度多核CPU成为主流。解析使用多线程并行计算逐渐成为开发人员提升服务器性能的线程a线mes系统源码排行基本武器。J.U.C提供的池源程池线程池ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。源码了解并合理使用线程池,深度是解析一个开发人员必修的基本功。本文开篇简述了线程池概念和用途,线程a线接着结合线程池的池源程池源码,帮助大家领略线程池的源码设计思路,最后回归实践,深度通过案例讲述使用线程池遇到的解析问题,并给出了一种动态化线程池解决方案。

       一、写在前面

1.1 线程池是什么

       线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,包括创建销毁线程的开销、调度线程的开销等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。本文描述的线程池是JDK中提供的ThreadPoolExecutor类。

1.2 线程池解决的问题是什么

       线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能确定在任意时刻有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下问题:资源分配问题、线程调度问题等。线程池采用了“池化”思想来解决这些问题。Pooling是将资源统一管理的一种思想,不仅能应用在计算机领域,还在金融、设备、人员管理、工作管理等领域有相关应用。在计算机领域,表现为统一管理IT资源,兵器软件源码包括服务器、存储、网络等,通过共享资源在低投入中获益。

       二、线程池核心设计与实现

       Java中的线程池核心实现类是ThreadPoolExecutor,本文基于JDK 1.8的源码来分析线程池的核心设计与实现。首先,我们通过ThreadPoolExecutor的UML类图了解其继承关系,然后深入探讨其设计与实现。

2.1 总体设计

       ThreadPoolExecutor实现的顶层接口是Executor,提供了一种思想:将任务提交和任务执行进行解耦。用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行。ExecutorService接口增加了能力,如补充可以为一个或一批异步任务生成Future的方法以及提供管控线程池的方法,如停止线程池运行。

       AbstractExecutorService是上层的抽象类,将执行任务的流程串联起来,保证下层实现只需关注执行任务的方法。ThreadPoolExecutor作为最下层的实现类,实现最复杂的运行部分,负责维护自身的生命周期和管理线程与任务,使两者结合执行并行任务。

       ThreadPoolExecutor运行机制分为任务管理和线程管理两部分。任务管理充当生产者的角色,线程池会根据任务的流转决定执行流程。线程管理是消费者,维护线程池内的线程,根据任务请求进行线程分配。

2.2 生命周期管理

       线程池运行状态由内部维护,使用变量控制线程池的运行状态和有效线程数量。线程池内部使用AtomicInteger存储关键参数,实现线程池运行状态和线程数量的高效管理。线程池提供方法供用户获取当前运行状态和线程数量,通过位运算实现快速计算。

       ThreadPoolExecutor的运行状态有五种,包含生命周期转换。

2.3 任务执行机制

2.3.1 任务调度

       任务调度是线程池核心入口,用户提交任务后,决定任务执行流程。通过execute方法完成检查线程池状态、运行线程数和运行策略,神舟火箭源码决定执行流程,如直接申请线程执行或缓冲到队列执行,或直接拒绝任务。执行流程如下。

2.3.2 任务缓冲

       任务缓冲模块实现任务和线程的管理,通过生产者消费者模式和阻塞队列实现。阻塞队列缓存任务,工作线程从队列中获取任务。

2.3.3 任务申请

       任务执行有两种可能:直接由新创建的线程执行或从队列中获取任务执行。线程从任务缓存模块不断获取任务,通过getTask方法实现线程管理和任务管理之间的通信。

2.3.4 任务拒绝

       任务拒绝策略保护线程池,实现拒绝策略接口定制策略或选择JDK提供的四种已有策略。拒绝策略特点如下。

2.4 Worker线程管理

2.4.1 Worker线程

       Worker线程实现Runnable接口,持有线程和任务,通过构造方法创建。Worker线程执行任务模型如下,线程池通过AQS实现独占锁,控制线程生命周期,回收线程。

2.4.2 Worker线程增加

       Worker线程增加通过addWorker方法实现,增加线程时考虑线程池状态,策略在上一步完成,仅完成增加线程并运行,最后返回成功结果。方法参数包括firstTask和core,用于指定任务和线程策略。

2.4.3 Worker线程回收

       Worker线程回收依赖JVM自动回收,线程池维护线程引用,通过添加和移除引用控制线程生命周期。Worker被创建后,不断获取任务执行,核心线程无限等待,非核心线程限时获取。当无法获取任务时,循环结束,Worker主动移除自身引用。

2.4.4 Worker线程执行任务

       Worker线程执行任务通过runWorker方法实现,执行流程如下。

三、线程池在业务中的实践

       业务实践中,线程池用于获取并发性,提供典型场景和问题解决方案。代币开源码

3.1 业务背景

       互联网业界追求CPU多核性能,通过线程池管理线程获取并发性。常见场景包括快速响应用户请求和快速处理批量任务。

3.2 实际问题及方案思考

       线程池使用面临核心问题:参数配置困难。调研替代方案、参数设置合理性以及线程池参数动态化,动态化线程池提供简单有效的方法解决参数修改成本问题。

3.3 动态化线程池

       动态化线程池设计包括整体设计、功能架构,提供参数动态化、监控和告警能力。动态化线程池允许用户在管理平台上修改参数,实时生效,并监控线程池负载、任务执行情况,提供任务级别监控和运行时状态查看。

3.4 实践总结

       面对使用线程池的实际问题,动态化线程池提供成本效益平衡的解决方案,降低故障发生的概率,适用于业务需求。

       四、参考资料

       1. JDK 1.8 源码

       2. 维基百科-线程池

       3. 更好的使用Java线程池

       4. 维基百科Pooling(Resource Management)

       5. 深入理解Java线程池:ThreadPoolExecutor

       6. 《Java并发编程实践》

手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理

       本文旨在通过手写一个线程池,来深入理解ThreadPoolExecutor线程池的实现原理。首先,线程池的核心目标是资源管理和性能优化,通过池化技术减少线程创建和销毁的开销。手写线程池的实现步骤包括确定核心流程和添加辅助流程,虽然代码简单,但能体现核心的池化思想。

       手写线程池的实现涉及到状态管理,如线程池数量和状态的记录,这部分在ThreadPoolExecutor中通过AtomicInteger的高3位和低位实现。线程池的状态流转包括RUNNING、BLOCKED等,并通过execute方法提交任务,这个过程与我们自己的实现类似,包括任务的执行、加入队列和策略决策。

       添加执行任务的过程分为增加线程数量和启动线程,这部分与我们最初的设想基本一致。在runWorker方法中,执行线程的核心是调用task.run(),同时还会涉及队列任务的蓝点无限 源码获取。这些步骤与手写线程池的逻辑相吻合,帮助我们更好地理解线程池的工作机制。

       总结来说,通过对比分析和实践,我们对ThreadPoolExecutor的线程池实现有了更深入的理解,包括状态管理、任务提交和执行流程等。深入阅读源码后,你会发现线程池的复杂性和优化设计。如果你对Java线程池感兴趣,这将是一次很好的学习和实践机会。

java 线程池 工作队列是如何工作的

       使用线程池的好处

       1、降低资源消耗

       可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

       2、提高响应速度

       当任务到达时,任务可以不需要等到线程创建就能立即执行。

       3、提高线程的可管理性

       线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

       线程池的工作原理

       首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

       1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

       2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

       3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

       线程池饱和策略

       这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

       AbortPolicy

       为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

       DiscardPolicy

       直接抛弃,任务不执行,空方法

       DiscardOldestPolicy

       从队列里面抛弃head的一个任务,并再次execute 此task。

       CallerRunsPolicy

       在调用execute的线程里面执行此command,会阻塞入口

       用户自定义拒绝策略(最常用)

       实现RejectedExecutionHandler,并自己定义策略模式

       下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

       1.jpg

       2.jpg

       1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

       2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

       3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

       4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

       ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

       线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

       3.jpg

       关键方法源码分析

       我们看看核心方法添加到线程池方法execute的源码如下:

       //     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current { @code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         { @code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if { @code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get();        // 工作线程数小于核心线程数        if (workerCountOf(c) < corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return;            c = ctl.get();        }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) && workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                // 移除成功,拒绝该非运行的任务                reject(command);            else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false);        }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command);    }

InheritableThreadLocal源码剖析

       InheritableThreadLocal是Java中用于在多线程环境共享数据的工具,它允许子线程继承父线程的值,从而避免了线程间数据同步的复杂性。与ThreadLocal不同,InheritableThreadLocal实现了数据的继承机制,确保了数据在父线程到子线程间的顺利传递。这使得在使用线程池或其他线程管理技术时,应用程序能够保持数据的一致性和完整性。

       InheritableThreadLocal提供了一种从父线程到子线程的数据传递方式,它通过在Thread类中引入了inheritableThreadLocals字段来实现这一功能。这一字段是一个ThreadLocalMap类型的对象,专门用于存储InheritableThreadLocal实例。这意味着当创建子线程时,它会自动接收并继承父线程的值。

       实现这一特性,InheritableThreadLocal主要通过三个关键方法:set、get、remove。它们与ThreadLocal的同名方法相似,但操作的内部数据结构有所不同。InheritableThreadLocal的set、get、remove方法会通过获取inheritableThreadLocals字段中的ThreadLocalMap对象来进行操作,而ThreadLocal则通过操作threadLocals字段。

       为了验证InheritableThreadLocal的继承机制,可以通过在父线程中设置InheritableThreadLocal的值,然后在子线程中尝试获取该值来观察结果。实验证明,子线程能够成功获取到父线程设置的值,证明了InheritableThreadLocal的继承功能。

       在使用InheritableThreadLocal时,需要注意的是它的内存管理。一旦线程创建了InheritableThreadLocal实例,它会一直保留在所有后代线程中,直到显式调用remove方法或线程结束。因此,在资源管理和内存控制上,开发者需要特别注意,以防止潜在的内存泄漏问题。

       总之,InheritableThreadLocal通过在Thread类中引入专门的数据结构和方法来实现其独特的继承机制,简化了多线程编程中数据共享和管理的复杂性。然而,其使用需要谨慎,以避免不必要的内存占用和潜在的内存泄漏风险。

通过transmittable-thread-local源码理解线程池线程本地变量传递的原理

       最近几周,我投入了大量的时间和精力,完成了UCloud服务和中间件迁移至阿里云的工作,因此没有空闲时间撰写文章。不过,回忆起很早之前对ThreadLocal源码的分析,其中提到了ThreadLocal存在向预先创建的线程中传递变量的局限性。恰好,我的一位前同事,HSBC的技术大牛,提到了团队引入了transmittable-thread-local(TTL)来解决此问题。借此机会,我深入分析了TTL源码,本文将全面分析ThreadLocal和InheritableThreadLocal的局限性,并深入探讨TTL整套框架的实现。如有对线程池和ThreadLocal不熟悉的读者,建议先阅读相关前置文章,本篇文章行文较为干硬,字数接近5万字,希望读者耐心阅读。

       在Java中,没有直接的API允许子线程获取父线程的实例。获取父线程实例通常需要通过静态本地方法Thread#currentThread()。同样,为了在子线程中传递共享变量,也常采用类似的方法。然而,这种方式会导致硬编码问题,限制了方法的复用性和灵活性。为了解决这一问题,线程本地变量Thread Local应运而生,其基本原理是通过线程实例访问ThreadLocal.ThreadLocalMap来实现变量的存储与传递。

       ThreadLocal与InheritableThreadLocal之间的区别主要在于控制ThreadLocal.ThreadLocalMap的创建时机和线程实例中对应的属性获取方式。通过分析源码,可以清楚地看到它们之间的联系与区别。对于不熟悉概念的读者,可以尝试通过自定义实现来理解其中的原理与关系。

       ThreadLocal和InheritableThreadLocal的最大局限性在于无法为预先创建的线程实例传递变量。泛线程池Executor体系、TimerTask和ForkJoinPool等通常会预先创建线程,因此无法在这些场景中使用ThreadLocal和InheritableThreadLocal来传递变量。

       TTL提供了更灵活的解决方案,它通过委托机制(代理模式)实现了变量的传递。委托可以基于Micrometer统计任务执行时间并上报至Prometheus,然后通过Grafana进行监控展示。此外,TTL通过字节码增强技术(使用ASM或Javassist等工具)实现了类加载时期替换Runnable、Callable等接口的实现,从而实现了无感知的增强功能。TTL还使用了模板方法模式来实现核心逻辑。

       TTL框架的核心类TransmittableThreadLocal继承自InheritableThreadLocal,通过全局静态变量holder来管理所有TransmittableThreadLocal实例。holder实际上是一个InheritableThreadLocal,用于存储所有线程本地变量的映射,实现变量的全局共享。disableIgnoreNullValueSemantics属性的设置可以影响NULL值的处理方式,影响TTL实例的行为。

       发射器Transmitter是TransmittableThreadLocal的一个公有静态类,提供传输TransmittableThreadLocal实例和注册当前线程变量至其他线程的功能。通过Transmitter的静态方法,可以实现捕获、重放和复原线程本地变量的功能。

       TTL通过TtlRunnable类实现了任务的封装,确保在执行任务时能够捕获和传递线程本地变量。在任务执行前后,通过capture和restore方法捕获和重放变量,实现异步执行时上下文的传递。

       启用TTL的Agent模块需要通过Java启动参数添加javaagent来激活字节码增强功能。TTL通过Instrumentation回调激发ClassFileTransformer,实现目标类的字节码增强,从而在执行任务时自动完成上下文的捕捉和传递。

       TTL框架提供了一种高效、灵活的方式来解决线程池中线程复用时上下文传递的问题。通过委托机制和字节码增强技术,TTL实现了无入侵地提供线程本地变量传递功能。如果您在业务代码中遇到异步执行时上下文传递的问题,TTL库是一个值得考虑的解决方案。

Java并发必会,深入剖析Semaphore源码

       在深入理解Java并发编程时,必不可少的是对Semaphore源码的剖析。本文将带你探索这一核心组件,通过实践和源码解析,掌握其限流和共享锁的本质。Semaphore,中文名信号量,就像一个令牌桶,任务执行前需要获取令牌,处理完毕后归还,确保资源访问的有序进行。

       首先,Semaphore主要有acquire()和release()两个方法。acquire()负责获取许可,若许可不足,任务会被阻塞,直到有许可可用。release()用于释放并归还许可,确保资源释放后,其他任务可以继续执行。一个典型的例子是,如果一个线程池接受个任务,但Semaphore限制为3,那么任务将按每3个一组执行,确保系统稳定性。

       Semaphore的源码实现巧妙地结合了AQS(AbstractQueuedSynchronizer)框架,通过Sync同步变量管理许可数量,公平锁和非公平锁的实现方式有所不同。公平锁会优先处理队列中的任务,而非公平锁则按照获取许可的顺序进行。

       acquire()方法主要调用AQS中的acquireSharedInterruptibly(),并进一步通过tryReleaseShared()进行许可更新,公平锁与非公平锁的区别在于判断队列中是否有前置节点。release()方法则调用releaseShared(),更新许可数量。

       Semaphore的简洁逻辑在于,AQS框架负责大部分并发控制,子类只需实现tryReleaseShared()和tryAcquireShared(),专注于许可数量的管理。欲了解AQS的详细流程,可参考之前的文章。

       最后,了解了Semaphore后,我们还将继续探索共享锁CyclicBarrier的实现,敬请期待下篇文章。