1.【Java原理系列】Java AtomicInteger原理用法源码详解
2.ForkjoinPool -1
3.编程「锁」事|详解乐观锁 CAS 的例源技术原理
4.从HotSpot源码,深度解读 park 和 unpark
5.深入学习CAS底层原理
【Java原理系列】Java AtomicInteger原理用法源码详解
Java的源码原子类AtomicInteger,是解析《Java原理用法示例及代码规范详解系列》的一部分,关注和收藏以获取最新内容。例源它用于在多线程环境中进行安全的源码整数操作,如get(),解析种草app 源码 set(), incrementAndGet(), compareAndSet()等,提高并发性能,例源适用于计数器、源码标记位等场景。解析
AtomicInteger的例源核心原理基于CAS操作,内部使用volatile修饰的源码int变量保证可见性和原子性。CAS操作确保在多线程环境中,解析对整数的例源修改是原子性的,避免了竞态条件和数据不一致。源码如果CAS操作失败,解析它会通过循环重试确保操作成功。
在使用AtomicInteger时,如计数器递增和条件判断,应避免竞态条件。通过额外的同步手段如锁或Lock接口,可以确保整个操作序列是原子的。AtomicInteger提供的方法如getAndIncrement(),保证了这些操作的线程安全。
场景上,jdkapi源码推荐AtomicInteger在计数器、并发任务处理和共享变量的线程安全操作中大显身手。例如,网站访问计数和任务完成数量统计,AtomicInteger确保了这些操作的原子性,输出的计数始终准确。
总的来说,AtomicInteger是处理多线程整数操作的理想选择,为并发编程提供了一种高效且线程安全的解决方案。
ForkjoinPool -1
ForkJoinæ¯ç¨äºå¹¶è¡æ§è¡ä»»å¡çæ¡æ¶ï¼ æ¯ä¸ä¸ªæ大任å¡åå²æè¥å¹²ä¸ªå°ä»»å¡ï¼æç»æ±æ»æ¯ä¸ªå°ä»»å¡ç»æåå¾å°å¤§ä»»å¡ç»æçæ¡æ¶ãForkå°±æ¯æä¸ä¸ªå¤§ä»»å¡åå为è¥å¹²åä»»å¡å¹¶è¡çæ§è¡ï¼Joinå°±æ¯å并è¿äºåä»»å¡çæ§è¡ç»æï¼æåå¾å°è¿ä¸ªå¤§ä»»å¡çç»æãä¸é¢æ¯ä¸ä¸ªæ¯ä¸ä¸ªç®åçJoin/Fork计ç®è¿ç¨ï¼å°1âæ°åç¸å
é常è¿æ ·ä¸ªæ¨¡åï¼ä½ 们ä¼æ³å°ä»ä¹ï¼
Release Framework ï¼ å¸¸è§çå¤ç模åæ¯ä»ä¹ï¼ task pool - worker poolç模åã ä½æ¯Forkjoinpool éåäºå®å ¨ä¸åç模åã
ForkJoinPoolä¸ç§ExecutorServiceçå®ç°ï¼è¿è¡ForkJoinTaskä»»å¡ãForkJoinPoolåºå«äºå ¶å®ExecutorServiceï¼ä¸»è¦æ¯å 为å®éç¨äºä¸ç§å·¥ä½çªå(work-stealing)çæºå¶ãææ被ForkJoinPool管çç线ç¨å°è¯çªåæ交å°æ± åéçä»»å¡æ¥æ§è¡ï¼æ§è¡ä¸åå¯äº§çåä»»å¡æ交å°æ± åä¸ã
ForkJoinPoolç»´æ¤äºä¸ä¸ªWorkQueueçæ°ç»(æ°ç»é¿åº¦æ¯2çæ´æ°æ¬¡æ¹ï¼èªå¨å¢é¿)ãæ¯ä¸ªworkQueueé½æä»»å¡éå(ForkJoinTaskçæ°ç»)ï¼å¹¶ä¸ç¨baseãtopæåä»»å¡éåéå°¾åé头ãwork-stealingæºå¶å°±æ¯å·¥ä½çº¿ç¨æ¨ä¸ªæ«æä»»å¡éåï¼å¦æéåä¸ä¸ºç©ºååéå°¾çä»»å¡å¹¶æ§è¡ã示æå¾å¦ä¸
æµç¨å¾ï¼
poolå±æ§
workQueuesæ¯poolçå±æ§ï¼å®æ¯WorkQueueç±»åçæ°ç»ãexternalPushåexternalSubmitæå建çworkQueue没æowner(å³ä¸æ¯worker)ï¼ä¸ä¼è¢«æ¾å°workQueuesçå¶æ°ä½ç½®ï¼ècreateWorkerå建çworkQueueï¼å³workerï¼æownerï¼ä¸ä¼è¢«æ¾å°workQueuesçå¥æ°ä½ç½®ã
WorkQueueçå 个éè¦æååé说æå¦ä¸ï¼
è¿æ¯WorkQueueçconfigï¼é«ä½è·poolçconfigå¼ä¿æä¸è´ï¼èä½ä½åæ¯workQueueå¨workQueuesæ°ç»çä½ç½®ã
ä»workQueueså±æ§çä»ç»ä¸ï¼æ们ç¥éï¼ä¸æ¯ææworkQueueé½æworkerï¼æ²¡æworkerçworkQueueç§°ä¸ºå ¬å ±éåï¼shared queueï¼ï¼configç第ä½å°±æ¯ç¨æ¥å¤ææ¯å¦æ¯å ¬å ±éåçãå¨externalSubmitå建工ä½éåæ¶ï¼æï¼
q.config = k | SHARED_QUEUE;
å ¶ä¸qæ¯æ°å建çworkQueueï¼kå°±æ¯qå¨workQueuesæ°ç»ä¸çä½ç½®ï¼SHARED_QUEUE=1<<ï¼æ³¨æè¿éconfig没æä¿çmodeçä¿¡æ¯ã
èå¨registerWorkerä¸ï¼åæ¯è¿æ ·ç»workQueueçconfigèµå¼çï¼
w.config = i | mode;
wæ¯æ°å建çworkQueueï¼iæ¯å ¶å¨workQueuesæ°ç»ä¸çä½ç½®ï¼æ²¡æ设置SHARED_QUEUEæ è®°ä½
scanStateæ¯workQueueçå±æ§ï¼æ¯intç±»åçãscanStateçä½ä½å¯ä»¥ç¨æ¥å®ä½å½åworkerå¤äºworkQueuesæ°ç»çåªä¸ªä½ç½®ãæ¯ä¸ªworkerå¨è¢«å建æ¶ä¼å¨å ¶æé å½æ°ä¸è°ç¨poolçregisterWorkerï¼èregisterWorkerä¼ç»scanStateèµä¸ä¸ªåå§å¼ï¼è¿ä¸ªå¼æ¯å¥æ°ï¼å 为workeræ¯ç±createWorkerå建ï¼å¹¶ä¼è¢«æ¾å°WorkQueuesçå¥æ°ä½ç½®ï¼ècreateWorkerå建workeræ¶ä¼è°ç¨registerWorkerã
ç®è¨ä¹ï¼workerçscanStateåå§å¼æ¯å¥æ°ï¼éworkerçscanstateåå§å¼=INACTIVE=1<<ï¼å°äº0ï¼éworkerçworkQueueå¨externalSubmitä¸å建ï¼ã
å½æ¯æ¬¡è°ç¨signalWorkï¼ætryReleaseï¼å¤éworkeræ¶ï¼workerçé«ä½å°±ä¼å 1
å¦å¤ï¼scanState<0表示workeræªæ¿æ´»ï¼å½workerè°ç¨runtaskæ§è¡ä»»å¡æ¶ï¼scanStateä¼è¢«ç½®ä¸ºå¶æ°ï¼å³è®¾ç½®scanStateçæå³è¾¹ä¸ä½ä¸º0ã
workerä¼ç æ¶ï¼æ¯è¿æ ·åå¨ç
workerçå¤é类似è¿æ ·ï¼
å¨workerä¼ç ç4è¡ä¼ªç ä¸ï¼è®©ctlçä½ä½çå¼å为worker.scanStateï¼è¿æ ·ä¸æ¬¡å°±å¯ä»¥éè¿scanStateå¤é该workerãå¤é该workeræ¶ï¼æ该workerçpreStack设置为ctlä½ä½çå¼ï¼è¿æ ·ä¸ä¸æ¬¡å¤éçworkerå°±æ¯scanStateçäºè¯¥preStackçworkerã
è¿ééè¿preStackä¿åä¸ä¸ä¸ªworkerï¼è¿ä¸ªworkeræ¯å½åworkeræ´æ©å°å¨çå¾ ï¼æ以形æä¸ä¸ªåè¿å åºçæ ã
runStateæ¯intç±»åçå¼ï¼æ§å¶æ´ä¸ªpoolçè¿è¡ç¶æåçå½å¨æï¼æä¸é¢å 个å¼ï¼å¯ä»¥å¥½å 个å¼åæ¶åå¨ï¼ï¼
å¦ærunStateå¼ä¸º0ï¼è¡¨ç¤ºpoolå°æªåå§åã
RSLOCK表示éå®poolï¼å½æ·»å workeråpoolç»æ¢æ¶ï¼å°±è¦ä½¿ç¨RSLOCKéå®æ´ä¸ªpoolãå¦æç±äºrunState被éå®ï¼å¯¼è´å ¶ä»æä½çå¾ runState解éï¼é常ç¨waitè¿è¡çå¾ ï¼ï¼å½runState设置äºRSIGNALï¼è¡¨ç¤ºrunState解éï¼å¹¶éç¥ï¼notifyAllï¼çå¾ çæä½ã
å©ä¸4个å¼é½è·runStateçå½å¨ææå ³ï¼é½å¯ä»¥é¡¾åæä¹ï¼
å½éè¦åæ¢æ¶ï¼è®¾ç½®runStateçSTOPå¼ï¼è¡¨ç¤ºåå¤å ³éï¼è¿æ ·å ¶ä»æä½çå°è¿ä¸ªæ è®°ä½ï¼å°±ä¸ä¼ç»§ç»æä½ï¼æ¯å¦tryAddWorkerçå°STOPå°±ä¸ä¼åå建workerï¼
ètryTerminate对è¿äºçå½å¨æç¶æçå¤çåæ¯è¿æ ·çï¼
å½åtopåbaseçåå§å¼ä¸º INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2ãç¶åpushä¸ä¸ªtaskä¹åï¼top+=1ï¼ä¹å°±æ¯è¯´ï¼top对åºçä½ç½®æ¯æ²¡ætaskçï¼æè¿pushè¿æ¥çtaskå¨top-1çä½ç½®ãèbaseçä½ç½®åè½å¯¹åºå°taskï¼base对åºæå æ¾è¿éåçtaskï¼top-1对åºæåæ¾è¿éåçtaskã
qlockå¼å«ä¹ï¼1: locked, < 0: terminate; else 0
å³å½qlockå¼ä½0æ¶ï¼å¯ä»¥æ£å¸¸æä½ï¼å¼=1æ¶ï¼è¡¨ç¤ºéå®
int SQMASK=0xeï¼åä»»ä½æ´æ°è·SQMASKä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ã
è¯æï¼
注æè¿éå为äºè¿å¶æ¯ ï¼å°¤å ¶æ³¨ææå³è¾¹ç¬¬ä¸ä½æ¯0ï¼ä»»ä½æ°è·æå³è¾¹ç¬¬ä¸ä½æ¯0çæ°ä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ï¼å 为ä½ä¸ä¹åï¼ç¬¬ä¸ä½å°±æ¯0ï¼æ¯å¦s=A&SQMASKï¼Aå¯ä»¥æ¯ä»»ææ´æ°ï¼ç¶åæsæäºè¿å¶è¿è¡å¤é¡¹å¼å±å¼ï¼åæs=2 n1+2 n2 â¦â¦+2^nnï¼è¿énâ¥1ï¼æ以så¯ä»¥è¢«2æ´é¤ï¼å³sæ¯å¶æ°ã
æ以ä¸ä¸ªæ°æ¯å¥æ°è¿æ¯å¶æ°ï¼çå ¶æå³è¾¹ç¬¬ä¸ä½å³å¯ã
æ们ç¥éworkQueueæexternalPushå建çåcreateWorkerå建çworkerï¼ä¸¤ç§æ¹å¼å建çworkQueueï¼å ¶æ¾ç½®å°workQueuesçä½ç½®æ¯ä¸åçï¼åè æ¾å°workQueueçå¶æ°ä½ç½®ï¼èåè åæ¾å°å¥æ°ä½ç½®ãä¸åworkQueueæ¾å°èªå·±å¨workQueuesçä½ç½®çç®æ³æç¹ä¸åã
ä¸é¢çä¸ä¸forkjoinæ¡æ¶è·åworkQueuesä¸çå¶æ°ä½ç½®çworkQueueçç®æ³ï¼
è¿æ ·å°±è½è·åworkQueuesçå¶æ°ä½ç½®çworkQueueãmä¿è¯m & r & SQMASKè¿æ´ä¸ªè¿ç®ç»æä¸ä¼è¶ åºworkQueuesçä¸æ ï¼SQMASKä¿è¯åå°çæ¯å¶æ°ä½ç½®çworkQueueãè¿éæä¸ä¸ªæ趣çç°è±¡ï¼å设0å°workQueues.length-1ä¹é´æn个å¶æ°ï¼m & r & SQMASKæ¯æ¬¡é½è½åå°å ¶ä¸ä¸ä¸ªå¶æ°ï¼èä¸è¿ç»n次åå°çå¶æ°ä¸ä¼åºç°éå¤å¼ï¼æ£åæ§é常好ãèä¸æ¯å¾ªç¯çï¼å³1å°n次ån个ä¸åå¶æ°ï¼n+1å°2nä¹æ¯ån次ä¸åå¶æ°ï¼æ¤æ¶n个å¶æ°æ¯ä¸ªé½è¢«éæ°åä¸æ¬¡ãä¸é¢åæä¸rå¼æä»ä¹ç§å¯ï¼ä¸ºä½è½ä¿è¯è¿æ ·çæ£åæ§
ThreadLocalRandomå æä¸å¸¸éPROBE_INCREMENT = 0x9eb9ï¼ä»¥åä¸ä¸ªéæçprobeGenerator =new AtomicInteger() ï¼ç¶åæ¯ä¸ªçº¿ç¨çprobe= probeGenerator.addAndGet(PROBE_INCREMENT)æ以第ä¸ä¸ªçº¿ç¨çprobeå¼æ¯0x9eb9ï¼ç¬¬äºä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9ï¼ç¬¬ä¸ä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9+0x9eb9以æ¤ç±»æ¨ï¼æ´ä¸ªå¼æ¯çº¿æ§çï¼å¯ä»¥ç¨y=kx表示ï¼å ¶ä¸k=0x9eb9ï¼x表示第å 个线ç¨ãè¿æ ·æ¯ä¸ªçº¿ç¨çprobeå¯ä»¥ä¿è¯ä¸ä¸æ ·ï¼èä¸å ·æå¾å¥½ç离æ£æ§ã
å®é ä¸ï¼å¯ä»¥ä¸ç¨0x9eb9è¿ä¸ªå¼ï¼ç¨ä»»æä¸ä¸ªå¥æ°é½æ¯å¯ä»¥çï¼æ¯å¦1ãå¦æç¨1çè¯ï¼probe+=1ï¼è¿æ ·æ¯ä¸ªçº¿ç¨çprobeå°±é½æ¯ä¸åçï¼èä¸å ·æå¾å¥½ç离æ£æ§ãä¹å°±æ¯è¯´ï¼å设æéå¶æ¡ä»¶probe<nï¼è¶ è¿nå产ç溢åºãåprobeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼n次åprobeæ¯æ¬¡èªå çå¼é½ä¸åãå®é ä¸ç¨ä»»æä¸ä¸ªå¥æ°ï¼é½å¯ä»¥ä¿è¯probeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼æå ´è¶£å¯çæ¬ææåéå½é¨åãç±äºå¥æ°ç离æ£æ§ï¼æ以åªè¦çº¿ç¨æ°å°äºmæè SQMASK两è ä¸çæå°å¼ï¼åæ¯ä¸ªçº¿ç¨é½è½å¯ä¸å°å æ®ä¸ä¸ªwsä¸çä¸ä¸ªä½ç½®
å½ä¸ä¸ªæä½æ¯å¨éForkjoinThreadç线ç¨ä¸è¿è¡çï¼å称该æä½ä¸ºå¤é¨æä½ãæ¯å¦æ们åé¢æ§è¡pool.invokeï¼invokeå åæ§è¡externalPushãç±äºinvokeæ¯å¨éForkjoinThread线ç¨ä¸è¿è¡çï¼è¿éæ¯å¨main线ç¨ä¸è¿è¡ï¼ï¼æ以æ¯ä¸ä¸ªå¤é¨æä½ï¼è°ç¨çæ¯externalPushãä¹åtaskçæ§è¡æ¯éè¿ForkJoinThreadæ¥æ§è¡çï¼æ以taskä¸çforkå°±æ¯å é¨æä½ï¼è°ç¨çæ¯pushï¼æä»»å¡æ交å°å·¥ä½éåãå ¶å®forkçå®ç°æ¯ç±»ä¼¼ä¸é¢è¿æ ·çï¼
å³forkä¼æ ¹æ®æ§è¡èªèº«ç线ç¨æ¯å¦æ¯ForkJoinThreadçå®ä¾æ¥å¤ææ¯å¤äºå¤é¨è¿æ¯å é¨ãé£ä¸ºä½è¦åºåå å¤é¨ï¼
ä»»ä½çº¿ç¨é½å¯ä»¥ä½¿ç¨ForkJoinæ¡æ¶ï¼ä½æ¯å¯¹äºéForkJoinThreadç线ç¨ï¼å®å°åºæ¯ææ ·çï¼ForkJoinæ æ³æ§å¶ï¼ä¹æ æ³å¯¹å ¶ä¼åãå æ¤åºååºå å¤é¨ï¼è¿æ ·æ¹ä¾¿ForkJoinæ¡æ¶å¯¹ä»»å¡çæ§è¡è¿è¡æ§å¶åä¼å
forkJoinPool.invoke(task)æ¯æä»»å¡æ¾å ¥å·¥ä½éåï¼å¹¶çå¾ ä»»å¡æ§è¡ãæºç å¦ä¸
è¿éexternalPushè´è´£ä»»å¡æ交ï¼externalPushæºç å¦ä¸ï¼
编程「锁」事|详解乐观锁 CAS 的技术原理
本文深入探讨乐观锁的核心实现方式——CAS(Compare And Swap)技术原理。CAS是一种在多线程环境下实现同步功能的机制,相较于悲观锁的加锁操作,CAS允许在不使用锁的情况下实现多线程间的变量同步。Java的并发包中的原子类正是利用CAS实现乐观锁。
CAS操作包含三个操作数:需要更新的内存值V、进行比较的预期数值A和要写入的值B。其逻辑是将内存值V与预期值A进行比较,当且仅当V值等于A时,通过原子方式用新值B更新V值(“比较+更新”整体是一个原子操作),否则不执行任何操作。一般情况下,更新操作会不断重试直至成功。
以Java.util.concurrent.atomic并发包下的戏说网站源码AtomicInteger原子整型类为例,分析其CAS底层实现机制。方法`atomicData.incrementAndGet()`内部通过Unsafe类实现。Unsafe类是底层硬件CPU指令复制工具类,关键在于compareAndSet()方法的返回结果。
`unsafe.compareAndSwapInt(this, valueOffset, expect, update)`
此方法中,参数`this`是Unsafe对象本身,用于获取value的内存偏移地址。`valueOffset`是value变量的内存偏移地址,`expect`是期望更新的值,`update`是要更新的最新值。如果原子变量中的value值等于`expect`,则使用`update`值更新该值并返回true,否则返回false。
至于`valueOffset`的来源,这里提到value实际上是volatile关键字修饰的变量,以保证在多线程环境下的内存可见性。
CAS的底层是Unsafe类。如何通过`Unsafe.getUnsafe()`方法获得Unsafe类的实例?这是因为AtomicInteger类在rt.jar包下,因此通过Bootstrap根类加载器加载。Unsafe类的具体实现可以在hotspot源码中找到,而unsafe.cpp中的C++代码不在本文详细分析范围内。对CAS实现感兴趣的读者可以自行查阅。
CAS底层的totp算法源码Unsafe类在多处理器上运行时,为cmpxchg指令添加lock前缀(lock cmpxchg),在单处理器上则无需此步骤(单处理器自身维护单处理器内的顺序一致性)。这一机制确保了CAS操作的原子性。
最后,同学们会发现CAS的操作与原子性密切相关。CPU如何实现原子性操作是一个深入的话题,有机会可以继续探索。欢迎在评论区讨论,避免出现BUG!点赞转发不脱发!
从HotSpot源码,深度解读 park 和 unpark
我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的小伙伴可以加入。自习室加入码:D5A7A
Java并发包下的类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的实现依赖于两个关键类:Unsafe和LockSupport。
其中,Unsafe主要提供CAS操作(关于CAS,在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),LockSupport主要提供park/unpark操作。实际上,park/unpark操作的最终调用还是基于Unsafe类,因此Unsafe类才是核心。
Unsafe类的换砖源码实现是由native关键字说明的,这意味着这个方法是原生函数,是用C/C++语言实现的,并被编译成了DLL,由Java去调用。
park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。
park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。
并且,unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
在Linux系统下,park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。
简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。
每个Java线程都有一个Parker实例,Parker类的部分源码如下:
由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。
具体的执行逻辑已经用注释标记在代码中,简要来说,就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。
unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:
(如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)
深入学习CAS底层原理
什么是CAS
CAS是Compare-And-Swap的缩写,意思为比较并交换。以AtomicInteger为例,其提供了compareAndSet(intexpect,intupdate)方法,expect为期望值(被修改的值在主内存中的期望值),update为修改后的值。compareAndSet方法返回值类型为布尔类型,修改成功则返回true,修改失败返回false。
举个compareAndSet方法的例子:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicIntegeratomicInteger=newAtomicInteger(0);booleanresult=atomicInteger.compareAndSet(0,1);System.out.println(result);System.out.println(atomicInteger.get());}}上面例子中,通过AtomicInteger(intinitialValue)构造方法指定了AtomicInteger类成员变量value的初始值为0:
publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{ ......privatevolatileintvalue;/***CreatesanewAtomicIntegerwiththegiveninitialvalue.**@paraminitialValuetheinitialvalue*/publicAtomicInteger(intinitialValue){ value=initialValue;}......}接着执行compareAndSet方法,main线程从主内存中拷贝了value的副本到工作线程,值为0,并将这个值修改为1。如果此时主内存中value的值还是为0的话(言外之意就是没有被其他线程修改过),则将修改后的副本值刷回主内存更新value的值。所以上面的例子运行结果应该是true和1:
将上面的例子修改为:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicIntegeratomicInteger=newAtomicInteger(0);booleanfirstResult=atomicInteger.compareAndSet(0,1);booleansecondResult=atomicInteger.compareAndSet(0,1);System.out.println(firstResult);System.out.println(secondResult);System.out.println(atomicInteger.get());}}上面例子中,main线程第二次调用compareAndSet方法的时候,value的值已经被修改为1了,不符合其expect的值,所以修改将失败。上面例子输出如下:
CAS底层原理查看compareAndSet方法源码:
/***Atomicallysetsthevalueto{ @codenewValue}*ifthecurrentvalue{ @code==expectedValue},*withmemoryeffectsasspecifiedby{ @linkVarHandle#compareAndSet}.**@paramexpectedValuetheexpectedvalue*@paramnewValuethenewvalue*@return{ @codetrue}ifsuccessful.Falsereturnindicatesthat*theactualvaluewasnotequaltotheexpectedvalue.*/publicfinalbooleancompareAndSet(intexpectedValue,intnewValue){ returnU.compareAndSetInt(this,VALUE,expectedValue,newValue);}该方法通过调用unsafe类的compareAndSwapInt方法实现相关功能。compareAndSwapInt方法包含四个参数:
this,当前对象;
valueOffset,value成员变量的内存偏移量(也就是内存地址):
privatestaticfinallongvalueOffset;static{ try{ valueOffset=unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));}catch(Exceptionex){ thrownewError(ex);}}expect,期待值;
update,更新值。
所以这个方法的含义为:获取当前对象value成员变量在主内存中的值,和传入的期待值相比,如果相等则说明这个值没有被别的线程修改过,然后将其修改为更新值。
那么unsafe又是什么?它的compareAndSwapInt方法是原子性的么?查看该方法的源码:
/***AtomicallyupdatesJavavariableto{ @codex}ifitiscurrently*holding{ @codeexpected}.**<p>Thisoperationhasmemorysemanticsofa{ @codevolatile}read*andwrite.CorrespondstoCatomic_compare_exchange_strong.**@return{ @codetrue}ifsuccessful*/@HotSpotIntrinsicCandidatepublicfinalnativebooleancompareAndSetInt(Objecto,longoffset,intexpected,intx);该方法并没有具体Java代码实现,方法通过native关键字修饰。由于Java方法无法直接访问底层系统,Unsafe类相当于一个后门,可以通过该类的方法直接操作特定内存的数据。Unsafe类存在于sun.msic包中,JVM会帮我们实现出相应的汇编指令。Unsafe类中的CAS方法是一条CPU并发原语,由若干条指令组成,用于完成某个功能的一个过程。原语的执行必须是连续的,在执行过程中不允许被中断,不会存在数据不一致的问题。
getAndIncrement方法剖析了解了CAS原理后,我们回头看下AtomicInteger的getAndIncrement方法源码:
/***Atomicallyincrementsthecurrentvalue,*withmemoryeffectsasspecifiedby{ @linkVarHandle#getAndAdd}.**<p>Equivalentto{ @codegetAndAdd(1)}.**@returnthepreviousvalue*/publicfinalintgetAndIncrement(){ returnU.getAndAddInt(this,VALUE,1);}该方法通过调用unsafe类的getAndAddInt方法实现相关功能。继续查看getAndAddInt方法的源码:
/***Atomicallyaddsthegivenvaluetothecurrentvalueofafield*orarrayelementwithinthegivenobject{ @codeo}*atthegiven{ @codeoffset}.**@paramoobject/arraytoupdatethefield/elementin*@paramoffsetfield/elementoffset*@paramdeltathevaluetoadd*@returnthepreviousvalue*@since1.8*/@HotSpotIntrinsicCandidatepublicfinalintgetAndAddInt(Objecto,longoffset,intdelta){ intv;do{ v=getIntVolatile(o,offset);}while(!weakCompareAndSetInt(o,offset,v,v+delta));returnv;}结合源码,我们便可以很直观地看出为什么AtomicInteger的getAndIncrement方法是线程安全的了:
o是AtomicInteger对象本身;offset是AtomicInteger对象的成员变量value的内存地址;delta是需要变更的数量;v是通过unsafe的getIntVolatile方法获得AtomicInteger对象的成员变量value在主内存中的值。dowhile循环中的逻辑为:用当前对象的值和var5比较,如果相同,说明该值没有被别的线程修改过,更新为v+delta,并返回true(CAS);否则继续获取值并比较,直到更新完成。
CAS的缺点CAS并不是完美的,其存在以下这些缺点:
如果刚好while里的CAS操作一直不成功,那么对CPU的开销大;
只能确保一个共享变量的原子操作;
存在ABA问题。
CAS实现的一个重要前提是需要取出某一时刻的数据并在当下时刻比较交换,这之间的时间差会导致数据的变化。比如:thread1线程从主内存中取出了变量a的值为A,thread2页从主内存中取出了变量a的值为A。由于线程调度的不确定性,这时候thread1可能被短暂挂起了,thread2进行了一些操作将值修改为了B,然后又进行了一些操作将值修改回了A,这时候当thread1重新获取CPU时间片重新执行CAS操作时,会发现变量a在主内存中的值仍然是A,所以CAS操作成功。
解决ABA问题那么如何解决CAS的ABA问题呢?由上面的阐述课件,光通过判断值是否相等并不能确保在一定时间差内值没有变更过,所以我们需要一个额外的指标来辅助判断,类似于时间戳,版本号等。
JUC为我们提供了一个AtomicStampedReference类,通过查看它的构造方法就可以看出,除了指定初始值外,还需指定一个版本号(戳):
/***Createsanew{ @codeAtomicStampedReference}withthegiven*initialvalues.**@paraminitialReftheinitialreference*@paraminitialStamptheinitialstamp*/publicAtomicStampedReference(VinitialRef,intinitialStamp){ pair=Pair.of(initialRef,initialStamp);}我们就用这个类来解决ABA问题,首先模拟一个ABA问题场景:
publicclassAtomticIntegerTest{ publicstaticvoidmain(String[]args){ AtomicReference<String>atomicReference=newAtomicReference<>("A");newThread(()->{ //模拟一次ABA操作atomicReference.compareAndSet("A","B");atomicReference.compareAndSet("B","A");System.out.println(Thread.currentThread().getName()+"线程完成了一次ABA操作");},"thread1").start();newThread(()->{ //让thread2先睡眠2秒钟,确保thread1的ABA操作完成try{ TimeUnit.SECONDS.sleep(2);}catch(InterruptedExceptione){ e.printStackTrace();}booleanresult=atomicReference.compareAndSet("A","B");if(result){ System.out.println(Thread.currentThread().getName()+"线程修改值成功,当前值为:"+atomicReference.get());}},"thread2").start();}}运行程序,输出如下:
使用AtomicStampedReference解决ABA问题:
publicclassAtomicIntegerextendsNumberimplementsjava.io.Serializable{ ......privatevolatileintvalue;/***CreatesanewAtomicIntegerwiththegiveninitialvalue.**@paraminitialValuetheinitialvalue*/publicAtomicInteger(intinitialValue){ value=initialValue;}......}0程序输出如下:
2024-11-28 10:31
2024-11-28 10:18
2024-11-28 10:16
2024-11-28 08:56
2024-11-28 08:29
2024-11-28 08:22