1.RDD的内内存cache和persist原理
2.Apache 两个开源项目比较:Flink vs Spark
3.spark有哪些组件
4.Sparkåç | å
å管ç
5.SparkShuffle及Spark SQL图解执行流程语法
6.Spark原理详解
RDD的cache和persist原理
在Spark数据处理中,为了提升性能,存管通常会利用RDD的理源缓存功能,通过persist()或cache()方法将计算结果存储在内存或磁盘中。管理这不仅避免了重复计算,源码还支持算法迭代和快速交互式使用。内内存c 鼠标点击源码RDD的存管缓存机制具有容错性,数据丢失后会自动重新计算。理源
Spark提供了多种存储级别,管理如MEMORY_ONLY(仅内存)、源码MEMORY_AND_DISK(内存和磁盘)、内内存MEMORY_ONLY_SER(序列化内存)等,存管用户可以根据需求选择合适的理源级别,以平衡内存使用和CPU效率。管理默认情况下,源码RDD缓存级别为NONE,调用persist()后才会生效。Shuffle操作时,Spark会自动缓存一些数据以提高容错性。
选择存储级别时,需考虑计算成本和数据访问速度。例如,如果数据量大且计算代价高,选择DISK_ONLY可能更合适;对于快速访问,MEMORY_ONLY或MEMORY_ONLY_SER更为理想。此外,replication选项允许设置数据副本,提供容错,但会增加存储和计算资源消耗。
persist()函数的实现并不直接进行数据缓存,而是设置RDD的storageLevel,当读取或计算分区时,根据存储级别决定是否进行缓存。这个过程在计算RDD时触发,数据会存储在内存或磁盘,具体步骤包括判断存储级别、从内存或磁盘获取数据,或计算并保存数据。
通过unpersist()函数,用户可以手动清除缓存,Spark会自动管理LRU缓存。在SparkContext中,一旦设置的storageLevel不可修改,确保数据操作的一致性。
通过实践操作,如测试不同存储级别的缓存,可以更好地理解RDD缓存和persist的工作原理。总的来说,Spark的缓存功能是提高性能的关键手段,合理选择和管理存储级别至关重要。
Apache 两个开源项目比较:Flink vs Spark
时间久远,我对云计算与大数据已感生疏,尤其是Flink的崛起。自动驾驶平台需云计算支撑,包括机器学习、深度学习训练、淮南网站推广源码高清地图、模拟仿真模块,以及车联网。近日看到一篇Spark与Flink的比较文章,遂转发分享,以便日后重新学习该领域新知识。
Apache Flink作为新一代通用大数据处理引擎,致力于整合各类数据负载。它似乎与Apache Spark有着相似目标。两者都旨在构建一个单一平台,用于批处理、流媒体、交互式、图形处理、机器学习等。因此,Flink与Spark在理念上并无太大差异。但在实施细节上,它们却存在显著区别。
以下比较Spark与Flink的不同之处。尽管两者在某些方面存在相似之处,但也有许多不同之处。
1. 抽象
在Spark中,批处理采用RDD抽象,而流式传输使用DStream。Flink为批处理数据集提供数据集抽象,为流应用程序提供DataStream。尽管它们听起来与RDD和DStreams相似,但实际上并非如此。
以下是差异点:
在Spark中,RDD在运行时表示为Java对象。随着project Tungsten的推出,它略有变化。但在Apache Flink中,数据集被表示为一个逻辑计划。这与Spark中的Dataframe相似,因此在Flink中可以像使用优化器优化的一等公民那样使用API。然而,Spark RDD之间并不进行任何优化。
Flink的数据集类似Spark的Dataframe API,在执行前进行了优化。
在Spark 1.6中,数据集API被添加到spark中,可能最终取代RDD抽象。
在Spark中,所有不同的抽象,如DStream、Dataframe都建立在RDD抽象之上。但在Flink中,Dataset和DataStream是基于顶级通用引擎构建的两个独立抽象。尽管它们模仿了类似的API,但在DStream和RDD的情况下,无法将它们组合在一起。尽管在这方面有一些努力,但最终结果还不够明确。广东seo优化源码
无法将DataSet和DataStream组合在一起,如RDD和DStreams。
因此,尽管Flink和Spark都有类似的抽象,但它们的实现方式不同。
2. 内存管理
直到Spark 1.5,Spark使用Java堆来缓存数据。虽然项目开始时更容易,但它导致了内存不足(OOM)问题和垃圾收集(gc)暂停。因此,从1.5开始,Spark进入定制内存管理,称为project tungsten。
Flink从第一天起就开始定制内存管理。实际上,这是Spark向这个方向发展的灵感之一。不仅Flink将数据存储在它的自定义二进制布局中,它确实直接对二进制数据进行操作。在Spark中,所有数据帧操作都直接在Spark 1.5的project tungsten二进制数据上运行。
在JVM上执行自定义内存管理可以提高性能并提高资源利用率。
3. 实施语言
Spark在Scala中实现。它提供其他语言的API,如Java、Python和R。
Flink是用Java实现的。它确实提供了Scala API。
因此,与Flink相比,Spark中的选择语言更好。在Flink的一些scala API中,java抽象也是API的。这会有所改进,因为已经使scala API获得了更多用户。
4. API
Spark和Flink都模仿scala集合API。所以从表面来看,两者的API看起来非常相似。
5. 流
Apache Spark将流式处理视为快速批处理。Apache Flink将批处理视为流处理的特殊情况。这两种方法都具有令人着迷的含义。
以下是两种不同方法的差异或含义:
Apache Flink提供事件级处理,也称为实时流。它与Storm模型非常相似。
Spark只有不提供事件级粒度的最小批处理(mini-batch)。这种方法被称为近实时。
Spark流式处理是更快的批处理,Flink批处理是有限的流处理。
虽然大多数应用程序都可以近乎实时地使用,但很少有应用程序需要事件级实时处理。这些应用程序通常是Storm流而不是Spark流。对于他们来说,Flink将成为一个非常有趣的选择。
运行流处理作为更快批处理的优点之一是,我们可以在两种情况下使用相同的抽象。Spark非常支持组合批处理和流数据,领航器指标源码因为它们都使用RDD抽象。
在Flink的情况下,批处理和流式传输不共享相同的API抽象。因此,尽管有一些方法可以将基于历史文件的数据与流相结合,但它并不像Spark那样干净。
在许多应用中,这种能力非常重要。在这些应用程序中,Spark代替Flink流式传输。
由于最小批处理的性质,Spark现在对窗口的支持非常有限。允许根据处理时间窗口批量处理。
与其他任何系统相比,Flink提供了非常灵活的窗口系统。Window是Flink流API的主要焦点之一。它允许基于处理时间、数据时间和无记录等的窗口。这种灵活性使Flink流API与Spark相比非常强大。
6. SQL界面
截至目前,最活跃的Spark库之一是spark-sql。Spark提供了像Hive一样的查询语言和像DSL这样的Dataframe来查询结构化数据。它是成熟的API并且在批处理中广泛使用,并且很快将在流媒体世界中使用。
截至目前,Flink Table API仅支持DSL等数据帧,并且仍处于测试阶段。有计划添加sql接口,但不确定何时会落在框架中。
目前为止,Spark与Flink相比有着不错的SQL故事。
7. 数据源集成
Spark数据源API是框架中最好的API之一。数据源API使得所有智能资源如NoSQL数据库、镶嵌木地板、优化行列(Optimized Row Columnar,ORC)成为Spark上的头等公民。此API还提供了在源级执行谓词下推(predicate push down)等高级操作的功能。
Flink仍然在很大程度上依赖于map / reduce InputFormat来进行数据源集成。虽然它是足够好的提取数据API,但它不能巧妙地利用源能力。因此Flink目前落后于目前的数据源集成技术。
8. 迭代处理
Spark最受关注的功能之一就是能够有效地进行机器学习。在内存缓存和其他实现细节中,它是实现机器学习算法的真正强大的平台。
虽然ML算法是循环数据流,但它表示为Spark内部的直接非循环图。通常,没有分布式处理系统鼓励循环数据流,因为它们变得难以理解。
但是Flink对其他人采取了一些不同的方法。它们在运行时支持受控循环依赖图(cyclic dependence graph)。这使得它们与DAG表示相比以非常有效的方式表示ML算法。因此,Flink支持本机平台中的迭代,与DAG方法相比,图书座位编程源码可实现卓越的可扩展性和性能。
9. 流作为平台与批处理作为平台
Apache Spark来自Map / Reduce时代,它将整个计算表示为数据作为文件集合的移动。这些文件可能作为磁盘上的阵列或物理文件驻留在内存中。这具有非常好的属性,如容错等。
但是Flink是一种新型系统,它将整个计算表示为流处理,其中数据有争议地移动而没有任何障碍。这个想法与像akka-streams这样的新的反应流系统非常相似。
. 成熟
Flink像批处理这样的部分已经投入生产,但其他部分如流媒体、Table API仍在不断发展。这并不是说在生产中就没人使用Flink流。
spark有哪些组件
Spark的组件主要包括以下几个部分: 一、Spark Core(Spark核心组件) Spark Core是Spark框架的核心,它提供了Spark集群的运行环境以及任务调度、内存管理、错误检测等功能。Spark Core是整个Spark应用程序的起点和中心,负责管理和协调其他组件的工作。 二、Spark SQL(Spark SQL组件) Spark SQL是Spark用于处理结构化数据的工具,它允许用户使用SQL语言来查询和分析数据。通过Spark SQL,用户可以更方便地处理大数据集并获取结果。 三、Spark Streaming(Spark流处理组件) Spark Streaming是Spark中用于处理实时数据的组件。它可以接收来自各种源(如Kafka、Twitter等)的实时数据,并将其转换为DStream(离散数据流),然后进行处理和分析。这对于需要实时分析大数据的应用程序非常有用。 四、Spark MLlib(Spark机器学习库) Spark MLlib是Spark中用于数据分析和机器学习的库。它提供了许多常用的算法和工具,如分类、聚类、回归等。通过Spark MLlib,用户可以在Spark集群上进行大规模的数据分析和机器学习任务。 除了上述主要组件外,Spark还有其他一些辅助组件,如GraphX(用于图计算)、PySpark(Python接口的Spark)等。这些组件都是为了让用户在处理和分析大数据时更加方便和高效而设计的。通过结合使用这些组件,用户可以在Spark上构建出强大的大数据处理和分析应用程序。 总体来说,Apache Spark是一个集成了多种组件的大规模数据处理框架。这些组件协同工作,使得在分布式环境中进行高效、快速的数据处理和分析成为可能。Sparkåç | å å管ç
Sparkä½ä¸ºä¸ä¸ªåºäºå åçåå¸å¼è®¡ç®å¼æï¼å ¶å å管ç模åå¨æ´ä¸ªç³»ç»ä¸æ®æ¼çé常éè¦çè§è²ã
å¨æ§è¡Sparkçåºç¨ç¨åºæ¶ï¼Sparké群ä¼å¯å¨DriveråExecutor两ç§JVMè¿ç¨ï¼
Spark管ççå å主è¦åå为4个åºåï¼
Executorä½ä¸ºä¸ä¸ªJVMè¿ç¨ï¼å®çå å管ç建ç«å¨JVMçå å管çä¹ä¸ï¼Spark对JVMçå å ï¼On-heapï¼ç©ºé´è¿è¡äºæ´ä¸ºè¯¦ç»çåé ï¼ä»¥å åå©ç¨å åãåæ¶ï¼Sparkå¼å ¥äºå å¤ï¼Off-heapï¼å åï¼ä½¿ä¹å¯ä»¥ç´æ¥å¨å·¥ä½èç¹çç³»ç»å åä¸å¼è¾ç©ºé´ï¼è¿ä¸æ¥ä¼åäºå åç使ç¨ã
å å å åç大å°ï¼ç± Spark åºç¨ç¨åºå¯å¨æ¶ç executor-memory æ spark.executor.memory åæ°é ç½®ãExecutor å è¿è¡ç并åä»»å¡å ±äº« JVM å å å åï¼è¿äºä»»å¡å¨ç¼å RDD æ°æ®å广æï¼Broadcastï¼æ°æ®æ¶å ç¨çå å被è§å为åå¨ï¼Storageï¼å åï¼èè¿äºä»»å¡å¨æ§è¡ Shuffle æ¶å ç¨çå å被è§å为æ§è¡ï¼Executionï¼å åï¼å©ä½çé¨åä¸åç¹æ®è§åï¼é£äº Spark å é¨ç对象å®ä¾ï¼æè ç¨æ·å®ä¹ç Spark åºç¨ç¨åºä¸ç对象å®ä¾ï¼åå ç¨å©ä½ç空é´ãä¸åç管ç模å¼ä¸ï¼è¿ä¸é¨åå ç¨ç空é´å¤§å°åä¸ç¸åã
Spark 对å å å åç管çæ¯ä¸ç§é»è¾ä¸ç"è§åå¼"ç管çï¼å 为对象å®ä¾å ç¨å åçç³è¯·åéæ¾é½ç± JVM å®æï¼Spark åªè½å¨ç³è¯·ååéæ¾åè®°å½è¿äºå åï¼æ们æ¥çå ¶å ·ä½æµç¨ï¼
为äºè¿ä¸æ¥ä¼åå åç使ç¨ä»¥åæé« Shuffle æ¶æåºçæçï¼Spark å¼å ¥äºå å¤ï¼Off-heapï¼å åï¼ä½¿ä¹å¯ä»¥ç´æ¥å¨å·¥ä½èç¹çç³»ç»å åä¸å¼è¾ç©ºé´ï¼åå¨ç»è¿åºååçäºè¿å¶æ°æ®ãå©ç¨ JDK Unsafe APIï¼ä» Spark 2.0 å¼å§ï¼ï¼å¨ç®¡çå å¤çåå¨å åæ¶ä¸ååºäº Tachyonï¼èæ¯ä¸å å¤çæ§è¡å åä¸æ ·ï¼åºäº JDK Unsafe API å®ç°ï¼Spark å¯ä»¥ç´æ¥æä½ç³»ç»å å¤å åï¼åå°äºä¸å¿ è¦çå åå¼éï¼ä»¥åé¢ç¹ç GC æ«æååæ¶ï¼æåäºå¤çæ§è½ãå å¤å åå¯ä»¥è¢«ç²¾ç¡®å°ç³è¯·åéæ¾ï¼èä¸åºååçæ°æ®å ç¨ç空é´å¯ä»¥è¢«ç²¾ç¡®è®¡ç®ï¼æ以ç¸æ¯å å å åæ¥è¯´éä½äºç®¡ççé¾åº¦ï¼ä¹éä½äºè¯¯å·®ã
å¨é»è®¤æ åµä¸å å¤å å并ä¸å¯ç¨ï¼å¯éè¿é ç½® spark.memory.offHeap.enabled åæ°å¯ç¨ï¼å¹¶ç± spark.memory.offHeap.size åæ°è®¾å®å å¤ç©ºé´ç大å°ãé¤äºæ²¡æ other 空é´ï¼å å¤å åä¸å å å åçååæ¹å¼ç¸åï¼ææè¿è¡ä¸ç并åä»»å¡å ±äº«åå¨å ååæ§è¡å åã
Spark 1.6 ä¹åé»è®¤ä¸ºç»ä¸ç®¡çï¼UnifiedMemoryManagerï¼æ¹å¼ï¼1.6 ä¹åéç¨çéæ管çï¼StaticMemoryManagerï¼æ¹å¼ä»è¢«ä¿çï¼å¯éè¿é ç½® spark.memory.useLegacyMode=true åæ°å¯ç¨éæå å管çæ¹å¼ãä¸é¢æ们ä»ç»ä¸ä¸¤ç§å å管ç模åçè¿åã
å¨ Spark æåéç¨çéæå å管çæºå¶ä¸ï¼åå¨å åãæ§è¡å ååå ¶ä»å åç大å°å¨ Spark åºç¨ç¨åºè¿è¡æé´å为åºå®çï¼ä½ç¨æ·å¯ä»¥åºç¨ç¨åºå¯å¨åè¿è¡é ç½®ï¼å å å åçåé å¦ä¸æ示ï¼
Spark 1.6 ä¹åå¼å ¥çç»ä¸å å管çæºå¶ï¼ä¸éæå å管ççåºå«å¨äºåå¨å ååæ§è¡å åå ±äº«åä¸å空é´ï¼å¯ä»¥å¨æå ç¨å¯¹æ¹ç空é²åºåãå¦ä¸å¾æ示ï¼
å ¶ä¸æéè¦çä¼åå¨äºå¨æå ç¨æºå¶ï¼å ¶è§åå¦ä¸ï¼
æ°ççæ¬å¼å ¥äºæ°çé 置项ï¼
ååç»ä¸å å管çæºå¶ï¼Spark å¨ä¸å®ç¨åº¦ä¸æé«äºå å åå å¤å åèµæºçå©ç¨çï¼éä½äºå¼åè ç»´æ¤ Spark å åçé¾åº¦ï¼ä½å¹¶ä¸æå³çå¼åè å¯ä»¥é«ææ 忧ãè¬å¦ï¼æ以å¦æåå¨å åç空é´å¤ªå¤§æè 说ç¼åçæ°æ®è¿å¤ï¼åèä¼å¯¼è´é¢ç¹çå ¨éåå¾åæ¶ï¼éä½ä»»å¡æ§è¡æ¶çæ§è½ï¼å 为ç¼åç RDD æ°æ®é常é½æ¯é¿æé©»çå åçãæ以è¦æ³å ååæ¥ Spark çæ§è½ï¼éè¦å¼åè è¿ä¸æ¥äºè§£åå¨å ååæ§è¡å ååèªç管çæ¹å¼åå®ç°åçã
SparkShuffle及Spark SQL图解执行流程语法
SparkShuffle是Apache Spark中的一个核心概念,主要涉及数据分片、聚合与分发的过程。在使用reduceByKey等操作时,数据会被划分到不同的partition中,但每个key可能分布在不同的节点上。为了解决这一问题,Spark引入了Shuffle机制,主要分为两种类型:HashShuffleManager与SortShuffleManager。
HashShuffleManager在Spark 1.2之前是默认选项,它通过分区器(默认是hashPartitioner)决定数据写入的磁盘小文件。在Shuffle Write阶段,每个map task将结果写入到不同的文件中。Shuffle Read阶段,reduce task从所有map task所在的机器上寻找属于自己的文件,确保了数据的聚合。然而,这种方法会产生大量的磁盘小文件,导致频繁的磁盘I/O操作、内存对象过多、频繁的垃圾回收(GC)以及网络通信故障,从而影响性能。
SortShuffleManager在Spark 1.2引入,它改进了数据的处理流程。在Shuffle阶段,数据写入内存结构,当内存结构达到一定大小时(默认5M),内存结构会自动进行排序分区并溢写磁盘。这种方式在Shuffle阶段减少了磁盘小文件的数量,同时在Shuffle Read阶段通过解析索引文件来拉取数据,提高了数据读取的效率。
Spark内存管理分为静态内存管理和统一内存管理。静态内存管理中内存大小在应用运行期间固定,统一内存管理则允许内存空间共享,提高了资源的利用率。Spark1.6版本默认采用统一内存管理,可通过配置参数spark.memory.useLegacyMode来切换。
Shuffle优化涉及多个参数的调整。例如,`spark.shuffle.file.buffer`参数用于设置缓冲区大小,适当增加此值可以减少磁盘溢写次数。`spark.reducer.maxSizeInFlight`参数则影响数据拉取的次数,增加此值可以减少网络传输,提升性能。`spark.shuffle.io.maxRetries`参数控制重试次数,增加重试次数可以提高稳定性。
Shark是一个基于Spark的SQL执行引擎,兼容Hive语法,性能显著优于MapReduce的Hive。Shark支持交互式查询应用服务,其设计架构对Hive的依赖性强,限制了其长期发展,但提供了与Spark其他组件更好的集成性。SparkSQL则是Spark平台的SQL接口,支持查询原生的RDD和执行Hive语句,提供了Scala中写SQL的能力。
DataFrame作为Spark中的分布式数据容器,类似于传统数据库的二维表格,不仅存储数据,还包含数据结构信息(schema)。DataFrame支持嵌套数据类型,提供了一套更加用户友好的API,简化了数据处理的复杂性。通过注册为临时表,DataFrame的列默认按ASCII顺序显示。
SparkSQL的数据源丰富,包括JSON、JDBC、Parquet、HDFS等。其底层架构包括解析、分析、优化、生成物理计划以及任务执行。谓词下推(predicate Pushdown)是优化策略之一,能够提前执行条件过滤,减少数据的处理量。
创建DataFrame的方式多样,可以从JSON、非JSON格式的RDD、Parquet文件以及JDBC中的数据导入。DataFrame的转换与操作提供了灵活性和效率,支持通过反射方式转换非JSON格式的RDD,但不推荐使用。动态创建Schema是将非JSON格式的RDD转换成DataFrame的一种方法。读取Parquet文件和Hive中的数据均支持DataFrame的创建和数据的持久化存储。
总之,SparkShuffle及Spark SQL通过高效的内存管理、优化的Shuffle机制以及灵活的数据源支持,为大数据处理提供了强大而高效的能力。通过合理配置参数和优化流程,能够显著提升Spark应用程序的性能。
Spark原理详解
Spark原理详解: Spark是一个专为大规模数据处理设计的内存计算框架,其高效得益于其核心组件——弹性数据分布集RDD。RDD是Spark的数据结构,它将数据存储在分布式内存中,通过逻辑上的集中管理和物理上的分布式存储,提供了高效并行计算的能力。 RDD的五个关键特性如下:每个RDD由多个partition组成,用户可以指定分区数量,默认为CPU核心数。每个partition独立处理,便于并行计算。
Spark的计算基于partition,算子作用于partition上,无需保存中间结果,提高效率。
RDD之间有依赖性,数据丢失时仅重新计算丢失分区,避免全量重算。
对于key-value格式的RDD,有Partitioner决定分片和数据分布,优化数据处理的本地化。
Spark根据数据位置调度任务,实现“移动计算”而非数据。
Spark区分窄依赖(一对一)和宽依赖(一对多),前者不涉及shuffle,后者则会根据key进行数据切分。 Spark的执行流程包括用户提交任务、生成DAG、划分stage和task、在worker节点执行计算等步骤。创建RDD的方式多样,包括程序中的集合、本地文件、HDFS、数据库、NoSQL和数据流等。 技术栈方面,Spark与HDFS、YARN、MR、Hive等紧密集成,提供SparkCore、SparkSQL、SparkStreaming等扩展功能。 在编写Spark代码时,首先创建SparkConf和SparkContext,然后操作RDD进行转换和应用Action,最后关闭SparkContext。理解底层机制有助于优化资源使用,如HDFS文件的split与partition关系。 搭建Spark集群涉及上传、配置worker和master信息,以及启动和访问。内存管理则需注意Executor的off-heap和heap,以及Spark内存的分配和使用。Sparkå å管ç详解ï¼ä¸ï¼ââå å管ç
å¼¹æ§åå¸å¼æ°æ®éï¼RDDï¼ä½ä¸ºSparkææ ¹æ¬çæ°æ®æ½è±¡ï¼æ¯åªè¯»çååºè®°å½ï¼Partitionï¼çéåï¼åªè½åºäºå¨ç¨³å®ç©çåå¨ä¸çæ°æ®éä¸å建ï¼æè å¨å ¶ä»å·²æçRDDä¸æ§è¡è½¬æ¢ï¼Transformationï¼æä½äº§çä¸ä¸ªæ°çRDDã转æ¢åçRDDä¸åå§çRDDä¹é´äº§ççä¾èµå ³ç³»ï¼ææäºè¡ç»ï¼Lineageï¼ãååè¡ç»ï¼Sparkä¿è¯äºæ¯ä¸ä¸ªRDDé½å¯ä»¥è¢«éæ°æ¢å¤ãä½RDDçææ转æ¢é½æ¯æ°æ§çï¼å³åªæå½ä¸ä¸ªè¿åç»æç»Driverçè¡å¨ï¼Actionï¼åçæ¶ï¼Sparkæä¼å建任å¡è¯»åRDDï¼ç¶åçæ£è§¦å转æ¢çæ§è¡ãTaskå¨å¯å¨ä¹å读åä¸ä¸ªååºæ¶ï¼ä¼å å¤æè¿ä¸ªååºæ¯å¦å·²ç»è¢«æä¹ åï¼å¦æ没æåéè¦æ£æ¥Checkpointææç §è¡ç»éæ°è®¡ç®ãæ以å¦æä¸ä¸ªRDDä¸è¦æ§è¡å¤æ¬¡è¡å¨ï¼å¯ä»¥å¨ç¬¬ä¸æ¬¡è¡å¨ä¸ä½¿ç¨persistæcacheæ¹æ³ï¼å¨å åæç£çä¸æä¹ åæç¼åè¿ä¸ªRDDï¼ä»èå¨åé¢çè¡å¨æ¶æå计ç®é度ãäºå®ä¸ï¼cacheæ¹æ³æ¯ä½¿ç¨é»è®¤çMEMORY_ONLYçåå¨çº§å«å°RDDæä¹ åå°å åï¼æ ç¼åæ¯ä¸ç§ç¹æ®çæä¹ åãå å åå å¤åå¨å åç设计ï¼ä¾¿å¯ä»¥å¯¹ç¼åRDDæ¶ä½¿ç¨çå ååç»ä¸çè§åå管çï¼åå¨å åçå ¶ä»åºç¨åºæ¯ï¼å¦ç¼åbroadcastæ°æ®ï¼ææ¶ä¸å¨æ¬æç讨论èå´ä¹å ï¼ã
RDDçæä¹ åç±SparkçStorage模å [1] è´è´£ï¼å®ç°äºRDDä¸ç©çåå¨ç解è¦åãStorage模åè´è´£ç®¡çSparkå¨è®¡ç®è¿ç¨ä¸äº§ççæ°æ®ï¼å°é£äºå¨å åæç£çãå¨æ¬å°æè¿ç¨ååæ°æ®çåè½å°è£ äºèµ·æ¥ãå¨å ·ä½å®ç°æ¶Driver端åExecutor端çStorage模åææäºä¸»ä»å¼çæ¶æï¼å³Driver端çBlockManager为Masterï¼Executor端çBlockManager为SlaveãStorage模åå¨é»è¾ä¸ä»¥Block为åºæ¬åå¨åä½ï¼RDDçæ¯ä¸ªPartitionç»è¿å¤çåå¯ä¸å¯¹åºä¸ä¸ªBlockï¼BlockIdçæ ¼å¼ä¸º rdd_RDD-ID_PARTITION-ID ï¼ãMasterè´è´£æ´ä¸ªSparkåºç¨ç¨åºçBlockçå æ°æ®ä¿¡æ¯ç管çåç»´æ¤ï¼èSlaveéè¦å°Blockçæ´æ°çç¶æä¸æ¥å°Masterï¼åæ¶æ¥æ¶Masterçå½ä»¤ï¼ä¾å¦æ°å¢æå é¤ä¸ä¸ªRDDã
å¨å¯¹RDDæä¹ åæ¶ï¼Sparkè§å®äºMEMORY_ONLYãMEMORY_AND_DISKç7ç§ä¸åç åå¨çº§å« ï¼èåå¨çº§å«æ¯ä»¥ä¸5个åéçç»å [2] ï¼
éè¿å¯¹æ°æ®ç»æçåæï¼å¯ä»¥çåºåå¨çº§å«ä»ä¸ä¸ªç»´åº¦å®ä¹äºRDDçPartitionï¼åæ¶ä¹å°±æ¯Blockï¼çåå¨æ¹å¼ï¼
RDDå¨ç¼åå°åå¨å åä¹åï¼Partitionä¸çæ°æ®ä¸è¬ä»¥è¿ä»£å¨ï¼ Iterator ï¼çæ°æ®ç»ææ¥è®¿é®ï¼è¿æ¯Scalaè¯è¨ä¸ä¸ç§éåæ°æ®éåçæ¹æ³ãéè¿Iteratorå¯ä»¥è·åååºä¸æ¯ä¸æ¡åºååæè éåºååçæ°æ®é¡¹(Record)ï¼è¿äºRecordç对象å®ä¾å¨é»è¾ä¸å ç¨äºJVMå å å åçotheré¨åç空é´ï¼åä¸Partitionçä¸åRecordç空é´å¹¶ä¸è¿ç»ã
RDDå¨ç¼åå°åå¨å åä¹åï¼Partition被转æ¢æBlockï¼Recordå¨å å æå å¤åå¨å åä¸å ç¨ä¸åè¿ç»ç空é´ãå°Partitionç±ä¸è¿ç»çåå¨ç©ºé´è½¬æ¢ä¸ºè¿ç»åå¨ç©ºé´çè¿ç¨ï¼Spark称ä¹ä¸ºâå±å¼âï¼Unrollï¼ãBlockæåºåååéåºåå两ç§åå¨æ ¼å¼ï¼å ·ä½ä»¥åªç§æ¹å¼åå³äºè¯¥RDDçåå¨çº§å«ãéåºååçBlock以ä¸ç§DeserializedMemoryEntryçæ°æ®ç»æå®ä¹ï¼ç¨ä¸ä¸ªæ°ç»åå¨ææçJava对象ï¼åºååçBlockå以SerializedMemoryEntryçæ°æ®ç»æå®ä¹ï¼ç¨åèç¼å²åºï¼ByteBufferï¼æ¥åå¨äºè¿å¶æ°æ®ãæ¯ä¸ªExecutorçStorage模åç¨ä¸ä¸ªé¾å¼Mapç»æï¼LinkedHashMapï¼æ¥ç®¡çå å åå å¤åå¨å åä¸ææçBlock对象çå®ä¾ [6] ï¼å¯¹è¿ä¸ªLinkedHashMapæ°å¢åå é¤é´æ¥è®°å½äºå åçç³è¯·åéæ¾ã
å 为ä¸è½ä¿è¯åå¨ç©ºé´å¯ä»¥ä¸æ¬¡å®¹çº³Iteratorä¸çæææ°æ®ï¼å½åç计ç®ä»»å¡å¨Unrollæ¶è¦åMemoryManagerç³è¯·è¶³å¤çUnroll空é´æ¥ä¸´æ¶å ä½ï¼ç©ºé´ä¸è¶³åUnroll失败ï¼ç©ºé´è¶³å¤æ¶å¯ä»¥ç»§ç»è¿è¡ã对äºåºååçPartitionï¼å ¶æéçUnroll空é´å¯ä»¥ç´æ¥ç´¯å 计ç®ï¼ä¸æ¬¡ç³è¯·ãèéåºååçPartitionåè¦å¨éåRecordçè¿ç¨ä¸ä¾æ¬¡ç³è¯·ï¼å³æ¯è¯»åä¸æ¡Recordï¼éæ ·ä¼°ç®å ¶æéçUnroll空é´å¹¶è¿è¡ç³è¯·ï¼ç©ºé´ä¸è¶³æ¶å¯ä»¥ä¸æï¼éæ¾å·²å ç¨çUnroll空é´ãå¦ææç»Unrollæåï¼å½åPartitionæå ç¨çUnroll空é´è¢«è½¬æ¢ä¸ºæ£å¸¸çç¼åRDDçåå¨ç©ºé´ï¼å¦ä¸å¾2æ示ã
å¨ ãSparkå å管ç详解ï¼ä¸ï¼ââå ååé ã çå¾3åå¾5ä¸å¯ä»¥çå°ï¼å¨éæå å管çæ¶ï¼Sparkå¨åå¨å åä¸ä¸é¨ååäºä¸åUnroll空é´ï¼å ¶å¤§å°æ¯åºå®çï¼ç»ä¸å å管çæ¶å没æ对Unroll空é´è¿è¡ç¹å«åºåï¼å½åå¨ç©ºé´ä¸è¶³æ¯ä¼æ ¹æ®å¨æå ç¨æºå¶è¿è¡å¤çã
ç±äºåä¸ä¸ªExecutorçææç计ç®ä»»å¡å ±äº«æéçåå¨å å空é´ï¼å½ææ°çBlockéè¦ç¼åä½æ¯å©ä½ç©ºé´ä¸è¶³ä¸æ æ³å¨æå ç¨æ¶ï¼å°±è¦å¯¹LinkedHashMapä¸çæ§Blockè¿è¡æ·æ±°ï¼Eviction)ï¼è被æ·æ±°çBlockå¦æå ¶åå¨çº§å«ä¸åæ¶å å«åå¨å°ç£ççè¦æ±ï¼åè¦å¯¹å ¶è¿è¡è½çï¼Dropï¼ï¼å¦åç´æ¥å é¤è¯¥Blockã
åå¨å åçæ·æ±°è§å为ï¼
è½ççæµç¨åæ¯è¾ç®åï¼å¦æå ¶åå¨çº§å«ç¬¦å _useDisk 为trueçæ¡ä»¶ï¼åæ ¹æ®å ¶ _deserialized å¤ææ¯å¦æ¯éåºååçå½¢å¼ï¼è¥æ¯åå¯¹å ¶è¿è¡åºååï¼æåå°æ°æ®åå¨å°ç£çï¼å¨Storage模åä¸æ´æ°å ¶ä¿¡æ¯ã
Executorå è¿è¡çä»»å¡åæ ·å ±äº«æ§è¡å åï¼Sparkç¨ä¸ä¸ªHashMapç»æä¿åäºä»»å¡å°å åèè´¹çæ å°ãæ¯ä¸ªä»»å¡å¯å ç¨çæ§è¡å å大å°çèå´ä¸º 1/2N ~ 1/N ï¼å ¶ä¸N为å½åExecutorå æ£å¨è¿è¡çä»»å¡ç个æ°ãæ¯ä¸ªä»»å¡å¨å¯å¨ä¹æ¶ï¼è¦åMemoryManager请æ±ç³è¯·æå°ä¸º1/2Nçæ§è¡å åï¼å¦æä¸è½è¢«æ»¡è¶³è¦æ±å该任å¡è¢«é»å¡ï¼ç´å°æå ¶ä»ä»»å¡éæ¾äºè¶³å¤çæ§è¡å åï¼è¯¥ä»»å¡æå¯ä»¥è¢«å¤éã
æ§è¡å å主è¦ç¨æ¥åå¨ä»»å¡å¨æ§è¡Shuffleæ¶å ç¨çå åï¼Shuffleæ¯æç §ä¸å®è§å对RDDæ°æ®éæ°ååºçè¿ç¨ï¼æ们æ¥çShuffleçWriteåRead两é¶æ®µå¯¹æ§è¡å åç使ç¨ï¼
å¨ExternalSorteråAggregatorä¸ï¼Sparkä¼ä½¿ç¨ä¸ç§å«AppendOnlyMapçåå¸è¡¨å¨å å æ§è¡å åä¸åå¨æ°æ®ï¼ä½å¨Shuffleè¿ç¨ä¸æææ°æ®å¹¶ä¸è½é½ä¿åå°è¯¥åå¸è¡¨ä¸ï¼å½è¿ä¸ªåå¸è¡¨å ç¨çå åä¼è¿è¡å¨ææ§å°éæ ·ä¼°ç®ï¼å½å ¶å¤§å°ä¸å®ç¨åº¦ï¼æ æ³åä»MemoryManagerç³è¯·å°æ°çæ§è¡å åæ¶ï¼Sparkå°±ä¼å°å ¶å ¨é¨å 容åå¨å°ç£çæ件ä¸ï¼è¿ä¸ªè¿ç¨è¢«ç§°ä¸ºæº¢å(Spill)ï¼æº¢åå°ç£ççæ件æåä¼è¢«å½å¹¶(Merge)ã
Shuffle Writeé¶æ®µä¸ç¨å°çTungstenæ¯Databrickså ¬å¸æåºç对Sparkä¼åå ååCPU使ç¨ç计å [4] ï¼è§£å³äºä¸äºJVMå¨æ§è½ä¸çéå¶åå¼ç«¯ãSparkä¼æ ¹æ®Shuffleçæ åµæ¥èªå¨éæ©æ¯å¦éç¨TungstenæåºãTungstenéç¨ç页å¼å å管çæºå¶å»ºç«å¨MemoryManagerä¹ä¸ï¼å³Tungsten对æ§è¡å åç使ç¨è¿è¡äºä¸æ¥çæ½è±¡ï¼è¿æ ·å¨Shuffleè¿ç¨ä¸æ éå ³å¿æ°æ®å ·ä½åå¨å¨å å è¿æ¯å å¤ãæ¯ä¸ªå å页ç¨ä¸ä¸ªMemoryBlockæ¥å®ä¹ï¼å¹¶ç¨ Object obj å long offset è¿ä¸¤ä¸ªåéç»ä¸æ è¯ä¸ä¸ªå å页å¨ç³»ç»å åä¸çå°åãå å çMemoryBlockæ¯ä»¥longåæ°ç»çå½¢å¼åé çå åï¼å ¶ obj çå¼ä¸ºæ¯è¿ä¸ªæ°ç»ç对象å¼ç¨ï¼ offset æ¯longåæ°ç»çå¨JVMä¸çåå§å移å°åï¼ä¸¤è é å使ç¨å¯ä»¥å®ä½è¿ä¸ªæ°ç»å¨å å çç»å¯¹å°åï¼å å¤çMemoryBlockæ¯ç´æ¥ç³è¯·å°çå ååï¼å ¶ obj 为nullï¼ offset æ¯è¿ä¸ªå ååå¨ç³»ç»å åä¸çä½ç»å¯¹å°åãSparkç¨MemoryBlockå·§å¦å°å°å å åå å¤å å页ç»ä¸æ½è±¡å°è£ ï¼å¹¶ç¨é¡µè¡¨(pageTable)管çæ¯ä¸ªTaskç³è¯·å°çå å页ã
Tungsten页å¼ç®¡çä¸çææå åç¨ä½çé»è¾å°å表示ï¼ç±é¡µå·å页å å移éç»æï¼
æäºç»ä¸ç寻åæ¹å¼ï¼Sparkå¯ä»¥ç¨ä½é»è¾å°åçæéå®ä½å°å å æå å¤çå åï¼æ´ä¸ªShuffle Writeæåºçè¿ç¨åªéè¦å¯¹æéè¿è¡æåºï¼å¹¶ä¸æ éååºååï¼æ´ä¸ªè¿ç¨é常é«æï¼å¯¹äºå å访é®æçåCPU使ç¨æç带æ¥äºææ¾çæå [5] ã
Sparkçåå¨å ååæ§è¡å åæçæªç¶ä¸åç管çæ¹å¼ï¼å¯¹äºåå¨å åæ¥è¯´ï¼Sparkç¨ä¸ä¸ªLinkedHashMapæ¥éä¸ç®¡çææçBlockï¼Blockç±éè¦ç¼åçRDDçPartition转åèæï¼è对äºæ§è¡å åï¼Sparkç¨AppendOnlyMapæ¥åå¨Shuffleè¿ç¨ä¸çæ°æ®ï¼å¨Tungstenæåºä¸çè³æ½è±¡æ为页å¼å å管çï¼å¼è¾äºå ¨æ°çJVMå å管çæºå¶ã
Sparkçå å管çæ¯ä¸å¥å¤æçæºå¶ï¼ä¸Sparkççæ¬æ´æ°æ¯è¾å¿«ï¼ç¬è æ°´å¹³æéï¼é¾å æåè¿°ä¸æ¸ ãé误çå°æ¹ï¼è¥è¯»è æ好ç建议åæ´æ·±çç解ï¼è¿æä¸åèµæã
大数据面试题-Spark的内存模型
面试题来源:可回答:1)Spark内存管理的结构;2)Spark的Executor内存分布(参考“内存空间分配”)
1、堆内和堆外内存规划
作为一个JVM 进程,Executor 的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:堆内内存的大小,由Spark应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划。
Spark对堆内内存的管理是一种逻辑上的”规划式”的管理。不同管理模式下,这三部分占用的空间大小各不相同。
堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
利用JDK Unsafe API(从Spark 2.0开始),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
2、内存空间分配
静态内存管理与统一内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。
统一内存管理的堆内内存结构如图所示:其中最重要的优化在于动态占用机制。统一内存管理的堆外内存结构如下图所示。
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能。
3、存储内存管理
RDD的持久化机制
弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合。RDD的持久化由 Spark的Storage模块负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和 Executor 端的Storage模块构成了主从式的架构。
在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK 等7种不同的存储级别,而存储级别是以下5个变量的组合。
通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的 Partition(同时也就是Block)的存储方式。
4、执行内存管理
执行内存主要用来存储任务在执行Shuffle时占用的内存。
若在map端选择普通的排序方式,会采用ExternalSorter进行外排,在内存中存储数据时主要占用堆内执行空间。
若在map端选择 Tungsten 的排序方式,则采用ShuffleExternalSorter直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
在Shuffle Write 阶段中用到的Tungsten是Databricks公司提出的对Spark优化内存和CPU使用的计划。在Shuffle过程中,Spark会根据Shuffle的情况来自动选择是否采用Tungsten排序。
Tungsten 采用的页式内存管理机制建立在MemoryManager之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。
spark sql源码系列 | with as 语句真的会把查询的数据存内存嘛?
在探讨 Spark SQL 中 with...as 语句是否真的会把查询的数据存入内存之前,我们需要理清几个关键点。首先,网上诸多博客常常提及 with...as 语句会将数据存放于内存中,来提升性能。那么,实际情况究竟如何呢?
让我们以 hive-sql 的视角来解答这一问题。在 hive 中,有一个名为 `hive.optimize.cte.materialize.threshold` 的参数。默认情况下,其值为 -1,代表关闭。当值大于 0 时(如设置为 2),with...as 语句生成的表将在被引用次数达到设定值后物化,从而确保 with...as 语句仅执行一次,进而提高效率。
接下来,我们通过具体测试来验证上述结论。在不调整该参数的情况下,执行计划显示 test 表被读取了两次。此时,我们将参数调整为 `set hive.optimize.cte.materialize.threshold=1`,执行计划显示了 test 表被物化的情况,表明查询结果已被缓存。
转而观察 Spark SQL 端,我们并未发现相关优化参数。Spark 对 with...as 的操作相对较少,在源码层面,通过获取元数据时所做的参数判断(如阈值与 cte 引用次数),我们可以发现 Spark 在这个逻辑上并未提供明确的优化机制,来专门针对 with...as 语句进行高效管理。
综上所述,通过与 hive-sql 的对比以及深入源码分析,我们得出了 with...as 语句在 Spark SQL 中是否把数据存入内存的结论,答案并不是绝对的。关键在于是否通过参数调整来物化结果,以及 Spark 在自身框架层面并未提供特定优化策略来针对 with...as 语句进行内存管理。因此,正确使用 with...as 语句并结合具体业务场景,灵活调整优化参数策略,是实现性能提升的关键。