【酒链app源码】【网站源码文字粒子】【直播推流 源码】workqueue源码实现

时间:2024-12-01 00:46:09 来源:gauge.js源码 编辑:spa网站源码

1.线程池中空闲的码实线程处于什么状态?
2.Java线程池拒绝策略
3.Linux内核 kthread_worker 和 kthread_work 机制
4.fork/join 全面剖析,你可以不用,码实但是码实不能不懂!
5.线程池调优之动态参数配置
6.Linux 中断( IRQ / softirq )基础:原理及内核实现

workqueue源码实现

线程池中空闲的码实线程处于什么状态?

       一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;

       源码追踪:

       for (;;) {

       ...

        workQueue.take();

       ...

       }

       public E take()...{

       ...

       while (count.get() == 0) { / /这里就是任务队列中的消息数量

       notEmpty.await();

       }

       ...

       }

       public final void await()...{

       ...

       LockSupport.park(this);

       ...

       }

       继续往下:

       public static void park(Object blocker) {

       Thread t = Thread.currentThread();

       setBlocker(t, blocker);

       U.park(false, 0L);

       setBlocker(t, null);

       }

       private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();

       //线程调用该方法,线程将一直阻塞直到超时,码实或者是码实酒链app源码中断条件出现。

       public native void park(boolean isAbsolute,码实 long time);

       上面就是java线程池中阻塞的源码追踪;

       二.对比object的wait()方法:

       @FastNative

       public final native void wait(long timeout, int nanos) throws InterruptedException;

       还有Thread的sleep() 方法:

       @FastNative

       private static native void sleep(Object lock, long millis, int nanos)throws...;

       可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;

       这3个方法最终实现都是通过c&c++实现的native方法.

       三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:

       .4.3 状态转换

       Java语言定义了5种线程状态,在任意一个时间点,码实一个线程只能有且只有其中的码实一种

       状态,这5种状态分别如下。码实

       1)新建(New):创建后尚未启动的码实线程处于这种状态。

       2)运行(Runable):Runable包括了操作系统线程状态中的码实Running和Ready,也就是码实处于此

       状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。码实

       3)无限期等待(Waiting):处于这种状态的码实线程不会被分配CPU执行时间,它们要等待被

       其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:

       ●没有设置Timeout参数的Object.wait()方法。

       ●没有设置Timeout参数的Thread.join()方法。

       ●LockSupport.park()方法。

       4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无

       须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程

       进入限期等待状态:

       ●Thread.sleep()方法。

       ●设置了Timeout参数的Object.wait()方法。

       ●设置了Timeout参数的Thread.join()方法。

       ●LockSupport.parkNanos()方法。

       ●LockSupport.parkUntil()方法。

       5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等

       待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状

       态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将

       进入这种状态。

       结束(Terminated):已终止线程的线程状态,线程已经结束执行。

Java线程池拒绝策略

       Java线程池在处理超过最大容量时,会采用预定义或自定义的拒绝策略。默认情况下,ThreadPoolExecutor提供了四种策略:

DiscardPolicy:任务被拒绝时不采取任何操作,直接丢弃,源码中表现为一个空的rejectedExecution方法。

AbortPolicy:拒绝时抛出RejectedExecutionException,网站源码文字粒子中断执行流程,线程会捕获这个异常。

CallerRunsPolicy:由提交任务的线程直接执行被拒绝的任务,workQueue中的任务在该线程中运行。

DiscardOldestPolicy:丢弃最旧的任务后尝试重新执行当前任务,workQueue为空或线程池关闭时,当前任务会被丢弃。

       要自定义拒绝策略,可以实现RejectedExecutionHandler接口,如示例中所示,将自定义的handler传递给ThreadPoolExecutor。这样,当线程池无法执行新任务时,就会调用自定义的rejectedExecution方法来处理策略。

Linux内核 kthread_worker 和 kthread_work 机制

       探究 Linux 内核中的 kthread_worker 和 kthread_work 机制,始于我在研究最新版 Linux Spi 驱动时对这部分工作流程的深入了解。kthread_worker 和 kthread_work 实际上是内核线程管理和使用的一种方式,与 work_struct 和 workqueue_struct 机制类似。接下来,让我们从数据结构、使用方式,以及具体实现入手,对 kthread_worker 和 kthread_work 进行深入分析。

       1、数据结构

       定义 kthread_worker 和 kthread_work 的数据结构位于 include/linux/kthread.h 中。观察结构体定义,可以看出它们之间的紧密联系。

       2、使用方式

       kthread_worker 作为核心组件,理解其使用方法至关重要。首先,定义并初始化 kthread_worker。接着,为 kthread_worker 创建一个内核线程,用于处理工作。

       2.1 准备 kthread_worker

       定义 kthread_worker 并初始化它。注意,初始化完成后,需要为 kthread_worker 创建一个内核线程。

       2.2 准备 kthread_work

       定义 kthread_work 并初始化。为它指定工作函数。直播推流 源码

       2.3 启动工作

       准备好了 worker 和 work 后,如有工作需要处理,将工作挂接到 worker 上。

       2.4 执行指定 worker 上的所有工作

       将指定 worker 上的所有工作全部执行。

       2.5 停止当前线程

       了解 Linux 内核源码学习资源。

       3、实现源码

       分析源码的步骤如下:

       3.1 kthread_init_worker

       初始化 kthread_worker。设置成员变量为零,并初始化工作列表。

       3.2 执行线程 kthread_worker_fn

       定义并初始化 kthread_worker 后,调用 kthread_worker_fn 函数,传入 worker 指针。代码逻辑简单,主要涉及状态设置、工作执行等。

       3.3 kthread_init_work

       清零 kthread_work 类型的工作,并初始化链表元素,最后挂接工作执行函数的指针。

       3.4 kthread_queue_work

       将初始化完成的 kthread_worker 和 kthread_work 推进执行。调用 kthread_insert_work 将工作添加至列表中,唤醒沉睡的执行线程。

       4、总结

       kthread_worker 和 kthread_work 机制为 Linux Kernel 提供了一种高效管理内核线程的手段。它们使得驱动等模块开发者能够简便地实现内核线程的使用。

fork/join 全面剖析,你可以不用,但是不能不懂!

       fork/join框架在Java并发包中扮演着重要角色,尤其在Java 8的并行流中。本文将深入剖析其设计思路、核心角色和实现机制。

       首先,fork/join的工作原理是将大任务分解成小任务,并利用多核处理。其特殊之处在于运用了work-stealing算法,通过双端队列分配任务,即使线程处理完一个任务,也能从其他未完成的任务中“窃取”以提高效率。

       核心角色包括ForkJoinPool,作为任务的管理者和线程容器,负责任务的智能组卷 源码提交和workerThread的管理。ForkJoinWorkerThread则是实际执行任务的“工人”,处理队列中的任务,并通过work-stealing机制优化资源利用。WorkQueue是存放任务的双端队列,ForkJoinTask则定义了任务类型,分为有返回值和无返回值两种。

       在初始化阶段,ForkJoinPool通过ForkJoinWorkerThreadFactory创建线程,任务的提交逻辑分为首次提交和任务切分后提交。首次提交会确保队列的创建和加锁,任务切分则在workerThread中进行。任务的消费则由workerThread或非workerThread线程根据任务状态进行处理。

       至于任务的窃取,工作线程在run()方法中通过scan(WorkQueue, int r)函数实现,不断尝试从队列中“窃取”任务,直到找到或者遍历完所有队列。

       尽管文章只是概述,深入研究fork/join的源码是理解其内在机制的关键,这将有助于在实际开发中更有效地利用并发框架。

线程池调优之动态参数配置

       前言线程池的核心参数配置在网上有一大堆的文章介绍,这次结合个人理解写一篇文章记录一下,以便加深印象和后续查阅。线程池配置参数

       corePoolSize:线程池核心线程数

       maximumPoolSize:线程池最大线程数

       keepAliveTime:允许线程空闲时间(对非核心工作线程的回收)

       TimeUnit:线程空闲时间单位

       workQueue:线程队列(当核心线程数满了,新的任务就会放入这个队列中)

       threadFactory:线程工厂(用于创建工作线程,自定义线程工厂可以指定线程名称)

       handler:线程池拒绝策略(当线程队列满了且最大线程数也满了,就会执行任务拒绝策略,默认有4种)

       allowCoreThreadTimeOut:控制核心工作线程是否需要被回收

常规线程池参数配置

       -首先提问一个面试题:现有个任务,台服务器,每台机器都是4核,在任务不丢弃情况下,线程池参数该怎么配置最合理呢?

       -把这个问题拆分一下,个任务,台机器,那么每台机器就负责个任务(常规轮训负载均衡模式,不考虑其他额外情况),每台机器都是4核,那么就可以设置核心线程数和最大线程数为4,线程队列大小为即可。

       -当然也可以把核心和最大线程数设置为5(n+1)个,线程队列大小为,这样是ps炫酷源码为了防止线程偶尔由于页缺失故障或者其他原因暂停,出多来的一个线程也能确保CPU的调度时钟周期不会被浪费,相当于备用线程。

       如果任务是CPU密集型配置:工作线程=cpu核心数+1;

       如果任务是IO密集型场景:工作线程=cpu核心数*2;

       所以上面例子中就是基于CPU密集型任务配置线程池。而且网上大部分文章描述线程池配置也是基于这两点来分析的。

       可惜理想很丰满,现实很骨感。在实际工作场景中,其实没那么容易区分线程中执行的任务是CPU密集还是IO密集,而且服务器上还会有其他应用线程抢占CPU资源,就算还有一些其他的公式计算配置线程池参数,那也是基于理想场景情况下进行配置的,所以上述配置更多的还是应用于面试中。

动态配置线程池参数

       上述中既然不能一次定义适配所有场景的线程池参数,那么如果可以根据不同业务场景动态配置线程池参数,通过人工干预介入来适配大部分场景也行的

       正好在JDK的自定义线程池ThreadPoolExecutor里,提供了动态扩展线程池核心参数的方法

       可以在运行期间的线程池使用此方法可以覆盖原来配置的值:

ThreadPoolExecutor线程池提供了5种配置参数可供动态更新:核心线程池,最大线程数,线程工厂,线程空闲时间,拒绝策略。

       这里主要讨论的是核心线程池和最大线程池两种参数配置:

/****@Author:ZRH*@Date://:*/@Slf4jpublicclassExecutorTest{ publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);threadPoolExecutor.setMaximumPoolSize(5);logExecutorInfo(threadPoolExecutor);Thread.currentThread().join();}privatestaticvoidlogExecutorInfo(ThreadPoolExecutorexecutor){ log.info("线程池核心线程数="+executor.getCorePoolSize()+",线程池最大线程数="+executor.getMaximumPoolSize()+",线程池队列剩余任务="+executor.getQueue().size()+",线程池活跃线程数="+executor.getActiveCount()+",线程池任务完成数"+executor.getCompletedTaskCount());}}

       看执行结果:刚开始线程池里核心线程数2个、最大线程数3个、剩下7放队列。活跃的线程也只有3个。

       然后更改核心线程和最大线程数为5后,线程池里对应的核心线程数和最大线程数也增加至5个,活跃的工作线程也是5个。说明更改配置成功。

       注:更新线程池参数时,核心线程数不能超过最大线程数配置。否则配置最后不会生效。

publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);//threadPoolExecutor.setMaximumPoolSize(5);logExecutorInfo(threadPoolExecutor);Thread.currentThread().join();}

       上图中把核心线程数更新为5,最大线程数不改动任为3。最后看执行结果,最终的活跃线程还是3个,说明配置没有生效,具体源码在ThreadPoolExecutor类的getTask()方法里,感兴趣的同学可以去看一下...

动态更新线程队列

       ThreadPoolExecutor线程池并没有动态配置线程池队列大小的方法

       想自己操作一下也是很简单的,只需要自定义实现一个队列,可以直接把LinkedBlockingQueue复制一份,并把capacity参数设定为可更改

publicstaticvoidmain(String[]args)throwsException{ finalThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,3,,TimeUnit.SECONDS,newCustomLinkedBlockingQueue<>(7),newThreadPoolExecutor.DiscardPolicy());for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}logExecutorInfo(threadPoolExecutor);threadPoolExecutor.setCorePoolSize(5);threadPoolExecutor.setMaximumPoolSize(5);CustomLinkedBlockingQueuequeue=(CustomLinkedBlockingQueue)threadPoolExecutor.getQueue();queue.setCapacity();for(inti=0;i<;i++){ threadPoolExecutor.execute(()->{ try{ logExecutorInfo(threadPoolExecutor);Thread.sleep();}catch(InterruptedExceptione){ }});}Thread.currentThread().join();}

       看结果,后续添加的任务会放入队列中,并且队列大小也超过第一次设置大小,说明配置成功

最后

       参考:Java线程池实现原理及其在美团业务中的实践

       虚心学习,共同进步-_-

Linux 中断( IRQ / softirq )基础:原理及内核实现

       中断(IRQ),尤其是软中断(softirq)的广泛用途之一是网络数据包的接收与发送,但其应用场景并非单一。本文将全面整理中断(IRQ)与软中断(softirq)的基础知识,这些内容与网络数据包处理虽无直接联系,但整理本文旨在更深入地理解网络数据包处理机制。

       什么是中断?

       CPU 通过时分复用处理多任务,其中包括硬件任务,如磁盘读写、键盘输入,以及软件任务,如网络数据包处理。CPU 在任何时刻只能执行一个任务。当某个硬件或软件任务当前未被执行,但希望CPU立即处理时,会向CPU发送中断请求——希望CPU暂停手头工作,优先服务“我”。中断以事件形式通知CPU,因此常看到“在XX条件下会触发XX中断事件”的表述。

       中断分为两类:

       管理中断的设备:Advanced Programmable Interrupt Controller(APIC)。

       硬中断的中断处理流程

       中断随时发生,处理流程如下:

       Maskable and non-maskable

       Maskable interrupts 在x_上可以通过sti/cli指令来屏蔽(关闭)和恢复:

       在屏蔽期间,这种类型的中断不会触发新的中断事件。大部分IRQ都属于这种类型。例如,网卡的收发包硬件中断。

       Non-maskable interrupts 不可屏蔽,因此属于更高优先级的类型。

       问题:执行速度与逻辑复杂性之间的矛盾

       IRQ处理器的两个特点如下:

       存在内在矛盾。

       解决方式:中断的推迟处理(deferred interrupt handling)

       传统解决方式是将中断处理分为两部分:

       这种方式称为中断的推迟处理或延后处理。现在已是一个通用术语,涵盖各种推迟执行中断处理的方式。中断分为两部分处理:

       在Linux中,有三种推迟中断(deferred interrupts):

       具体细节将在后续介绍。

       软中断与软中断子系统

       软中断是内核子系统的一部分:

       每个CPU上会初始化一个ksoftirqd内核线程,负责处理各种类型的softirq中断事件;

       使用cgroup ls或ps -ef都能看到:

       软中断事件的handler提前注册到softirq子系统,注册方式为open_softirq(softirq_id, handler)

       例如,注册网卡收发包(RX/TX)软中断处理函数:

       软中断占用了CPU的总开销:可以使用top查看,第三行倒数第二个指标是系统的软中断开销(si字段):

       Linux内核源码分析学习地址:ke.qq.com/course/...

       文章福利小编推荐自己的Linux内核源码分析交流群:点击加入整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!

       主处理

       smpboot.c类似于事件驱动的循环,会调度ksoftirqd线程执行pending的软中断。ksoftirqd内部会进一步调用到__do_softirq,

       避免软中断占用过多CPU

       软中断的潜在影响:推迟执行的部分(如softirq)可能会占用较长时间,在这段时间内,用户空间线程只能等待。反映在top中,si占比。

       不过softirq调度循环对此有所改进,通过budget机制来避免softirq占用过多CPU时间。

       硬中断-软中断调用栈

       softirq是一种推迟中断处理机制,将IRQ的大部分处理逻辑推迟在这里执行。有两条路径都会执行到softirq主处理逻辑__do_softirq():

       1、CPU调度到ksoftirqd线程时,会执行到__do_softirq();

       2、每次IRQ handler退出时:do_IRQ()->...

       do_IRQ是内核中主要的IRQ处理方式。它执行结束时,会调用exiting_irq(),这会展开成irq_exit()。后者会检查是否pending有softirq,如果有,则唤醒:

       进而会使CPU执行到__do_softirq。

       软中断触发执行的步骤

       总结,每个软中断会经过以下阶段:

       以收包软中断为例,IRQ handler并不执行NAPI,只是触发它,在内部会执行到raiseNET_RX_SOFTIRQ;真正的执行在softirq,会调用网卡的poll()方法收包。IRQ handler中会调用napi_schedule(),然后启动NAPI poll()。

       需要注意的是,虽然IRQ handler所做的工作很少,但处理这个包的softirq和IRQ在同一CPU上运行。这意味着,如果大量的包都放在同一个RX队列,虽然IRQ开销可能不多,但该CPU仍然会非常繁忙,都花费在softirq上。解决方式:RPS。它不会降低延迟,只是将包重新分配:RXQ->CPU。

       三种推迟执行方式(softirq/tasklet/workqueue)

       提到,Linux中的三种推迟中断执行方式:

       其中:

       前面已经看到,Linux在每个CPU上创建了一个ksoftirqd内核线程。

       softirqs是在Linux内核编译时确定的,例如网络收包对应的NET_RX_SOFTIRQ软中断。因此是一种静态机制。如果想添加一种新softirq类型,需要修改并重新编译内核。

       内部组织

       内部由一个数组(或称为向量)管理,每个软中断号对应一个softirq handler。数组与注册:

       在5.中所有类型的softirq:

       也就是在cat /proc/softirqs看到的哪些。

       触发(唤醒)softirq

       以收包软中断为例,IRQ handler并不执行NAPI,只是触发它,在内部会执行到raiseNET_RX_SOFTIRQ;真正的执行在softirq,会调用网卡的poll()方法收包。IRQ handler中会调用napi_schedule(),然后启动NAPI poll()。

       如果对内核源码有一定了解,会发现softirq使用非常有限,原因之一是它是静态编译的,依赖内置的ksoftirqd线程来调度内置的9种softirq。如果想添加一种新功能,就得修改并重新编译内核,开发成本很高。

       实际上,实现推迟执行的更常用方式是tasklet。它构建在softirq机制之上,具体来说就是使用了两种softirq:

       换句话说,tasklet是在运行时(runtime)创建和初始化的softirq,

       内核软中断子系统初始化了两个per-cpu变量:

       tasklet再执行针对list的循环:

       tasklet在内核中的使用非常广泛。不过,后面又出现了第三种方式:workqueue。

       这也是一种推迟执行机制,与tasklet有些相似,但有显著不同。

       使用场景

       简而言之,workqueue子系统提供了一个接口,通过该接口可以创建内核线程来处理从其他地方enqueue过来的任务。这些内核线程称为worker threads,内置的per-cpu worker threads:

       结构体

       kworker线程调度workqueues,原理与ksoftirqd线程调度softirqs类似。然而,我们可以为workqueue创建新的线程,而softirq则不行。

       参考资料引用链接

       [1]

       中断与中断处理:0xax.gitbooks.io/linux-...

       作者:赵亚楠 原文:arthurchiao.art/blog/li...来源:云原生实验室

深入理解k8s -- workqueue

       深入理解k8s -- workqueue

       在探讨k8s中的informer组件时,workqueue是一个关键角色。在前文的Controller源码分析中,workqueue的使用已经有所提及。工作队列是k8s中用于处理资源变更事件和调度任务的高效机制。它支持三种类型的队列:简单的FIFO队列、延时队列以及限速队列。

       工作队列通过一个名为Type的底层数据结构来实现,它实现了workqueue.Interface接口。Type结构体包含queue、dirty和processing三个重要字段,以及一个golang原生的条件锁cond。queue用于存储待处理的任务,dirty和processing用于管理任务的添加和完成状态。cond用于控制多个协程的同步操作。

       接下来,我们通过源码深入Type的方法实现,如Add、Get和Done。Add方法简单地将任务添加到queue、dirty和processing中。Get方法包含删除逻辑,同时会检查dirty中是否已有数据,若无,则从queue中取出任务。Done方法用于清理processing状态,确保任务正确处理并移出队列。Get和Done方法之间的配合保证了任务的正确执行和管理。

       在处理资源变更事件时,工作队列的作用尤为明显。在事件触发后,队列将资源变更事件加入到队列中,由Controller进行处理。Controller通过工作队列的Get方法获取待处理的任务,执行处理逻辑,然后调用Done方法将任务标记为完成。这种机制保证了资源变更事件能够被及时且有序地处理。

       除了基础的FIFO队列,k8s还提供了更高级的队列类型,如延时队列和限速队列。延时队列允许用户指定任务的延迟时间,即在特定时间后才将任务加入队列。这有助于优化资源的处理顺序和负载均衡。限速队列则进一步增强了队列功能,通过限速器动态调整任务的处理速率,避免系统过载或资源浪费。

       限速队列基于延时队列实现,通过引入限速器来控制任务的处理速率。常见的限速器包括BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter和MaxOfRateLimiter。这些限速器可以根据不同需求灵活配置,实现资源的高效管理和优化。

       总结而言,工作队列是k8s中实现资源变更事件处理和任务调度的核心组件,通过简单、延时和限速队列的不同组合,可以满足各种复杂场景的需求,实现资源管理的高效、有序和灵活。

copyright © 2016 powered by 皮皮网   sitemap