1.Vert.x 源码解析(4.x)——Future源码解析
2.一文详解RocketMQ-Spring的监听源码解析与实战
3.物联网设备常见的web服务器——uhttpd源码分析(二)
4.å
³äºjavaççå¬å¨
5.å¦ä½å¨ASP.NET Core 6ä¸ä½¿ç¨è·è¸ªçå¬å¨
6.图解+源码讲解 Nacos 客户端动态监听配置机制
Vert.x 源码解析(4.x)——Future源码解析
在现代软件开发中,异步编程的器源重要性日益凸显,提升并发性能并处理大量并行操作。码监码多Vert.x,听代作为一款基于事件驱动和非阻塞设计的监听异步框架,提供了丰富的器源复仇源码工具简化异步编程。本文将深入解析Vert.x 4.x版本的码监码多Future源码,理解其关键类和功能。听代1. 异步核心
Vert.x的监听核心在于FutureImpl和PromiseImpl,它们是器源实现异步操作的关键。AsyncResult是码监码多通用接口,用于表示异步操作的听代结果,包含成功值或失败异常。监听2. Future类详解
Future扩展了AsyncResult,器源提供了组合操作如join、码监码多any、all和map等功能。内部的FutureInternal主要负责添加监听器,FutureBase负责执行监听器和转换函数。 具体来说,FutureImpl的onComplete方法接收一个handler,任务完成后执行,群控源码而tryComplete则在异步操作有结果时触发,最终调用用户指定的handler。 相比之下,Promise允许用户手动设置异步结果,PromiseImpl继承自FutureImpl,并增加了context获取功能。3. 实例与源码分析
通过简单的入门实例,如独立使用Future,我们可以看到Vert.x如何通过创建PromiseImpl获取Future。源码分析显示,Promise.future获取Future,OnComplete用于添加监听,而complete方法则用于设置值并通知监听器。4. 深入源码
在源码层面,addListener和emitSuccess方法在OnComplete中扮演重要角色。而complete方法,特别是tryComplete,是设置值并触发监听的关键。5. 总结
总的来说,理解Vert.x中的Future,就是ssc源码创建PromiseImpl获取Future,通过OnComplete添加监听器,然后通过Promise的complete方法设置值并通知监听器。后续还将深入探讨其他Future实现类,如all、any和map的原理。一文详解RocketMQ-Spring的源码解析与实战
RocketMQ-Spring源码解析与实战概览
这篇文章详细阐述了在Spring Boot项目中如何运用rocketmq-spring SDK进行消息收发,以及开发者视角下SDK的设计逻辑。通过一步步操作流程,理解其在生产者和消费者端的实际应用。SDK简介
rocketmq-spring本质上是一个Spring Boot启动器,通过“约定优于配置”的理念简化集成过程。只需在pom.xml中引入依赖,并在配置文件中进行简单的配置,如添加名字服务地址和生产者组。配置与操作流程
1. 在pom.xml引入依赖并配置,如生产者和消费者配置。生产者配置:包含名字服务地址和生产者组
消费者配置:实现消息监听器
核心源码分析
rocketmq-spring的核心模块包括启动器、SDK模块和示例代码模块,源码中着重解析了RocketMQTemplate类和消费者启动机制,如生产者模板封装和消费者消息处理逻辑。生产者模板与消费者启动
生产者:通过RocketMQProperties对象绑定配置,在线工具源码创建生产者Bean并整合到RocketMQTemplate中
消费者:通过ListenerContainerConfiguration自动启动,封装RocketMQListener的消费逻辑
进阶学习
要深入学习rocketmq-spring,可以从实际操作、模块设计、starter设计思路和源码理解四个方面逐步提升。物联网设备常见的web服务器——utl` 函数通过改变已打开文件的性质来实现对文件的控制,具体操作包括改变描述符的属性,为后续的服务器操作提供灵活性。关于这一函数的使用,详细内容可参考相关技术文档。
`uh_setup_listeners` 函数在服务器配置中占有重要地位,主要关注点在于设置监听器的回调函数。这一过程确保了当通过 epoll 有数据到达时,能够调用正确的处理函数。这一环节是实现高效服务器响应的关键步骤。
`setsockopt` 函数被用于检查网络异常后的操作,通过设置选项层次(如 SOL_SOCKET、IPPROTO_TCP 等)和特定选项的值,实现对网络连接的优化与控制。此功能的详细解释和示例请查阅相关开源社区或技术资料。
`listener_cb` 函数是分销小程序源码 uHTTPd 的关键回调函数之一,它在 epoll 事件发生时被调用,用于处理客户端连接。其后,`uh_accept_client` 函数负责实际的连接接受过程,通过 `calloc` 函数分配内存空间,并返回指向新分配内存的指针。这一步骤确保了分配的内存空间被初始化为零,为后续数据处理做好准备。
`accept` 函数在客户端连接请求处理中扮演重要角色,它从服务器监听的 socket 中接收新的连接请求,并返回一个用于与客户端通信的新的套接字描述符。对于这一函数的具体实现和使用细节,可以参考相关技术论坛或开发者文档。
`getsockname` 函数用于服务器端获取相关客户端的地址信息,这对于维护连接状态和进行数据传输具有重要意义。此函数的详细用法和示例可查阅相关技术资源。
`ustream_fd_init` 函数通过回调函数 `client_ustream_read_cb` 实现客户端数据的真正读取,而 `client_ustream_read_cb` 则负责操作从客户端读取的数据,确保数据处理的高效性和准确性。
å ³äºjavaççå¬å¨
1ãpublic void addWindowListener(WindowListener l)æ·»å æå®ççªå£ä¾¦å¬å¨ï¼ä»¥ä»æ¤çªå£æ¥æ¶çªå£äºä»¶ãå¦æ l 为 nullï¼åä¸æåºä»»ä½å¼å¸¸ï¼ä¸ä¸æ§è¡ä»»ä½æä½ãè¿ä¸ªæ¯APIä¸çæ¹æ³å®ä¹ï¼æ¤æ¹æ³åæ°ä¸ºæ¥å£WindowListenerï¼ä»»ä½å®ç°è¯¥æ¥å£çç±»é½å¯ä»¥ä½ä¸ºåæ°ã
2ãpublic abstract class WindowAdapter implements WindowListener, WindowStateListener, WindowFocusListener
æ¥æ¶çªå£äºä»¶çæ½è±¡éé å¨ç±»ãæ¤ç±»ä¸çæ¹æ³ä¸ºç©ºãæ¤ç±»åå¨çç®çæ¯æ¹ä¾¿å建侦å¬å¨å¯¹è±¡ã
æ©å±æ¤ç±»å¯å建 WindowEvent 侦å¬å¨å¹¶ä¸ºæéäºä»¶éå该æ¹æ³ãï¼å¦æè¦å®ç°
WindowListener æ¥å£ï¼åå¿ é¡»å®ä¹è¯¥æ¥å£å çæææ¹æ³ãæ¤æ½è±¡ç±»å°æææ¹æ³é½å®ä¹ä¸º
nullï¼æ以åªééå¯¹å ³å¿çäºä»¶å®ä¹æ¹æ³ãï¼
使ç¨æ©å±çç±»å¯ä»¥å建侦å¬å¨å¯¹è±¡ï¼ç¶å使ç¨çªå£ç addWindowListener
æ¹æ³å该çªå£æ³¨å侦å¬å¨ãå½éè¿æå¼ãå ³éãæ¿æ´»æåç¨ãå¾æ åæåæ¶å¾æ åèæ¹åäºçªå£ç¶ææ¶ï¼å°è°ç¨è¯¥ä¾¦å¬å¨å¯¹è±¡ä¸çç¸å ³æ¹æ³ï¼å¹¶å°
WindowEvent ä¼ éç»è¯¥æ¹æ³ã
3ãå¦æææ³å¨ä»£ç ä¸ä¸æ¬¡æ§ä½¿ç¨æ个类ï¼æ½è±¡ç±»æå ·ä½ç±»ï¼ææ¥å£ï¼å¯ä»¥ä½¿ç¨å¿åç±»çæ¹å¼ï¼è¿æ ·ä¸éèªå·±å®ä¹ä¸ä¸ªMy***ç±»ï¼ç¶åå使ç¨ï¼æ¯è¾æ¹ä¾¿ãç¨æ³å°±æ¯ç´æ¥å¨new WindowAdapter()åé¢å å ¥ç±»å®ä¹ï¼å¨å ¶ä¸å®ç°æè¦çæ¹æ³å°±å¯ä»¥äºã
å¿åç±»ä¸æ¯è¿åå¼ï¼èæ¯ç¸å½äºnew String(âhelloâ)è¿ç§çæ©å±å½¢å¼ãæè§å¾å¿åç±»çæå¤ç¨å¤å°±æ¯å çå¬å¨æ¶ã
éä¸WindowAdapteræºä»£ç ï¼
public abstract class WindowAdapterimplements WindowListener, WindowStateListener, WindowFocusListener
{
public void windowOpened(WindowEvent e) { }
public void windowClosing(WindowEvent e) { }
public void windowClosed(WindowEvent e) { }
public void windowIconified(WindowEvent e) { }
public void windowDeiconified(WindowEvent e) { }
public void windowActivated(WindowEvent e) { }
public void windowDeactivated(WindowEvent e) { }
public void windowStateChanged(WindowEvent e) { }
public void windowGainedFocus(WindowEvent e) { }
public void windowLostFocus(WindowEvent e) { }
}
å¦ä½å¨ASP.NET Core 6ä¸ä½¿ç¨è·è¸ªçå¬å¨
å½ä½¿ç¨ASP.NET Core 6æ建çåºç¨ç¨åºæ¶ï¼ä½ å¯è½ç»å¸¸æ³ä½¿ç¨è·è¸ªåè®°å½æ¥çæ§ä½ çåºç¨ç¨åºçæ§è½åè¯æé误ãä½ ä¹å¯ä»¥å¨ç产ç¯å¢ä¸ä½¿ç¨è·è¸ªæ¥è¡¡éä½ çåºç¨ç¨åºå¨è¿è¡æ¶ç表ç°ãè¿ç¯æç« è®¨è®ºäºæ们å¦ä½å¨ASP.NET Core 6ä¸ä½¿ç¨è¿½è¸ªãæ们å°ç 究å¦ä½ä½¿ç¨è·è¸ªçå¬å¨æ¥æ¶éè·è¸ªä¿¡æ¯ï¼å¹¶ä½¿ç¨ILoggerå°è·è¸ªè¾åºå¼å¯¼å°äºä»¶æ¥å¿ä¸ã
è¦ä½¿ç¨æ¬ææä¾ç代ç 示ä¾ï¼ä½ çç³»ç»ä¸åºè¯¥å®è£ æVisual Studio ãå¦æä½ è¿æ²¡æå¯æ¬ï¼ä½ å¯ä»¥å¨è¿éä¸è½½Visual Studio ã
å¨Visual Studio ä¸å建ä¸ä¸ªASP.NET Core Web API项ç®é¦å ï¼è®©æ们å¨Visual Studio ä¸å建ä¸ä¸ªASP.NET Core项ç®ãæç §è¿äºæ¥éª¤å°å¨Visual Studio ä¸å建ä¸ä¸ªæ°çASP.NET Core 6 Web API项ç®ã
å¯å¨Visual Studio IDEã
ç¹å» "å建æ°é¡¹ç®"ã
å¨ "å建æ°é¡¹ç® "çªå£ï¼ä»æ¾ç¤ºç模æ¿å表ä¸éæ© "ASP.NET Core Web API"ã
ç¹å» "ä¸ä¸æ¥"ã
å¨ "é ç½®ä½ çæ°é¡¹ç® "çªå£ä¸ï¼æå®æ°é¡¹ç®çå称åä½ç½®ã
æ ¹æ®ä½ çå好ï¼å¯ä»¥éæ©å¾é "å°è§£å³æ¹æ¡å项ç®æ¾å¨åä¸ç®å½ä¸ "å¤éæ¡ã
ç¹å» "ä¸ä¸æ¥"ã
å¨æ¥ä¸æ¥æ¾ç¤ºç "éå ä¿¡æ¯ "çªå£ä¸ï¼ç¡®ä¿å¾é "使ç¨æ§å¶å¨... "çå¤éæ¡ï¼å 为æ们å¨è¿ä¸ªä¾åä¸ä¸ä¼ä½¿ç¨æå°çAPIãå° "éªè¯ç±»å "ä¿ç为 "æ "ï¼é»è®¤ï¼ã
ç¡®ä¿ "å¯ç¨Docker"ã"为HTTPSé ç½® "å "å¯ç¨å¼æ¾APIæ¯æ "çå¤éæ¡ä¸è¢«éä¸ï¼å 为æ们ä¸ä¼å¨è¿é使ç¨ä»»ä½è¿äºåè½ã
ç¹å»å建ã
æ们å°ä½¿ç¨è¿ä¸ªASP.NET Core 6 Web API项ç®ï¼å¨æ¬æçåç»é¨å使ç¨è·è¸ªçå¬å¨ã
ä»ä¹æ¯è·è¸ªï¼ä¸è¿½è¸ªä¸»è¦äºä»¶çäºä»¶æ¥å¿ç¸æ¯ï¼è¿½è¸ªå¯ä»¥æ´å ¨é¢å°äºè§£è¿è¡ä¸çåºç¨ç¨åºåå ¶ç»ä»¶ãæ¥å¿ç±ç»æåæéç»æåçæ¶é´æ³æ°æ®ç»æï¼æ¾ç¤ºäºä½ çåºç¨ç¨åºä¸åççäºä»¶çè®°å½ã追踪æä¾äºå¯¹å个请æ±ä»¥åå®å¦ä½è¢«å¤ççæ´å¤å¯è§æ§ã
System.Diagnosticså½å空é´å å«TraceåDebugç±»ãè·è¸ªç±»å¨ç产ç¯å¢ä¸ä½¿ç¨ï¼èè°è¯ç±»å¨å¼åæ¶ä½¿ç¨ã
追踪é常å æ¬ä»¥ä¸ä¸ä¸ªé¶æ®µã
å·¥å ·åãæ们ç¼åå¿ è¦ç代ç æ¥æè·ç¸å ³ä¿¡æ¯
追踪ãæ们æè·è¸ªä¿¡æ¯åå°æå®çç®æ ï¼å³äºä»¶æ¥å¿ãææ¬æ件ãæ°æ®åºè¡¨çã
åæãæ们åæä»è·è¸ªä¸æ¶éå°çä¿¡æ¯ï¼ä»¥ç¡®å®åºç¨ç¨åºä¸çç¶é¢ã
ä»ä¹æ¯è·è¸ªçå¬å¨ï¼ä¸ºä»ä¹éè¦å®ä»¬ï¼è·è¸ªçå¬å¨æ¶éè·è¸ªä¿¡æ¯ï¼åå¨å®ä»¬ï¼å¹¶å°å®ä»¬å¼å¯¼å°ä¸ä¸ªéå½çç®æ ï¼å¦ææ¬æ件ã.NETæä¾äºå ç§è·è¸ªçå¬å¨ï¼å æ¬ä»¥ä¸å ç§ã
ConsoleTraceListener - å°è·è¸ªä¿¡æ¯åéå°æ§å¶å°çªå£ã
DefaultTraceListener - å°è·è¸ªä¿¡æ¯åéå°æ åè°è¯è¾åºã
DelimitedListTraceListener - å°è·è¸ªè¾åºä»¥éå®çæ ¼å¼åéå°æµãæµä½å®¶æææ¬ä½å®¶ã
EventLogTraceListener - åéè·è¸ªä¿¡æ¯å°äºä»¶æ¥å¿ã
TextWriterTraceListener - åéè·è¸ªä¿¡æ¯å°ä¸ä¸ªææ¬æ件ã
XmlWriterTraceListener - å°è·è¸ªä¿¡æ¯è½¬æ¢ä¸ºXMLã
System.Diagnostics.DebugåSystem.Diagnostics.Traceç±»å¯ä»¥åè·è¸ªçå¬å¨åéæ¶æ¯ï¼èçå¬å¨åå°æ¶æ¯åéå°éå½çç®æ ã
å¨ASP.NET Core 6ä¸ä½¿ç¨é ç½®æ件å建ä¸ä¸ªè·è¸ªçå¬å¨ä½ å¯ä»¥éè¿ä½¿ç¨é ç½®æ件æç¼åèªå®ä¹ä»£ç æ¥å建ä¸ä¸ªè·è¸ªçå¬å¨ãä¸é¢æ¾ç¤ºç代ç çæ说æäºå¦ä½ä½¿ç¨ä½ çåºç¨ç¨åºé ç½®æ件å建ä¸ä¸ªè·è¸ªçå¬å¨ã
<configuration>æææ·»å å°çå¬å¨éåä¸ççå¬å¨é½ä¼æ¶å°è·è¸ªè¾åºãç¶èï¼ä½ å¯ä»¥ä½¿ç¨ä¸ä¸ªçå¬å¨èä¸æå®æ·»å å°çå¬å¨éåä¸ãå¨è¿ç§æ åµä¸ï¼ä½ å¨çå¬å¨ä¸ä½¿ç¨WriteæWriteLineæ¹æ³åéè¾åºã
ä¸é¢ç代ç 说æäºä¸ä¸ªçå¬å¨ï¼å®æ²¡æ被添å å°çå¬å¨éåä¸ï¼ä½ä»ç¶è½å¤å°è·è¸ªä¿¡æ¯åéå°è¾åºçªå£ãæ件æä»»ä½é¢å é ç½®çè¾åºã
TextWriterTraceListenermyFirstListener=newå¨ASP.NET Core 6ä¸å建ä¸ä¸ªèªå®ä¹è·è¸ªçå¬å¨å¨å¤§å¤æ°æ åµä¸ï¼.NET 6é»è®¤é带çè·è¸ªçå¬å¨å°æ»¡è¶³æ¨çè¦æ±ãç¶èï¼å¦æä½ æ³æä½ çè·è¸ªä¿¡æ¯è¾åºå°ä¸åçç®çå°ï¼ä½ å¯ä»¥å®ç°ä½ èªå·±çè·è¸ªçå¬å¨ã
è¦å»ºç«ä¸ä¸ªèªå®ä¹çè·è¸ªçå¬å¨ï¼ä½ åºè¯¥å建ä¸ä¸ªæ©å±TraceListeneræ½è±¡ç±»çç±»ãå¨TraceListenerç±»ä¸æå 个èæåæ½è±¡çæ¹æ³ãä½ è³å°åºè¯¥å®ç°WriteåWriteLineæ¹æ³ãè³å°ï¼ä½ çèªå®ä¹è·è¸ªçå¬å¨åºè¯¥çèµ·æ¥åè¿æ ·ã
publicclassCustomTraceListener:TraceListeneræ以ï¼ä½ çèªå®ä¹è·è¸ªçå¬å¨ç±»å¿ é¡»æä¸ä¸ªåæ°æé å¨åWriteåWriteLineæ¹æ³ã
ä½ è¿éè¦ä¸ä¸ªILoggerå®ä¾æ¥ä»£è¡¨è®°å½å¨ï¼ä¸ä¸ªè®°å½å¨å·¥åæ¥å建记å½å¨ï¼ä»¥åä¸ä¸ªStringBuilderæ¥åå¨è·è¸ªæ¶æ¯ï¼ç¶ååå°å®ä»¬åéå°æ¥å¿ç®æ ã
privatereadonlyILoggerFactory_loggerFactory;ä½ å¯ä»¥å©ç¨ä¾èµæ³¨å ¥çä¼å¿ï¼å¨æé å½æ°ä¸æ³¨å ¥ILoggerFactoryçä¸ä¸ªå®ä¾ï¼ç¶å使ç¨è¯¥å®ä¾æ¥å建ILoggerçä¸ä¸ªå®ä¾ã
publicCustomTraceListener(ILoggerFactoryloggerFactory)è¿éæ¯WriteåWriteLineæ¹æ³çä¸ä¸ªæå°å®ç°ã
publicoverridevoidWrite(string?message,string?category)ASP.NET Core 6ä¸å®æ´çèªå®ä¹è·è¸ªçå¬å¨ç¤ºä¾ä¸é¢æ¯æ们对èªå®ä¹è·è¸ªçå¬å¨çæå°å®ç°çå®æ´æºä»£ç ï¼ä¾æ¨åèã
usingSystem.Collections.Concurrent;å¨Program.csæ件ä¸æ³¨åèªå®ä¹è·è¸ªçå¬å¨è¦ä½¿ç¨èªå®ä¹è·è¸ªçå¬å¨ï¼ä½ åºè¯¥ä½¿ç¨ä»¥ä¸ä»£ç å¨çå¬å¨éåä¸æ³¨åå®ã
varloggerFactory=app.Services.GetRequiredService<ILoggerFactory>();å 为æ们çèªå®ä¹è·è¸ªçå¬å¨å·²ç»è¢«æ·»å å°çå¬å¨éåä¸ï¼å®å°æè·ææç±è¿è¡æ¶äº§ççè·è¸ªæ¶æ¯ï¼å¹¶å°è¾åºåéå°æ们çè®°å½å¨ãå®è¿å°åéæ们å¨åºç¨ç¨åºä¸æç¡®åéçä»»ä½è·è¸ªæ¶æ¯ï¼å°±åæ们å¨åé¢çmyFirstListenerä¾åä¸åçé£æ ·ï¼ã
å æ¤ï¼ä»»ä½æ·»å å°çå¬å¨éåççå¬å¨é½å¯ä»¥æè·ç±è¿è¡æ¶äº§ççè·è¸ªä¿¡æ¯ï¼ä»¥åå¨åºç¨ç¨åºä¸æç¡®åéçä»»ä½è·è¸ªä¿¡æ¯ãç¶èï¼å¦æä¸ä¸ªè·è¸ªçå¬å¨æ²¡æ被添å å°éåä¸ï¼å®åªè½åéåºç¨ç¨åºä¸æç¡®åéçè·è¸ªæ¶æ¯ãå®ä¸ä¼æè·ä»»ä½ç±è¿è¡æ¶äº§ççè·è¸ªæ¶æ¯ã
å½ä½¿ç¨èªå®ä¹è·è¸ªçå¬å¨æ¶ï¼ä½ å¿ é¡»è®°ä½å ³éæå·æ°è·è¸ªçå¬å¨ï¼ä»¥ç¡®ä¿è¾åºç¼å²åºè¢«æ¸ 空ãä½ å¯ä»¥å©ç¨StringBuilderCacheç±»æ¥ä¼åä½ ä½¿ç¨StringBuilderç代ç ï¼å¨CustomTraceListenerç±»ï¼ã
图解+源码讲解 Nacos 客户端动态监听配置机制
图解+源码讲解 Nacos 客户端动态监听配置机制
在人生中第一要紧的是发现自己。为了这个目的,各位时常需要孤独和深思 —— 南森 Nacos 源码分析系列相关文章
从零开始看 Nacos 源码环境搭建
图解+源码讲解 Nacos 客户端发起注册流程
图解+源码讲解 Nacos 服务端处理注册请求逻辑
图解+源码讲解 Nacos 客户端下线流程
图解+源码讲解 Nacos 服务端处理下线请求
图解+源码讲解 Nacos 客户端发起心跳请求
图解+源码讲解 Nacos 服务端处理心跳请求
图解+源码讲解 Nacos 服务端处理配置获取请求
图解+源码讲解 Nacos 客户端动态监听配置机制
NacosConfigAutoConfiguration我们看到这里面其实注入了一个 Nacos 配置刷新的关键 NacosContextRefresherBean
@Configuration@ConditionalOnProperty(name?=?"spring.cloud.nacos.config.enabled",?matchIfMissing?=?true)public?class?NacosConfigAutoConfiguration?{ //?Nacos?配置属性@Beanpublic?NacosConfigProperties?nacosConfigProperties(ApplicationContext?context)?{ if?(context.getParent()?!=?null&&?BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getParent(),?NacosConfigProperties.class).length?>?0)?{ return?BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),NacosConfigProperties.class);}return?new?NacosConfigProperties();}//?Nacos?配置刷新属性@Beanpublic?NacosRefreshProperties?nacosRefreshProperties()?{ return?new?NacosRefreshProperties();}//?Nacos?刷新历史@Beanpublic?NacosRefreshHistory?nacosRefreshHistory()?{ return?new?NacosRefreshHistory();}//?Nacos?配置管理@Beanpublic?NacosConfigManager?nacosConfigManager(NacosConfigProperties?nacosConfigProperties)?{ return?new?NacosConfigManager(nacosConfigProperties);}//?Nacos?配置刷新@Beanpublic?NacosContextRefresher?nacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?nacosRefreshHistory)?{ return?new?NacosContextRefresher(nacosConfigManager,?nacosRefreshHistory);}}NacosContextRefresher 配置中心刷新public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}获取配置服务 getConfigServicenacosConfigManager.getConfigService(),这行代码其实就是为了创建 NcaosConfigService 对象,我们看看你是怎么创建的,其实核心代码就是通过 NacosFactory 反射创建的 NcaosConfigService 对象,这个对象是一个核心对象后续会讲到的
public?static?ConfigService?createConfigService(Properties?properties)?throws?NacosException?{ try?{ //?加载?NacosConfigService?类Class<?>?driverImplClass?=?Class.forName("com.alibaba.nacos.client.config.NacosConfigService");//?获取构造器Constructor?constructor?=?driverImplClass.getConstructor(Properties.class);//?创建实例ConfigService?vendorImpl?=?(ConfigService)?constructor.newInstance(properties);return?vendorImpl;}?catch?(Throwable?e)?{ throw?new?NacosException(NacosException.CLIENT_INVALID_PARAM,?e);}}监听器NacosContextRefresher 实现了 ApplicationListener ,一看这就是一个监听器了,我们看看这个在监听器里面做了什么操作
@Overridepublic?void?onApplicationEvent(ApplicationReadyEvent?event)?{ //?这是一个?CAS?操作,只设置一次if?(this.ready.compareAndSet(false,?true))?{ //?注册?Nacos?监听器对于应用this.registerNacosListenersForApplications();}}注册 Nacos 监听/**register Nacos Listeners. 注册Nacos监听器 */ private void registerNacosListenersForApplications() { // 默认是 true if (isRefreshEnabled()) { // 遍历Nacos属性资源中心 for (NacosPropertySource propertySource : NacosPropertySourceRepository .getAll()) { if (!propertySource.isRefreshable()) { continue; } // 获取资源ID ?String dataId = propertySource.getDataId(); // 通过组和 dataId 注册 Nacos 监听器 registerNacosListener(propertySource.getGroup(), dataId); } } }
private void registerNacosListener(final String groupKey, final String dataKey) { // 构建 Key 信息 String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); // 在 listenerMap中放入了 key 对应 AbstractSharedListener 响应的方法 Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() { @Override public void innerReceive(String dataId, String group, String configInfo) { // 刷新次数 refreshCountIncrement(); // 记录刷新历史,就是改变历史 nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); // 发布刷新事件 applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); } }); // 向配置服务中添加监听器 configService.addListener(dataKey, groupKey, listener);
}
####?向配置服务中添加监听器 此时调用的是?NacosConfigService?中的?addListener?方法,但是最终执行的还是?ClientWorker?中的?addTenantListeners?方法,后面会进行分析?ClientWorker?这个类的```java@Overridepublic?void?addListener(String?dataId,?String?group,?Listener?listener)?throws?NacosException?{ //?这个?ClientWorker?worker?也是一个核心类worker.addTenantListeners(dataId,?group,?Arrays.asList(listener));}构建 CacheData 信息此时向 ClientWorker 中的 CacheData 中添加数据,之后遍历监听器添加到 CacheData 中
创建 CacheData 对象public?CacheData(ConfigFilterChainManager?configFilterChainManager,?String?name,?String?dataId,?String?group,String?tenant)?{ //?dataId?不能为空if?(null?==?dataId?||?null?==?group)?{ throw?new?IllegalArgumentException("dataId="?+?dataId?+?",?group="?+?group);}this.name?=?name;this.configFilterChainManager?=?configFilterChainManager;this.dataId?=?dataId;?//?设置dataIdthis.group?=?group;?//?设置组信息this.tenant?=?tenant;?//?设置租户listeners?=?new?CopyOnWriteArrayList<ManagerListenerWrap>();?//?装饰器集合this.isInitializing?=?true;//?加载缓存数据从本地磁盘this.content?=?loadCacheContentFromDiskLocal(name,?dataId,?group,?tenant);//?计算本地缓存信息的MD5this.md5?=?getMd5String(content);}向 CacheData 中添加数据public?void?addTenantListeners(String?dataId,?String?group,List<?extends?Listener>?listeners)throws?NacosException?{ //?DefaultGroupgroup?=?null2defaultGroup(group);String?tenant?=?agent.getTenant();?//?是?""//?向缓存数据中添加监听器CacheData?cache?=?addCacheDataIfAbsent(dataId,?group,?tenant);for?(Listener?listener?:?listeners)?{ cache.addListener(listener);}}public?CacheData?addCacheDataIfAbsent(String?dataId,?String?group,?String?tenant)throws?NacosException?{ //?获取Key信息String?key?=?GroupKey.getKeyTenant(dataId,?group,?tenant);//?在缓存?Map?中获取缓存数据CacheData?cacheData?=?cacheMap.get(key);//?如果不为空的情况下那么就返回,如果为空那么就创建一个?CacheDataif?(cacheData?!=?null)?{ return?cacheData;}//?创建一个?CacheData?cacheData?=?new?CacheData(configFilterChainManager,?agent.getName(),dataId,?group,?tenant);//?将创建好的?cacheData?放入缓存?Map?中CacheData?lastCacheData?=?cacheMap.putIfAbsent(key,?cacheData);//?如果缓存数据为空的话那么从配置中心拉取,不过此时不为空if?(lastCacheData?==?null)?{ //fix?issue?#?if?(enableRemoteSyncConfig)?{ String[]?ct?=?getServerConfig(dataId,?group,?tenant,?L);cacheData.setContent(ct[0]);}//?计算任务IDint?taskId?=?cacheMap.size()?/?(int)?ParamUtil.getPerTaskConfigSize();//?设置任务IDcacheData.setTaskId(taskId);lastCacheData?=?cacheData;}//?缓存数据初始化完成//?reset?so?that?server?not?hang?this?checklastCacheData.setInitializing(true);LOGGER.info("[{ }]?[subscribe]?{ }",?agent.getName(),?key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());//?返回最新的缓存数据return?lastCacheData;}到这里 CacheData 对象 和 cacheMap 集合已经构建完成了,后续会用到这个数据的
NacosConfigService 分析NacosConfigService这个类在创建的时候主要做了什么事情,这这里面创建了一个 ClientWorker对象,这个对象是一个核心的类,有关于配置的一些操作都是归功于 ClientWorker类
public?NacosConfigService(Properties?properties)?throws?NacosException?{ ......this.agent?=?new?MetricsHttpAgent(new?ServerHttpAgent(properties));this.agent.start();//?核心工作类this.worker?=?new?ClientWorker(this.agent,this.configFilterChainManager,?properties);}核心配置类 ClientWorker分析一下这个类都在做什么事情,都有哪些核心方法 其实能看到里面有一个构造函数、添加缓存数据、添加监听器、检查配置中心相关方法、获取服务配置、解析数据响应、移除缓存数据、删除监听器以及 shutdown方法
构造函数看到这里其实看到了定义了两个调度线程池,一个是用于配置检测的,一个是用于执行长轮询服务的
@SuppressWarnings("PMD.ThreadPoolCreationRule")public?ClientWorker(final?HttpAgent?agent,final?ConfigFilterChainManager?configFilterChainManager,?final?Properties?properties){ this.agent?=?agent;this.configFilterChainManager?=?configFilterChainManager;//?初始化操作init(properties);//?定义一个调度线程池,只有一个线程还是守护线程this.executor?=?Executors.newScheduledThreadPool(1,?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker."?+?agent.getName());t.setDaemon(true);return?t;}});//?定义一个多个线程的调度线程池,线程个数和CPU?核心数有关,也是守护线程,是一个长轮询this.executorService?=?Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling."?+agent.getName());t.setDaemon(true);return?t;}});//?定义一个定时的调度任务,第一次执行的时候延时1毫秒,后续毫秒调度一次this.executor.scheduleWithFixedDelay(new?Runnable()?{ @Overridepublic?void?run()?{ try?{ //?检查配置信息方法checkConfigInfo();}?catch?(Throwable?e)?{ LOGGER.error("["?+?agent.getName()?+?"]?"+?"[sub-check]?rotate?check?error",?e);}}},?1L,?L,?TimeUnit.MILLISECONDS);}检查配置服务方法这个 cacheMap 包含了一些任务信息,这里面的任务是怎么来的呢,他是在添加监听器的时候添加的,上面已经分析过了
public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}0长轮询任务 LongPollingRunnableSpring事件监听机制
Spring事件体系包括三个组件:事件,事件监听器,事件广播器。
1.1 事件
1.1.1 Spring内置事件
1.1.2 自定义事件
事件类需要继承ApplicationEvent,代码如下:
这里为了简单测试,所以写的很简单。
事件类是一种很简单的pojo,除了需要继承ApplicationEvent也没什么了,这个类有一个构造方法需要super。
1.2 事件监听器
1.2.1 事件监听器-基于接口
1.2.2 事件监听器-基于注解
1.3 事件发布操作
事件发布方式很简单
然后调用方法就能看到
疑问
同样的事件能有多个监听器吗 -- 可以的
事件监听器一定要写一个类去实现吗 -- 其实是可以不需要的,spring有个注解@EventListener,修饰在方法上,稍后给出使用方法
事件监听操作和发布事件的操作是同步的吗? -- 是的,所以如果有事务,监听操作也在事务内
可以作为异步处理吗? --可以 看源码有解释。
1.4 Spring事件原理
原理:观察者模式
spring的事件监听有三个部分组成:
事件(ApplicationEvent) 负责对应相应监听器 事件源发生某事件是特定事件监听器被触发的原因。
监听器(ApplicationListener) 对应于观察者模式中的观察者。监听器监听特定事件,并在内部定义了事件发生后的响应逻辑。
事件发布器(ApplicationEventMulticaster )对应于观察者模式中的被观察者/主题, 负责通知观察者 对外提供发布事件和增删事件监听器的接口,维护事件和事件监听器之间的映射关系,并在事件发生时负责通知相关监听器。
Spring事件机制是观察者模式的一种实现,但是除了发布者和监听者者两个角色之外,还有一个EventMultiCaster的角色负责把事件转发给监听者,工作流程如下:
Spring事件机制
也就是说上面代码中发布者调用applicationEventPublisher.publishEvent(msg); 是会将事件发送给了EventMultiCaster, 而后由EventMultiCaster注册着所有的Listener,然后根据事件类型决定转发给那个Listener。
二、源码流程
2.1 源码流程
Spring在ApplicationContext接口的抽象实现类AbstractApplicationContext中完成了事件体系的搭建。
2.2 事件广播器的初始化
2.3 注册事件监听器
2.4 发布事件
2.5 Spring默认的事件广播器SimpleApplicationEventMulticaster
从代码可以看出,applicationContext.publishEvent()方法,需要同步等待各个监听器处理完之后,才返回。
也就是说,Spring提供的事件机制,默认是同步的。如果想用异步的,可以自己实现ApplicationEventMulticaster接口,并在Spring容器中注册id为applicationEventMulticaster的Bean。
Spring发布事件之后,所有注册的事件监听器,都会收到该事件,因此,事件监听器在处理事件时,需要先判断该事件是否是自己关心的。
Sping事件体系所使用的设计模式是:观察者模式。ApplicationListener是观察者接口,接口中定义了onApplicationEvent方法,该方法的作用是对ApplicationEvent事件进行处理。
三、看几个问题
3.1 怎么样可以在所有Bean创建完后做扩展代码?
第一种:使用spring内置事件ContextRefreshedEvent
第二种:通过实现SmartInitializingSingleton接口重写afterSingletonsInstantiated()方法
3.2 请介绍下Spring事件监听器的原理。
原理:观察者模式
spring的事件监听有三个部分组成:
事件(ApplicationEvent) 负责对应相应监听器 事件源发生某事件是特定事件监听器被触发的原因。
监听器(ApplicationListener) 对应于观察者模式中的观察者。监听器监听特定事件,并在内部定义了事件发生后的响应逻辑。
事件发布器(ApplicationEventMulticaster )对应于观察者模式中的被观察者/主题, 负责通知观察者 对外提供发布事件和增删事件监听器的接口,维护事件和事件监听器之间的映射关系,并在事件发生时负责通知相关监听器。
Spring事件机制是观察者模式的一种实现,但是除了发布者和监听者者两个角色之外,还有一个EventMultiCaster的角色负责把事件转发给监听者,工作流程如下:
Spring事件机制
也就是说上面代码中发布者调用applicationEventPublisher.publishEvent(msg); 是会将事件发送给了EventMultiCaster, 而后由EventMultiCaster注册着所有的Listener,然后根据事件类型决定转发给那个Listener。