1.ListenableFuture源码解析
2.从HotSpot源码,视频视频深度解读 park 和 unpark
3.Rust并发:标准库sync::Once源码分析
4.LockSupportçparkçå¾
çåºå±å®ç°
5.ReentrantLock源码详细解析
ListenableFuture源码解析
ListenableFuture 是源码源代 spring 中对 JDK Future 接口的扩展,主要应用于解决在提交线程池的码下任务拿到 Future 后在 get 方法调用时会阻塞的问题。通过使用 ListenableFuture,视频视频可以向其注册回调函数(监听器),源码源代当任务完成时,码下web java框架 源码触发回调。视频视频Promise 在 Netty 中也实现了类似的源码源代功能,用于处理类似 Future 的码下场景。
实现 ListenableFuture 的视频视频关键在于 FutureTask 的源码解析。FutureTask 是源码源代实现 Future 接口的基础类,ListenableFutureTask 在其基础上做了扩展。码下其主要功能是视频视频在任务提交后,当调用 get 方法时能够阻塞当前业务线程,源码源代直到任务完成时唤醒。码下
FutureTask 通过在内部实现一个轻量级的 Treiber stack 数据结构来管理等待任务完成的线程。这个数据结构由 WaitNode 节点组成,每个节点代表一个等待的线程。当业务线程调用 get 方法时,会将自己插入到 WaitNode 栈中,并且在插入的同时让当前线程进入等待状态。在任务执行完成后,会遍历 WaitNode 栈,唤醒等待的线程。
为了确保并发安全,程序源码下载地址FutureTask 使用 CAS(Compare and Swap)操作来管理 WaitNode 栈。每个新插入的节点都会使用 CAS 操作与栈顶节点进行比较,并在满足条件时更新栈顶。这一过程保证了插入操作的原子性,防止了并发条件下的数据混乱。同时,插入操作与栈顶节点的更新操作相互交织,确保了数据的一致性和完整性。
在 FutureTask 中,还利用了 LockSupport 类提供的 park 和 unpark 方法来实现线程的等待和唤醒。当线程插入到 WaitNode 栈中后,通过 park 方法将线程阻塞;任务执行完成后,通过 unpark 方法唤醒线程,完成等待与唤醒的流程。
综上所述,ListenableFuture 通过扩展 FutureTask 的功能,实现了任务执行与线程等待的高效管理。通过注册监听器并利用 CAS 操作与 LockSupport 方法,实现了在任务完成时通知回调,解决了异步任务执行时的线程阻塞问题,提高了程序的并发处理能力。
从HotSpot源码,深度解读 park 和 unpark
我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的idea 查看包源码小伙伴可以加入。自习室加入码:D5A7A
Java并发包下的类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的实现依赖于两个关键类:Unsafe和LockSupport。
其中,Unsafe主要提供CAS操作(关于CAS,在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),LockSupport主要提供park/unpark操作。实际上,park/unpark操作的最终调用还是基于Unsafe类,因此Unsafe类才是核心。
Unsafe类的实现是由native关键字说明的,这意味着这个方法是原生函数,是用C/C++语言实现的,并被编译成了DLL,由Java去调用。
park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。
park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。
并且,声音 hdmi源码输出unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
在Linux系统下,park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。
简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。
每个Java线程都有一个Parker实例,Parker类的部分源码如下:
由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。
具体的执行逻辑已经用注释标记在代码中,简要来说,仿某国际源码就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。
unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:
(如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)
Rust并发:标准库sync::Once源码分析
一次初始化同步原语Once,其核心功能在于确保闭包仅被执行一次。常见应用包括FFI库初始化、静态变量延迟初始化等。
标准库中的Once实现更为复杂,其关键在于如何高效地模拟Mutex阻塞与唤醒机制。这一机制依赖于线程暂停和唤醒原语thread::park/unpark,它们是实现多线程同步对象如Mutex、Condvar等的基础。
具体实现中,Once维护四个内部状态,状态与等待队列头指针共同存储于AtomicUsize中,利用4字节对齐优化空间。
构造Once实例时,初始化状态为Incomplete。调用Once::call_once或Once::call_once_force时,分别检查是否已完成初始化,未完成则执行闭包,闭包执行路径标记为冷路径以节省资源,同时避免泛型导致的代码膨胀。
闭包执行逻辑由Once::call_inner负责,线程尝试获取执行权限,未能获取则进入等待状态,获取成功后执行闭包,结束后唤醒等待线程。
等待队列通过无锁侵入式链表实现,节点在栈上分配,以优化内存使用。Once::wait函数实现等待线程逻辑,WaiterQueue的drop方法用于唤醒所有等待线程,需按特定顺序操作栈节点,以避免use after free等潜在问题。
思考题:如何在实际项目中利用Once实现资源安全共享?如何评估Once与Mutex等同步原语在不同场景下的性能差异?
LockSupportçparkçå¾ çåºå±å®ç°
ä»ä¸ä¸ç¯æç« ä¸çJDKç延è¿éåä¸,æç»æ¯éè¿LockSupport.parkå®ç°çº¿ç¨ççå¾ ï¼é£ä¹åºå±æ¯å¦ä½å®ç°çå¾ åè¶ æ¶çå¾ çï¼æ¬ææ们æ¥æ¢è®¨ä¸ä¸ãLockSupportçparkåunparkçæ¹æ³publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}ä»ä¸é¢å¯ä»¥çå°å®é LockSupport.parkæ¯éè¿Unsafeççparkæ¹æ³å®ç°ï¼ä»ä¸é¢çæ¹æ³å¯ä»¥çåºè¿ä¸ªæ¯ä¸ä¸ªnativeæ¹æ³.
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVMçUnsafeçparkæ¹æ³ä»ä¸é¢JDKä¸ä»£ç ä¸å¯ä»¥threadçParkerç对象çparkæ¹æ³è¿è¡ä¸æ®µæ¶é´ççå¾ ã
UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_ENDThread.hppçæ件ä¸å é¨å®ä¹çPark对象
private:Parker_parker;public:Parker*parker(){ return&_parker;}ä¸é¢æ¯Os_posix.cppä¸æ¯Linuxä¸å®ç°çParkçparkçå®ç°æ¹å¼
é¦å å°_counterçåééè¿CAS设置为0ï¼è¿åå°±æ§çå¼ï¼å¦æä¹åæ¯å¤§äº0ï¼å说ææ¯å 许访é®ï¼ä¸ç¨é»å¡ï¼ç´æ¥è¿åã
è·åå½å线ç¨ã
å¤æ线ç¨æ¯å¦æ¯ä¸æä¸ï¼å¦ææ¯ï¼åç´æ¥è¿åï¼(ä¹å°±æ¯è¯´çº¿ç¨å¤äºä¸æç¶æä¸ä¼å¿½ç¥parkï¼ä¸ä¼é»å¡çå¾ )
å¤æå¦æä¼ å ¥çtimeåæ°å°äº0 æè æ¯ç»å¯¹æ¶é´å¹¶ä¸timeæ¯0,åç´æ¥è¿å,(ä¸é¢çUnsafeè°ç¨parkä¼ å ¥çåæ°æ¯ falseã0ï¼æ以ä¸æ»¡è¶³è¿ç§æ åµ)
å¦ætime大äº0ï¼å转æ¢æç»å¯¹æ¶é´ã
å建ThreadBlockInVM对象ï¼å¹¶ä¸è°ç¨pthread_mutex_trylockè·å线ç¨äºæ¥éï¼å¦æ没æè·åå°éï¼åç´æ¥è¿åï¼
å¤æ_counteråéæ¯å¦å¤§äº0ï¼å¦ææ¯ï¼åéç½®_counter为0ï¼éæ¾çº¿ç¨éï¼ç´æ¥è¿åã
è°ç¨ OrderAccess::fence(); å å ¥å åå±éï¼ç¦æ¢æ令éæåºï¼ç¡®ä¿å éåéæ¾éçæ令ç顺åºã
å建OSThreadWaitState对象ï¼
å¤ætimeæ¯å¦å¤§äº0ï¼å¦ææ¯0ï¼åè°ç¨pthread_cond_waitè¿è¡çå¾ ï¼å¦æä¸æ¯0ï¼ç¶åè°ç¨pthread_cond_timedwaitè¿è¡æ¶é´åæ°ä¸ºabsTimeççå¾ ï¼
è°ç¨pthread_mutex_unlockè¿è¡éæ¾_mutexéï¼
å次è°ç¨OrderAccess::fence()ç¦æ¢æ令éæåºã
//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linuxæä½ç³»ç»æ¯å¦ä½å®ç°pthread_cond_timedwaitè¿è¡æ¶é´çå¾ çpthread_cond_timedwaitå½æ°ä½äºglibcä¸pthread_cond_wait.c, å¯ä»¥çå°æ¯è°ç¨__pthread_cond_wait_commonå®ç°
/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}ä¸é¢__pthread_cond_wait_commonæ¯å®ç°éè¿__futex_abstimed_wait_cancelableå®ç°æ¶é´çå¾
static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''çç¥''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''çç¥''`}__futex_abstimed_wait_cancelableæ¯è°ç¨__futex_abstimed_wait_common
int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}__futex_abstimed_wait_commonä¸é¢åæ¯éè¿å¤æå¹³å°æ¯ä½æè ä½,è°ç¨__futex_abstimed_wait_commonæè __futex_abstimed_wait_common
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}__futex_abstimed_wait_commonæ¯è°ç¨INTERNAL_SYSCALL_CANCELå®å®ä¹å®ç°
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}ç³»ç»è°ç¨ççå®å®ä¹
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0æ»ç»ä¸»è¦å¯¹LockSupportçparkçå¾ å®ç°çåºå±å®ç°çæµ æï¼é对äºLinuxçç³»ç»è°ç¨è¿æ²¡ææ¾å°æºç ï¼åç»ä¼ç»§ç»è·è¸ªï¼å¸ææ读è ç¥éç满å¸å¯ä»¥åç¥ä¸ï¼è°¢è°¢ã
é¾æ¥ï¼/post/ReentrantLock源码详细解析
在深入解析ReentrantLock源码之前,我们先了解ReentrantLock与同步机制的关系。ReentrantLock作为Java中引入的并发工具类,由Doug Lea编写,相较于synchronized关键字,它提供了更为灵活的锁管理策略,支持公平与非公平锁两种模式。AQS(AbstractQueuedSynchronizer)作为实现锁和同步器的核心框架,由AQS类的独占线程、同步状态state、FIFO等待队列和UnSafe对象组成。AQS类的内部结构图显示了其组件的构成。在AQS框架下,等待队列采用双向链表实现,头结点存在但无线程,T1和T2节点中的线程可能在自旋获取锁后进入阻塞状态。
Node节点作为等待队列的基本单元,分为共享模式和独占模式,值得关注的是waitStatus成员变量,它包含五种状态:-3、-2、-1、0、1。本文重点讨论-1、0、1状态,-3状态将不涉及。非公平锁与公平锁的差异在于,非公平锁模式下新线程可直接尝试获取锁,而公平锁模式下新线程需排队等待。
ReentrantLock内部采用非公平同步器作为其同步器实现,构造函数中根据需要选择非公平同步器或公平同步器。ReentrantLock默认采用非公平锁策略。非公平锁与公平锁的区别在于获取锁的顺序,非公平锁允许新线程跳过等待队列,而公平锁严格遵循队列顺序。
在非公平同步器的实例中,我们以T1线程首次获取锁为例。T1成功获取锁后,将exclusiveOwnerThread设置为自身,state设置为1。紧接着,T2线程尝试获取锁,但由于state为1,获取失败。调用acquire方法尝试获得锁,尝试通过tryAcquire方法实现,非公平同步器的实现调用具体逻辑。
在非公平锁获取逻辑中,通过CAS操作尝试交换状态。交换成功后,设置独占线程。当当前线程为自身时,执行重入操作,叠加state状态。若获取锁失败,则T2和T3线程进入等待队列,调用addWaiter方法。队列初始化通过enq方法实现,enq方法中的循环逻辑确保线程被正确加入队尾。新线程T3调用addWaiter方法入队,队列初始化完成。
在此过程中,T2和T3线程开始自旋尝试获取锁。若失败,则调用parkAndCheckInterrupt()方法进入阻塞状态。在shouldParkAfterFailedAcquire方法中,当前驱节点等待状态为CANCELLED时,方法会找到第一个非取消状态的节点,并断开取消状态的前驱节点与该节点的连接。若T5线程加入等待队列,T3和T4线程因为自旋获取锁失败进入finally块调用取消方法,找到等待状态不为1的节点(即T2),断开连接。
理解了shouldParkAfterFailedAcquire方法后,我们关注acquireQueued方法的实现。该方法确保线程在队列中正确释放,如果队列的节点前驱为head节点,成功获取锁后,调用setHead方法释放线程。setHead方法通过CAS操作更新head节点,释放线程。acquire方法中的阻塞是为防止线程在唤醒后重新尝试获取锁而进行的额外阻断。
锁的释放过程相对简单,将state减至0,将exclusiveOwnerThread设置为null,完成锁的释放。通过上述解析,我们深入理解了ReentrantLock的锁获取、等待、释放等核心机制,为并发编程提供了强大的工具支持。
2024-11-30 03:28
2024-11-30 03:27
2024-11-30 03:09
2024-11-30 02:33
2024-11-30 01:09
2024-11-30 00:57