【源码在线解析】【源码显示框】【anyref 源码 scala】hadoop2.6源码分析

来源:oa指标公式源码

1.yarn源码分析(四)AppMaster启动
2.yarn源码分析(二)创建Application
3.Spark Core读取ES的源码分区问题分析
4.hadoop 安装包的区别。在线等谢谢各位。

hadoop2.6源码分析

yarn源码分析(四)AppMaster启动

       在容器分配完成之后,启动容器的分析代码主要在ContainerImpl.java中进行。通过状态机转换,源码container从NEW状态向其他状态转移时,分析会调用RequestResourceTransition对象。源码RequestResourceTransition负责将所需的分析源码在线解析资源进行本地化,或者避免资源本地化。源码若需本地化,分析还需过渡到LOCALIZING状态。源码为简化理解,分析此处仅关注是源码否进行资源本地化的情况。

       为了将LAUNCH_CONTAINER事件加入事件处理队列,分析调用了sendLaunchEvent方法。源码该事件由ContainersLauncher负责处理。分析源码显示框ContainersLauncher的源码handle方法中,使用一个ExecutorService(线程池)容器Launcher。ContainerLaunch实现了Callable接口,其call方法生成并执行launch_container脚本。以MapReduce框架为例,该脚本在hadoop.tmp.dir/application name/container name目录下生成,其主要作用是启动MRAppMaster进程,即MapReduce的ApplicationMaster。

yarn源码分析(二)创建Application

       深入剖析YARN源码中的Application创建机制,核心在于通过client向ResourceManager发起请求。这一过程中,Hadoop RPC协议作为桥梁,确保了客户端与ResourceManager间通信的anyref 源码 scala高效与可靠。客户端通过调用接口ApplicationClientProtocol来执行操作。以`yarnClient.createApplication()`与`yarnClient.submitApplication(appContext)`为例,揭示了创建Application的主要流程。

       关注点集中于两个关键步骤:初始化Application及提交Application至ResourceManager。初始化通过`createApplication()`完成,此过程在`YarnClientImpl`类中实现。此方法内部调用`getNewApplication()`以获取ApplicationID,作为后续操作的基础。

       获取ApplicationID是创建过程的基石,而其实现细节则深藏于`RMClientService`中。在理解这一部分时,我们需关注`RMClientService`对于长期对象的服务化处理,以及在`YarnClientImpl`中对`submitApplication`调用的qrspeed插件源码具体实现。

       当ApplicationID获得后,便正式步入提交阶段。通过`submitApplication()`,客户端与ResourceManager间建立联系,资源分配与应用状态监控得以实现。此过程中的关键在于`rmClient.submitApplication`方法的调用,之后通过轮询`ApplicationReport`来监控提交状态,确保应用成功部署。

       深入探究`submitApplication`方法的内部逻辑,我们会发现它在`RMClientService`中调用`rmAppManager.submitApplication`,接着通过事件调度器对新建的Application进行处理。这一处理阶段主要负责保存应用信息,同时引入了YARN中的电影解说源码状态机与事件模型概念,将在后续文章中进行详尽解析。

Spark Core读取ES的分区问题分析

       撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,生成的RDD分区数与哪些因素相关。

       初步推测,这与分片数有关,但具体关系是什么呢?以下是两种可能的关系:

       1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。

       2).ES支持游标查询,那么是否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?

       下面,我将与大家共同探讨源码,了解具体情况。

       1.Spark Core读取ES

       ES官网提供了elasticsearch-hadoop插件,对于ES 7.x,hadoop和Spark版本的支持如下:

       在此,我使用的ES版本为7.1.1,测试用的Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

       a,导入整个elasticsearch-hadoop包

       b,仅导入spark模块的包

       为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:

       可以看到,Spark Core读取RDD主要有两种形式的API:

       a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

       b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

       尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。

       要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。

       2.源码分析

       首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,可以直接导入idea,然后切换到7.x版本。

       接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:

       esPartitions是一个lazy型的变量:

       这种声明的原因是什么呢?

       lazy+transient的原因大家可以思考一下。

       RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:

       a).findSlicePartitions

       这个方法实际上是在5.x及以后的ES版本,同时配置了

       之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

       实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。

       这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。

       b).findShardPartitions方法

       这个方法没有疑问,一个RDD分区对应于ES index的一个分片。

       3.总结

       以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数

       进行拆分。

hadoop 安装包的区别。在线等谢谢各位。

       hadoop-2.6.0-src.tar.gz是源码压缩文件。可以用eclipse导入研究源码,或者Maven构建编译打包。

       hadoop-2.6.0.tar.gz是已经官方发布版压缩包,可以直接使用。不过官网下载的hadoop发布版本只适合x环境,若要x的则需要Maven重新构建。

       *.mds 是描述文件,记录压缩包的MD5,SHA1等信息。

文章所属分类:热点频道,点击进入>>