大白话讲讲 Go 语言的 sync.Map(二)
Go 语言的 sync.Map机制通过entry数据结构解决了标准库map的线程安全问题。当需要删除key,比如李四销户,旧数据可能标记为expunged,entry.p可能指向真实数据、nil或已删除状态。在线直播课程源码sync.Map实际上有两个账本:readOnly和dirty,用于同步数据状态,避免数据不一致。readOnly账本的amended标记用于标记脏账本中未同步到readOnly的key。
查找过程通过amended字段加速,如果两个账本数据一致,卡通射手2源码就无需进一步操作。sync.Map的写入是通过CAS(Compare and Swap)实现的,如加法函数中的atomic.CompareAndSwapInt操作。这个操作在src/runtime/internal/atomic/asm_amd.s中详细定义,包括使用LOCK指令锁定总线,确保内存独占访问,以及CMPXCHGL指令进行比较和交换。
删除数据逻辑同样复杂,涉及隐含的逻辑关系。写入逻辑是整个sync.Map的核心,而遍历则相对简单。京东源码谁有本文详细讲解了sync.Map的工作原理,尽管篇幅较长,但确保了内容的全面性和深度。文章由博主于--发布,原文链接可参考imlht.com。
线程池执行过程中遇到异常会发生什么?怎样处理?
线程遇到未处理的异常就结束了
这个好理解,当线程出现未捕获异常的时候就执行不下去了,留给它的就是垃圾回收了。
线程池中线程频繁出现未捕获异常当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。gm联运平台源码
做个实验:
publicclassThreadExecutor{ privateThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ intj=1/0;});});}}新建一个只有一个线程的线程池,每隔0.1s提交一个任务,任务中是一个1/0的计算。
Exceptioninthread"customThread0"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread1"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread2"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread3"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread4"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread5"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)可见每次执行的线程都不一样,之前的线程都没有复用。原因是因为出现了未捕获的异常。
我们把异常捕获试试:
publicclassThreadExecutor{ privateThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ try{ intj=1/0;}catch(Exceptione){ System.out.println(Thread.currentThread().getName()+""+e.getMessage());}});});}}customThread0/byzerocustomThread0/byzerocustomThread0/byzerocustomThread0/byzerocustomThread0/byzero可见当异常捕获了,线程就可以复用了。
问题来了,我们的代码中异常不可能全部捕获如果要捕获那些没被业务代码捕获的异常,可以设置Thread类的qpython3源码uncaughtExceptionHandler属性。这时使用ThreadFactoryBuilder会比较方便,ThreadFactoryBuilder是guava提供的ThreadFactory生成器。
newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println(t.getName()+"发生异常"+e.getCause())).build()修改之后:
publicclassThreadExecutor{ privatestaticThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println("UncaughtExceptionHandler捕获到:"+t.getName()+"发生异常"+e.getMessage())).build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}threadPoolExecutor.execute(()->{ System.out.println("线程"+Thread.currentThread().getName()+"执行");intj=1/0;});});}}线程customThread0执行UncaughtExceptionHandler捕获到:customThread0发生异常/byzero线程customThread1执行UncaughtExceptionHandler捕获到:customThread1发生异常/byzero线程customThread2执行UncaughtExceptionHandler捕获到:customThread2发生异常/byzero线程customThread3执行UncaughtExceptionHandler捕获到:customThread3发生异常/byzero线程customThread4执行UncaughtExceptionHandler捕获到:customThread4发生异常/byzero可见,结果并不是我们想象的那样,线程池中原有的线程没有复用!所以通过UncaughtExceptionHandler想将异常吞掉使线程复用这招貌似行不通。它只是做了一层异常的保底处理。
将excute改成submit试试
publicclassThreadExecutor{ privatestaticThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,1,,TimeUnit.SECONDS,newArrayBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("customThread%d").setUncaughtExceptionHandler((t,e)->System.out.println("UncaughtExceptionHandler捕获到:"+t.getName()+"发生异常"+e.getMessage())).build());@Testpublicvoidtest(){ IntStream.rangeClosed(1,5).forEach(i->{ try{ Thread.sleep();}catch(InterruptedExceptione){ e.printStackTrace();}Future<?>future=threadPoolExecutor.submit(()->{ System.out.println("线程"+Thread.currentThread().getName()+"执行");intj=1/0;});try{ future.get();}catch(InterruptedExceptione){ e.printStackTrace();}catch(ExecutionExceptione){ e.printStackTrace();}});}}线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero线程customThread0执行java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/byzero通过submit提交线程可以屏蔽线程中产生的异常,达到线程复用。当get()执行结果时异常才会抛出。
原因是通过submit提交的线程,当发生异常时,会将异常保存,待future.get();时才会抛出。
这是Futuretask的部分run()方法,看setException:
publicvoidrun(){ try{ Callable<V>c=callable;if(c!=null&&state==NEW){ Vresult;booleanran;try{ result=c.call();ran=true;}catch(Throwableex){ result=null;ran=false;setException(ex);}if(ran)set(result);}}}protectedvoidsetException(Throwablet){ if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){ outcome=t;UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL);//finalstatefinishCompletion();}}将异常存在outcome对象中,没有抛出,再看get方法:
Exceptioninthread"customThread0"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread1"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread2"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread3"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread4"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)Exceptioninthread"customThread5"java.lang.ArithmeticException:/byzeroatthread.ThreadExecutor.lambda$null$0(ThreadExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)atjava.lang.Thread.run(Thread.java:)0当outcome是异常时才抛出。
总结1、线程池中线程中异常尽量手动捕获
2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,可以获取线程执行结果,线程异常会在get执行结果时抛出。
原文链接:pareAndSet()方法的返回结果。
`unsafe.compareAndSwapInt(this, valueOffset, expect, update)`
此方法中,参数`this`是Unsafe对象本身,用于获取value的内存偏移地址。`valueOffset`是value变量的内存偏移地址,`expect`是期望更新的值,`update`是要更新的最新值。如果原子变量中的value值等于`expect`,则使用`update`值更新该值并返回true,否则返回false。
至于`valueOffset`的来源,这里提到value实际上是volatile关键字修饰的变量,以保证在多线程环境下的内存可见性。
CAS的底层是Unsafe类。如何通过`Unsafe.getUnsafe()`方法获得Unsafe类的实例?这是因为AtomicInteger类在rt.jar包下,因此通过Bootstrap根类加载器加载。Unsafe类的具体实现可以在hotspot源码中找到,而unsafe.cpp中的C++代码不在本文详细分析范围内。对CAS实现感兴趣的读者可以自行查阅。
CAS底层的Unsafe类在多处理器上运行时,为cmpxchg指令添加lock前缀(lock cmpxchg),在单处理器上则无需此步骤(单处理器自身维护单处理器内的顺序一致性)。这一机制确保了CAS操作的原子性。
最后,同学们会发现CAS的操作与原子性密切相关。CPU如何实现原子性操作是一个深入的话题,有机会可以继续探索。欢迎在评论区讨论,避免出现BUG!点赞转发不脱发!
2024-11-23 06:13
2024-11-23 05:23
2024-11-23 05:21
2024-11-23 05:08
2024-11-23 04:58