1.eworkerԴ??
2.图解+源码讲解 Nacos 客户端动态监听配置机制
3.ForkjoinPool -1
eworkerԴ??
文件处理一直都是前端人的心头病,如何控制好文件大小,文件太大上传不了,文件下载时间太长,tcp直接给断开了?等效果为了方便大家有意义的学习,这里就先放效果图,wire源码如果不满足直接返回就行,不浪费大家的时间。
文件上传文件上传实现,分片上传,暂停上传,恢复上传,文件合并等
文件下载为了方便测试,我上传了1个1g的大文件拿来下载,前端用的是流的方式来保存文件的,具体的可以看这个apiTransformStream
正文本项目的地址是:/post/
requestIdleCallback有不明白的可以看这里:/post/
接下来咋们来计算文件的hash,计算文件的hash需要使用spark-md5这个库,
全量计算文件hashexportasyncfunctioncalcHashSync(file:File){ //对文件进行分片,每一块文件都是分为2MB,这里可以自己来控制constsize=2**;letchunks:any[]=[];letcur=0;while(cur<file.size){ chunks.push({ file:file.slice(cur,opencv3.4.6源码cur+size)});cur+=size;}//可以拿到当前计算到第几块文件的进度lethashProgress=0returnnewPromise(resolve=>{ constspark=newSparkMD5.ArrayBuffer();letcount=0;constloadNext=(index:number)=>{ constreader=newFileReader();reader.readAsArrayBuffer(chunks[index].file);reader.onload=e=>{ //累加器不能依赖index,count++;//增量计算md5spark.append(e.target?.resultasArrayBuffer);if(count===chunks.length){ //通知主线程,计算结束hashProgress=;resolve({ hashValue:spark.end(),progress:hashProgress});}else{ //每个区块计算结束,通知进度即可hashProgress+=/chunks.length//计算下一个loadNext(count);}};};//启动loadNext(0);});}全量计算文件hash,在文件小的时候计算是很快的,但是在文件大的情况下,计算文件的hash就会非常慢,并且影响主进程哦?
抽样计算文件hash抽样就是取文件的一部分来继续,原理如下:
/***抽样计算hash值大概是1G文件花费1S的时间**采用抽样hash的方式来计算hash*我们在计算hash的时候,将超大文件以2M进行分割获得到另一个chunks数组,房源推荐系统源码*第一个元素(chunks[0])和最后一个元素(chunks[-1])我们全要了*其他的元素(chunks[1,2,3,4....])我们再次进行一个分割,这个时候的分割是一个超小的大小比如2kb,我们取*每一个元素的头部,尾部,中间的2kb。*最终将它们组成一个新的文件,我们全量计算这个新的文件的hash值。*@paramfile{ File}*@returns*/exportasyncfunctioncalcHashSample(file:File){ returnnewPromise(resolve=>{ constspark=newSparkMD5.ArrayBuffer();constreader=newFileReader();//文件大小constsize=file.size;letoffset=2**;letchunks=[file.slice(0,offset)];//前面2mb的数据letcur=offset;while(cur<size){ //最后一块全部加进来if(cur+offset>=size){ chunks.push(file.slice(cur,cur+offset));}else{ //中间的前中后去两个字节constmid=cur+offset/2;constend=cur+offset;chunks.push(file.slice(cur,cur+2));chunks.push(file.slice(mid,mid+2));chunks.push(file.slice(end-2,end));}//前取两个字节cur+=offset;}//拼接reader.readAsArrayBuffer(newBlob(chunks));//最后Kreader.onload=e=>{ spark.append(e.target?.resultasArrayBuffer);resolve({ hashValue:spark.end(),progress:});};});}这个设计是不是发现挺灵活的,真是游戏厅源码个人才哇
在这两个的基础上,咋们还可以分别使用web-worker和requestIdleCallback来实现,源代码在hereヾ(≧▽≦*)o
这里把我电脑配置说一下,公司给我分的电脑配置比较lower,8g内存的老机器。计算(3.3g文件的)hash的结果如下:
结果很显然,全量无论怎么弄,都是比抽样的更慢。
文件分片的方式这里可能大家会说,文件分片方式不就是等分吗,其实还可以根据网速上传的Set的底层源码速度来实时调整分片的大小哦!
consthandleUpload1=async(file:File)=>{ if(!file)return;constfileSize=file.sizeletoffset=2**letcur=0letcount=0//每一刻的大小需要保存起来,方便后台合并constchunksSize=[0,2**]constobj=awaitcalcHashSample(file)as{ hashValue:string};fileHash.value=obj.hashValue;//todo判断文件是否存在存在则不需要上传,也就是秒传while(cur<fileSize){ constchunk=file.slice(cur,cur+offset)cur+=offsetconstchunkName=fileHash.value+"-"+count;constform=newFormData();form.append("chunk",chunk);form.append("hash",chunkName);form.append("filename",file.name);form.append("fileHash",fileHash.value);form.append("size",chunk.size.toString());letstart=newDate().getTime()//todo上传单个碎片constnow=newDate().getTime()consttime=((now-start)/).toFixed(4)letrate=Number(time)///速率有最大和最小可以考虑更平滑的过滤比如1/tanif(rate<0.5)rate=0.5if(rate>2)rate=2offset=parseInt((offset/rate).toString())chunksSize.push(offset)count++}//todo可以发送合并操作了} ATTENTION!!!?如果是这样上传的文件碎片,如果中途断开是无法续传的(每一刻的网速都是不一样的),除非每一次上传都把chunksSize(分片的数组)保存起来哦控制/post/图解+源码讲解 Nacos 客户端动态监听配置机制
图解+源码讲解 Nacos 客户端动态监听配置机制
在人生中第一要紧的是发现自己。为了这个目的,各位时常需要孤独和深思 —— 南森 Nacos 源码分析系列相关文章
从零开始看 Nacos 源码环境搭建
图解+源码讲解 Nacos 客户端发起注册流程
图解+源码讲解 Nacos 服务端处理注册请求逻辑
图解+源码讲解 Nacos 客户端下线流程
图解+源码讲解 Nacos 服务端处理下线请求
图解+源码讲解 Nacos 客户端发起心跳请求
图解+源码讲解 Nacos 服务端处理心跳请求
图解+源码讲解 Nacos 服务端处理配置获取请求
图解+源码讲解 Nacos 客户端动态监听配置机制
NacosConfigAutoConfiguration我们看到这里面其实注入了一个 Nacos 配置刷新的关键 NacosContextRefresherBean
@Configuration@ConditionalOnProperty(name?=?"spring.cloud.nacos.config.enabled",?matchIfMissing?=?true)public?class?NacosConfigAutoConfiguration?{ //?Nacos?配置属性@Beanpublic?NacosConfigProperties?nacosConfigProperties(ApplicationContext?context)?{ if?(context.getParent()?!=?null&&?BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getParent(),?NacosConfigProperties.class).length?>?0)?{ return?BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),NacosConfigProperties.class);}return?new?NacosConfigProperties();}//?Nacos?配置刷新属性@Beanpublic?NacosRefreshProperties?nacosRefreshProperties()?{ return?new?NacosRefreshProperties();}//?Nacos?刷新历史@Beanpublic?NacosRefreshHistory?nacosRefreshHistory()?{ return?new?NacosRefreshHistory();}//?Nacos?配置管理@Beanpublic?NacosConfigManager?nacosConfigManager(NacosConfigProperties?nacosConfigProperties)?{ return?new?NacosConfigManager(nacosConfigProperties);}//?Nacos?配置刷新@Beanpublic?NacosContextRefresher?nacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?nacosRefreshHistory)?{ return?new?NacosContextRefresher(nacosConfigManager,?nacosRefreshHistory);}}NacosContextRefresher 配置中心刷新public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}获取配置服务 getConfigServicenacosConfigManager.getConfigService(),这行代码其实就是为了创建 NcaosConfigService 对象,我们看看你是怎么创建的,其实核心代码就是通过 NacosFactory 反射创建的 NcaosConfigService 对象,这个对象是一个核心对象后续会讲到的
public?static?ConfigService?createConfigService(Properties?properties)?throws?NacosException?{ try?{ //?加载?NacosConfigService?类Class<?>?driverImplClass?=?Class.forName("com.alibaba.nacos.client.config.NacosConfigService");//?获取构造器Constructor?constructor?=?driverImplClass.getConstructor(Properties.class);//?创建实例ConfigService?vendorImpl?=?(ConfigService)?constructor.newInstance(properties);return?vendorImpl;}?catch?(Throwable?e)?{ throw?new?NacosException(NacosException.CLIENT_INVALID_PARAM,?e);}}监听器NacosContextRefresher 实现了 ApplicationListener ,一看这就是一个监听器了,我们看看这个在监听器里面做了什么操作
@Overridepublic?void?onApplicationEvent(ApplicationReadyEvent?event)?{ //?这是一个?CAS?操作,只设置一次if?(this.ready.compareAndSet(false,?true))?{ //?注册?Nacos?监听器对于应用this.registerNacosListenersForApplications();}}注册 Nacos 监听/**register Nacos Listeners. 注册Nacos监听器 */ private void registerNacosListenersForApplications() { // 默认是 true if (isRefreshEnabled()) { // 遍历Nacos属性资源中心 for (NacosPropertySource propertySource : NacosPropertySourceRepository .getAll()) { if (!propertySource.isRefreshable()) { continue; } // 获取资源ID ?String dataId = propertySource.getDataId(); // 通过组和 dataId 注册 Nacos 监听器 registerNacosListener(propertySource.getGroup(), dataId); } } }
private void registerNacosListener(final String groupKey, final String dataKey) { // 构建 Key 信息 String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); // 在 listenerMap中放入了 key 对应 AbstractSharedListener 响应的方法 Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() { @Override public void innerReceive(String dataId, String group, String configInfo) { // 刷新次数 refreshCountIncrement(); // 记录刷新历史,就是改变历史 nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); // 发布刷新事件 applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); } }); // 向配置服务中添加监听器 configService.addListener(dataKey, groupKey, listener);
}
####?向配置服务中添加监听器 此时调用的是?NacosConfigService?中的?addListener?方法,但是最终执行的还是?ClientWorker?中的?addTenantListeners?方法,后面会进行分析?ClientWorker?这个类的```java@Overridepublic?void?addListener(String?dataId,?String?group,?Listener?listener)?throws?NacosException?{ //?这个?ClientWorker?worker?也是一个核心类worker.addTenantListeners(dataId,?group,?Arrays.asList(listener));}构建 CacheData 信息此时向 ClientWorker 中的 CacheData 中添加数据,之后遍历监听器添加到 CacheData 中
创建 CacheData 对象public?CacheData(ConfigFilterChainManager?configFilterChainManager,?String?name,?String?dataId,?String?group,String?tenant)?{ //?dataId?不能为空if?(null?==?dataId?||?null?==?group)?{ throw?new?IllegalArgumentException("dataId="?+?dataId?+?",?group="?+?group);}this.name?=?name;this.configFilterChainManager?=?configFilterChainManager;this.dataId?=?dataId;?//?设置dataIdthis.group?=?group;?//?设置组信息this.tenant?=?tenant;?//?设置租户listeners?=?new?CopyOnWriteArrayList<ManagerListenerWrap>();?//?装饰器集合this.isInitializing?=?true;//?加载缓存数据从本地磁盘this.content?=?loadCacheContentFromDiskLocal(name,?dataId,?group,?tenant);//?计算本地缓存信息的MD5this.md5?=?getMd5String(content);}向 CacheData 中添加数据public?void?addTenantListeners(String?dataId,?String?group,List<?extends?Listener>?listeners)throws?NacosException?{ //?DefaultGroupgroup?=?null2defaultGroup(group);String?tenant?=?agent.getTenant();?//?是?""//?向缓存数据中添加监听器CacheData?cache?=?addCacheDataIfAbsent(dataId,?group,?tenant);for?(Listener?listener?:?listeners)?{ cache.addListener(listener);}}public?CacheData?addCacheDataIfAbsent(String?dataId,?String?group,?String?tenant)throws?NacosException?{ //?获取Key信息String?key?=?GroupKey.getKeyTenant(dataId,?group,?tenant);//?在缓存?Map?中获取缓存数据CacheData?cacheData?=?cacheMap.get(key);//?如果不为空的情况下那么就返回,如果为空那么就创建一个?CacheDataif?(cacheData?!=?null)?{ return?cacheData;}//?创建一个?CacheData?cacheData?=?new?CacheData(configFilterChainManager,?agent.getName(),dataId,?group,?tenant);//?将创建好的?cacheData?放入缓存?Map?中CacheData?lastCacheData?=?cacheMap.putIfAbsent(key,?cacheData);//?如果缓存数据为空的话那么从配置中心拉取,不过此时不为空if?(lastCacheData?==?null)?{ //fix?issue?#?if?(enableRemoteSyncConfig)?{ String[]?ct?=?getServerConfig(dataId,?group,?tenant,?L);cacheData.setContent(ct[0]);}//?计算任务IDint?taskId?=?cacheMap.size()?/?(int)?ParamUtil.getPerTaskConfigSize();//?设置任务IDcacheData.setTaskId(taskId);lastCacheData?=?cacheData;}//?缓存数据初始化完成//?reset?so?that?server?not?hang?this?checklastCacheData.setInitializing(true);LOGGER.info("[{ }]?[subscribe]?{ }",?agent.getName(),?key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());//?返回最新的缓存数据return?lastCacheData;}到这里 CacheData 对象 和 cacheMap 集合已经构建完成了,后续会用到这个数据的
NacosConfigService 分析NacosConfigService这个类在创建的时候主要做了什么事情,这这里面创建了一个 ClientWorker对象,这个对象是一个核心的类,有关于配置的一些操作都是归功于 ClientWorker类
public?NacosConfigService(Properties?properties)?throws?NacosException?{ ......this.agent?=?new?MetricsHttpAgent(new?ServerHttpAgent(properties));this.agent.start();//?核心工作类this.worker?=?new?ClientWorker(this.agent,this.configFilterChainManager,?properties);}核心配置类 ClientWorker分析一下这个类都在做什么事情,都有哪些核心方法 其实能看到里面有一个构造函数、添加缓存数据、添加监听器、检查配置中心相关方法、获取服务配置、解析数据响应、移除缓存数据、删除监听器以及 shutdown方法
构造函数看到这里其实看到了定义了两个调度线程池,一个是用于配置检测的,一个是用于执行长轮询服务的
@SuppressWarnings("PMD.ThreadPoolCreationRule")public?ClientWorker(final?HttpAgent?agent,final?ConfigFilterChainManager?configFilterChainManager,?final?Properties?properties){ this.agent?=?agent;this.configFilterChainManager?=?configFilterChainManager;//?初始化操作init(properties);//?定义一个调度线程池,只有一个线程还是守护线程this.executor?=?Executors.newScheduledThreadPool(1,?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker."?+?agent.getName());t.setDaemon(true);return?t;}});//?定义一个多个线程的调度线程池,线程个数和CPU?核心数有关,也是守护线程,是一个长轮询this.executorService?=?Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling."?+agent.getName());t.setDaemon(true);return?t;}});//?定义一个定时的调度任务,第一次执行的时候延时1毫秒,后续毫秒调度一次this.executor.scheduleWithFixedDelay(new?Runnable()?{ @Overridepublic?void?run()?{ try?{ //?检查配置信息方法checkConfigInfo();}?catch?(Throwable?e)?{ LOGGER.error("["?+?agent.getName()?+?"]?"+?"[sub-check]?rotate?check?error",?e);}}},?1L,?L,?TimeUnit.MILLISECONDS);}检查配置服务方法这个 cacheMap 包含了一些任务信息,这里面的任务是怎么来的呢,他是在添加监听器的时候添加的,上面已经分析过了
public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}0长轮询任务 LongPollingRunnableForkjoinPool -1
ForkJoinæ¯ç¨äºå¹¶è¡æ§è¡ä»»å¡çæ¡æ¶ï¼ æ¯ä¸ä¸ªæ大任å¡åå²æè¥å¹²ä¸ªå°ä»»å¡ï¼æç»æ±æ»æ¯ä¸ªå°ä»»å¡ç»æåå¾å°å¤§ä»»å¡ç»æçæ¡æ¶ãForkå°±æ¯æä¸ä¸ªå¤§ä»»å¡åå为è¥å¹²åä»»å¡å¹¶è¡çæ§è¡ï¼Joinå°±æ¯å并è¿äºåä»»å¡çæ§è¡ç»æï¼æåå¾å°è¿ä¸ªå¤§ä»»å¡çç»æãä¸é¢æ¯ä¸ä¸ªæ¯ä¸ä¸ªç®åçJoin/Fork计ç®è¿ç¨ï¼å°1âæ°åç¸å
é常è¿æ ·ä¸ªæ¨¡åï¼ä½ 们ä¼æ³å°ä»ä¹ï¼
Release Framework ï¼ å¸¸è§çå¤ç模åæ¯ä»ä¹ï¼ task pool - worker poolç模åã ä½æ¯Forkjoinpool éåäºå®å ¨ä¸åç模åã
ForkJoinPoolä¸ç§ExecutorServiceçå®ç°ï¼è¿è¡ForkJoinTaskä»»å¡ãForkJoinPoolåºå«äºå ¶å®ExecutorServiceï¼ä¸»è¦æ¯å 为å®éç¨äºä¸ç§å·¥ä½çªå(work-stealing)çæºå¶ãææ被ForkJoinPool管çç线ç¨å°è¯çªåæ交å°æ± åéçä»»å¡æ¥æ§è¡ï¼æ§è¡ä¸åå¯äº§çåä»»å¡æ交å°æ± åä¸ã
ForkJoinPoolç»´æ¤äºä¸ä¸ªWorkQueueçæ°ç»(æ°ç»é¿åº¦æ¯2çæ´æ°æ¬¡æ¹ï¼èªå¨å¢é¿)ãæ¯ä¸ªworkQueueé½æä»»å¡éå(ForkJoinTaskçæ°ç»)ï¼å¹¶ä¸ç¨baseãtopæåä»»å¡éåéå°¾åé头ãwork-stealingæºå¶å°±æ¯å·¥ä½çº¿ç¨æ¨ä¸ªæ«æä»»å¡éåï¼å¦æéåä¸ä¸ºç©ºååéå°¾çä»»å¡å¹¶æ§è¡ã示æå¾å¦ä¸
æµç¨å¾ï¼
poolå±æ§
workQueuesæ¯poolçå±æ§ï¼å®æ¯WorkQueueç±»åçæ°ç»ãexternalPushåexternalSubmitæå建çworkQueue没æowner(å³ä¸æ¯worker)ï¼ä¸ä¼è¢«æ¾å°workQueuesçå¶æ°ä½ç½®ï¼ècreateWorkerå建çworkQueueï¼å³workerï¼æownerï¼ä¸ä¼è¢«æ¾å°workQueuesçå¥æ°ä½ç½®ã
WorkQueueçå 个éè¦æååé说æå¦ä¸ï¼
è¿æ¯WorkQueueçconfigï¼é«ä½è·poolçconfigå¼ä¿æä¸è´ï¼èä½ä½åæ¯workQueueå¨workQueuesæ°ç»çä½ç½®ã
ä»workQueueså±æ§çä»ç»ä¸ï¼æ们ç¥éï¼ä¸æ¯ææworkQueueé½æworkerï¼æ²¡æworkerçworkQueueç§°ä¸ºå ¬å ±éåï¼shared queueï¼ï¼configç第ä½å°±æ¯ç¨æ¥å¤ææ¯å¦æ¯å ¬å ±éåçãå¨externalSubmitå建工ä½éåæ¶ï¼æï¼
q.config = k | SHARED_QUEUE;
å ¶ä¸qæ¯æ°å建çworkQueueï¼kå°±æ¯qå¨workQueuesæ°ç»ä¸çä½ç½®ï¼SHARED_QUEUE=1<<ï¼æ³¨æè¿éconfig没æä¿çmodeçä¿¡æ¯ã
èå¨registerWorkerä¸ï¼åæ¯è¿æ ·ç»workQueueçconfigèµå¼çï¼
w.config = i | mode;
wæ¯æ°å建çworkQueueï¼iæ¯å ¶å¨workQueuesæ°ç»ä¸çä½ç½®ï¼æ²¡æ设置SHARED_QUEUEæ è®°ä½
scanStateæ¯workQueueçå±æ§ï¼æ¯intç±»åçãscanStateçä½ä½å¯ä»¥ç¨æ¥å®ä½å½åworkerå¤äºworkQueuesæ°ç»çåªä¸ªä½ç½®ãæ¯ä¸ªworkerå¨è¢«å建æ¶ä¼å¨å ¶æé å½æ°ä¸è°ç¨poolçregisterWorkerï¼èregisterWorkerä¼ç»scanStateèµä¸ä¸ªåå§å¼ï¼è¿ä¸ªå¼æ¯å¥æ°ï¼å 为workeræ¯ç±createWorkerå建ï¼å¹¶ä¼è¢«æ¾å°WorkQueuesçå¥æ°ä½ç½®ï¼ècreateWorkerå建workeræ¶ä¼è°ç¨registerWorkerã
ç®è¨ä¹ï¼workerçscanStateåå§å¼æ¯å¥æ°ï¼éworkerçscanstateåå§å¼=INACTIVE=1<<ï¼å°äº0ï¼éworkerçworkQueueå¨externalSubmitä¸å建ï¼ã
å½æ¯æ¬¡è°ç¨signalWorkï¼ætryReleaseï¼å¤éworkeræ¶ï¼workerçé«ä½å°±ä¼å 1
å¦å¤ï¼scanState<0表示workeræªæ¿æ´»ï¼å½workerè°ç¨runtaskæ§è¡ä»»å¡æ¶ï¼scanStateä¼è¢«ç½®ä¸ºå¶æ°ï¼å³è®¾ç½®scanStateçæå³è¾¹ä¸ä½ä¸º0ã
workerä¼ç æ¶ï¼æ¯è¿æ ·åå¨ç
workerçå¤é类似è¿æ ·ï¼
å¨workerä¼ç ç4è¡ä¼ªç ä¸ï¼è®©ctlçä½ä½çå¼å为worker.scanStateï¼è¿æ ·ä¸æ¬¡å°±å¯ä»¥éè¿scanStateå¤é该workerãå¤é该workeræ¶ï¼æ该workerçpreStack设置为ctlä½ä½çå¼ï¼è¿æ ·ä¸ä¸æ¬¡å¤éçworkerå°±æ¯scanStateçäºè¯¥preStackçworkerã
è¿ééè¿preStackä¿åä¸ä¸ä¸ªworkerï¼è¿ä¸ªworkeræ¯å½åworkeræ´æ©å°å¨çå¾ ï¼æ以形æä¸ä¸ªåè¿å åºçæ ã
runStateæ¯intç±»åçå¼ï¼æ§å¶æ´ä¸ªpoolçè¿è¡ç¶æåçå½å¨æï¼æä¸é¢å 个å¼ï¼å¯ä»¥å¥½å 个å¼åæ¶åå¨ï¼ï¼
å¦ærunStateå¼ä¸º0ï¼è¡¨ç¤ºpoolå°æªåå§åã
RSLOCK表示éå®poolï¼å½æ·»å workeråpoolç»æ¢æ¶ï¼å°±è¦ä½¿ç¨RSLOCKéå®æ´ä¸ªpoolãå¦æç±äºrunState被éå®ï¼å¯¼è´å ¶ä»æä½çå¾ runState解éï¼é常ç¨waitè¿è¡çå¾ ï¼ï¼å½runState设置äºRSIGNALï¼è¡¨ç¤ºrunState解éï¼å¹¶éç¥ï¼notifyAllï¼çå¾ çæä½ã
å©ä¸4个å¼é½è·runStateçå½å¨ææå ³ï¼é½å¯ä»¥é¡¾åæä¹ï¼
å½éè¦åæ¢æ¶ï¼è®¾ç½®runStateçSTOPå¼ï¼è¡¨ç¤ºåå¤å ³éï¼è¿æ ·å ¶ä»æä½çå°è¿ä¸ªæ è®°ä½ï¼å°±ä¸ä¼ç»§ç»æä½ï¼æ¯å¦tryAddWorkerçå°STOPå°±ä¸ä¼åå建workerï¼
ètryTerminate对è¿äºçå½å¨æç¶æçå¤çåæ¯è¿æ ·çï¼
å½åtopåbaseçåå§å¼ä¸º INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2ãç¶åpushä¸ä¸ªtaskä¹åï¼top+=1ï¼ä¹å°±æ¯è¯´ï¼top对åºçä½ç½®æ¯æ²¡ætaskçï¼æè¿pushè¿æ¥çtaskå¨top-1çä½ç½®ãèbaseçä½ç½®åè½å¯¹åºå°taskï¼base对åºæå æ¾è¿éåçtaskï¼top-1对åºæåæ¾è¿éåçtaskã
qlockå¼å«ä¹ï¼1: locked, < 0: terminate; else 0
å³å½qlockå¼ä½0æ¶ï¼å¯ä»¥æ£å¸¸æä½ï¼å¼=1æ¶ï¼è¡¨ç¤ºéå®
int SQMASK=0xeï¼åä»»ä½æ´æ°è·SQMASKä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ã
è¯æï¼
注æè¿éå为äºè¿å¶æ¯ ï¼å°¤å ¶æ³¨ææå³è¾¹ç¬¬ä¸ä½æ¯0ï¼ä»»ä½æ°è·æå³è¾¹ç¬¬ä¸ä½æ¯0çæ°ä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ï¼å 为ä½ä¸ä¹åï¼ç¬¬ä¸ä½å°±æ¯0ï¼æ¯å¦s=A&SQMASKï¼Aå¯ä»¥æ¯ä»»ææ´æ°ï¼ç¶åæsæäºè¿å¶è¿è¡å¤é¡¹å¼å±å¼ï¼åæs=2 n1+2 n2 â¦â¦+2^nnï¼è¿énâ¥1ï¼æ以så¯ä»¥è¢«2æ´é¤ï¼å³sæ¯å¶æ°ã
æ以ä¸ä¸ªæ°æ¯å¥æ°è¿æ¯å¶æ°ï¼çå ¶æå³è¾¹ç¬¬ä¸ä½å³å¯ã
æ们ç¥éworkQueueæexternalPushå建çåcreateWorkerå建çworkerï¼ä¸¤ç§æ¹å¼å建çworkQueueï¼å ¶æ¾ç½®å°workQueuesçä½ç½®æ¯ä¸åçï¼åè æ¾å°workQueueçå¶æ°ä½ç½®ï¼èåè åæ¾å°å¥æ°ä½ç½®ãä¸åworkQueueæ¾å°èªå·±å¨workQueuesçä½ç½®çç®æ³æç¹ä¸åã
ä¸é¢çä¸ä¸forkjoinæ¡æ¶è·åworkQueuesä¸çå¶æ°ä½ç½®çworkQueueçç®æ³ï¼
è¿æ ·å°±è½è·åworkQueuesçå¶æ°ä½ç½®çworkQueueãmä¿è¯m & r & SQMASKè¿æ´ä¸ªè¿ç®ç»æä¸ä¼è¶ åºworkQueuesçä¸æ ï¼SQMASKä¿è¯åå°çæ¯å¶æ°ä½ç½®çworkQueueãè¿éæä¸ä¸ªæ趣çç°è±¡ï¼å设0å°workQueues.length-1ä¹é´æn个å¶æ°ï¼m & r & SQMASKæ¯æ¬¡é½è½åå°å ¶ä¸ä¸ä¸ªå¶æ°ï¼èä¸è¿ç»n次åå°çå¶æ°ä¸ä¼åºç°éå¤å¼ï¼æ£åæ§é常好ãèä¸æ¯å¾ªç¯çï¼å³1å°n次ån个ä¸åå¶æ°ï¼n+1å°2nä¹æ¯ån次ä¸åå¶æ°ï¼æ¤æ¶n个å¶æ°æ¯ä¸ªé½è¢«éæ°åä¸æ¬¡ãä¸é¢åæä¸rå¼æä»ä¹ç§å¯ï¼ä¸ºä½è½ä¿è¯è¿æ ·çæ£åæ§
ThreadLocalRandomå æä¸å¸¸éPROBE_INCREMENT = 0x9eb9ï¼ä»¥åä¸ä¸ªéæçprobeGenerator =new AtomicInteger() ï¼ç¶åæ¯ä¸ªçº¿ç¨çprobe= probeGenerator.addAndGet(PROBE_INCREMENT)æ以第ä¸ä¸ªçº¿ç¨çprobeå¼æ¯0x9eb9ï¼ç¬¬äºä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9ï¼ç¬¬ä¸ä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9+0x9eb9以æ¤ç±»æ¨ï¼æ´ä¸ªå¼æ¯çº¿æ§çï¼å¯ä»¥ç¨y=kx表示ï¼å ¶ä¸k=0x9eb9ï¼x表示第å 个线ç¨ãè¿æ ·æ¯ä¸ªçº¿ç¨çprobeå¯ä»¥ä¿è¯ä¸ä¸æ ·ï¼èä¸å ·æå¾å¥½ç离æ£æ§ã
å®é ä¸ï¼å¯ä»¥ä¸ç¨0x9eb9è¿ä¸ªå¼ï¼ç¨ä»»æä¸ä¸ªå¥æ°é½æ¯å¯ä»¥çï¼æ¯å¦1ãå¦æç¨1çè¯ï¼probe+=1ï¼è¿æ ·æ¯ä¸ªçº¿ç¨çprobeå°±é½æ¯ä¸åçï¼èä¸å ·æå¾å¥½ç离æ£æ§ãä¹å°±æ¯è¯´ï¼å设æéå¶æ¡ä»¶probe<nï¼è¶ è¿nå产ç溢åºãåprobeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼n次åprobeæ¯æ¬¡èªå çå¼é½ä¸åãå®é ä¸ç¨ä»»æä¸ä¸ªå¥æ°ï¼é½å¯ä»¥ä¿è¯probeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼æå ´è¶£å¯çæ¬ææåéå½é¨åãç±äºå¥æ°ç离æ£æ§ï¼æ以åªè¦çº¿ç¨æ°å°äºmæè SQMASK两è ä¸çæå°å¼ï¼åæ¯ä¸ªçº¿ç¨é½è½å¯ä¸å°å æ®ä¸ä¸ªwsä¸çä¸ä¸ªä½ç½®
å½ä¸ä¸ªæä½æ¯å¨éForkjoinThreadç线ç¨ä¸è¿è¡çï¼å称该æä½ä¸ºå¤é¨æä½ãæ¯å¦æ们åé¢æ§è¡pool.invokeï¼invokeå åæ§è¡externalPushãç±äºinvokeæ¯å¨éForkjoinThread线ç¨ä¸è¿è¡çï¼è¿éæ¯å¨main线ç¨ä¸è¿è¡ï¼ï¼æ以æ¯ä¸ä¸ªå¤é¨æä½ï¼è°ç¨çæ¯externalPushãä¹åtaskçæ§è¡æ¯éè¿ForkJoinThreadæ¥æ§è¡çï¼æ以taskä¸çforkå°±æ¯å é¨æä½ï¼è°ç¨çæ¯pushï¼æä»»å¡æ交å°å·¥ä½éåãå ¶å®forkçå®ç°æ¯ç±»ä¼¼ä¸é¢è¿æ ·çï¼
å³forkä¼æ ¹æ®æ§è¡èªèº«ç线ç¨æ¯å¦æ¯ForkJoinThreadçå®ä¾æ¥å¤ææ¯å¤äºå¤é¨è¿æ¯å é¨ãé£ä¸ºä½è¦åºåå å¤é¨ï¼
ä»»ä½çº¿ç¨é½å¯ä»¥ä½¿ç¨ForkJoinæ¡æ¶ï¼ä½æ¯å¯¹äºéForkJoinThreadç线ç¨ï¼å®å°åºæ¯ææ ·çï¼ForkJoinæ æ³æ§å¶ï¼ä¹æ æ³å¯¹å ¶ä¼åãå æ¤åºååºå å¤é¨ï¼è¿æ ·æ¹ä¾¿ForkJoinæ¡æ¶å¯¹ä»»å¡çæ§è¡è¿è¡æ§å¶åä¼å
forkJoinPool.invoke(task)æ¯æä»»å¡æ¾å ¥å·¥ä½éåï¼å¹¶çå¾ ä»»å¡æ§è¡ãæºç å¦ä¸
è¿éexternalPushè´è´£ä»»å¡æ交ï¼externalPushæºç å¦ä¸ï¼