1.经验总结:分享一个Flink checkpoint失败的系统问题和解决办法
2.HDFS和Burst都是基于POC共识机制,有什么区别?
3.Alluxio 客户端源码分析
4.Hadoop最全八股
5.Hudi 基础入门篇
经验总结:分享一个Flink checkpoint失败的源码问题和解决办法
本文分享了Flink作业在执行过程中遇到的checkpoint失败问题及其解决策略。问题背景是系统Flink作业在执行过程中,多次出现checkpoint失败的源码情况,导致作业频繁重启,系统尽管重启后作业通常能恢复正常。源码大漠多开脚本源码最近,系统同事频繁遇到此问题,源码因此,系统本文将深入分析问题原因并提出解决方案。源码
我们的系统Flink测试环境配置了三个节点,其中每个节点部署了一个HDFS的源码DataNode节点,用于Flink的系统checkpoint和savepoint。日志显示有三个datanode存活,源码文件副本数量为1,系统但写文件时出现失败。通过网络搜索相关错误信息,我们尝试了在HDFS上上传和下载文件,结果均正常,这表明HDFS服务没有问题,datanode也处于正常状态。
继续排查过程中,我们注意到namenode日志中出现了一些警告信息,进而怀疑可能与块放置策略有关。按照日志提示,我们开启相应的debug开关并对配置进行了调整。通过追踪日志信息,我们发现存储空间虽有G,但写入块所需的多M空间超出了预留的存储量,导致namenode认为空间不足。
接下来,我们分析了HDFS源码,发现BlockPlacementPolicyDefault等类负责在为块选择datanode时进行检查,包括剩余空间和繁忙程度的评估。在我们的130行代码源码 核酸场景中,日志显示存储空间的预留量与实际需求不匹配,导致namenode误判datanode的空间不足。
经过深入分析,我们发现问题的根本原因在于HDFS的块大小设置不当,导致在高并发作业时,短时间内预留了大量存储空间。Flink的checkpoint机制在多个任务线程中频繁写入HDFS,特别是在大量小文件的场景下,短时间内产生的大量小文件(每个文件只有几K大小)导致了datanode的存储空间被大量预占,从而出现空间不足的问题。
为了解决此问题,我们提出了一套配置策略。首先,明确指出块大小不是集群属性,而是文件属性,可以通过客户端配置进行调整。在conf/flink-conf.yaml文件中,我们配置了一个HDFS配置文件路径,与Flink配置文件路径相一致。此外,我们编写了一个hdfs-site.xml文件,其中包含了blockSize的配置,例如设置为1M。配置块大小时,需要根据作业状态文件大小灵活调整,以适应不同的作业需求。
通过上述配置调整,我们成功解决了Flink checkpoint失败的问题,并将其同步至集群自动化部署脚本中,部署时会专门添加blockSize的配置。尽管Flink依赖HDFS的checkpoint方案在轻量级流计算场景中显得较为复杂,但通过优化HDFS的块大小配置,我们有效地避免了空间预占问题,确保了Flink作业的三维传奇源码稳定执行。未来,我们期望探索使用其他存储方案,如Elasticsearch,作为checkpoint的分布式存储选项,以进一步优化Flink作业的性能和稳定性。
HDFS和Burst都是基于POC共识机制,有什么区别?
据我了解,POC(容量证明)共识机制是Burst团队在年研发的,其共识优势是设备成本低、利用空闲资源,实现了人人皆可锻造的可能性。Burst将项目源代码进行共享。虽然由于经济模型和奖机制的问题,Burst项目并没有获得传统意义上的成功,但其源代码的分享为后来者提供许多启发。
而HDFS则是基于Burst开发的POC共识机制,对POC进行了优化和升级,弥补了POC存在的一些不足之处。至于详细优化了哪些,你可以去看看HDFS的白皮书。
Alluxio 客户端源码分析
Alluxio是一个用于云分析和人工智能的开源数据编排技术,作为分布式文件系统,采用与HDFS相似的主从架构。系统中包含一个或多个Master节点存储集群元数据信息,以及Worker节点管理缓存的数据块。本文将深入分析Alluxio客户端的实现。
创建客户端逻辑在类alluxio.client.file.FileSystem中,简单示例代码如下。
客户端初始化包括调用FileSystem.Context.create创建客户端对象的上下文,在此过程中需要初始化客户端以创建与Master和Worker连接的连接池。若启用了配置alluxio.user.metrics.collection.enabled,将启动后台守护线程定时与Master节点进行心跳传输监控指标信息。同时,客户端初始化时还会创建负责重新初始化的h5体彩源码后台线程,定期从Master拉取配置文件的哈希值,若Master节点配置发生变化,则重新初始化客户端,期间阻塞所有请求直到重新初始化完成。
创建具有缓存功能的客户端在客户端初始化后,调用FileSystem.Factory.create进行客户端创建。客户端实现分为BaseFileSystem、MetadataCachingBaseFileSystem和LocalCacheFileSystem三种,其中MetadataCachingBaseFileSystem和LocalCacheFileSystem对BaseFileSystem进行封装,提供元数据和数据缓存功能。BaseFileSystem的调用主要分为三大类:纯元数据操作、读取文件操作和写入文件操作。针对元数据操作,直接调用对应GRPC接口(例如listStatus)。接下来,将介绍客户端如何与Master节点进行通信以及读取和写入的流程。
客户端需要先通过MasterInquireClient接口获取主节点地址,当前有三种实现:PollingMasterInquireClient、SingleMasterInquireClient和ZkMasterInquireClient。其中,PollingMasterInquireClient是针对嵌入式日志模式下选择主节点的实现类,SingleMasterInquireClient用于选择单节点Master节点,ZkMasterInquireClient用于Zookeeper模式下的主节点选择。因为Alluxio中只有主节点启动GRPC服务,其他节点连接客户端会断开,PollingMasterInquireClient会依次轮询所有主节点,直到找到可以连接的节点。之后,客户端记录该主节点,如果无法连接主节点,则重新调用PollingMasterInquireClient过程以连接新的主节点。
数据读取流程始于BaseFileSystem.openFile函数,首先通过getStatus向Master节点获取文件元数据,然后检查文件是山楂串 小游戏源码否为目录或未写入完成等条件,若出现异常则抛出异常。寻找合适的Worker节点根据getStatus获取的文件信息中包含所有块的信息,通过偏移量计算当前所需读取的块编号,并寻找最接近客户端并持有该块的Worker节点,从该节点读取数据。判断最接近客户端的Worker逻辑位于BlockLocationUtils.nearest,考虑使用domain socket进行短路读取时的Worker节点地址一致性。根据配置项alluxio.worker.data.server.domain.socket.address,判断每个Worker使用的domain socket路径是否一致。如果没有使用域名socket信息寻找到最近的Worker节点,则根据配置项alluxio.user.ufs.block.read.location.policy选择一个Worker节点进行读取。若客户端和数据块在同一节点上,则通过短路读取直接从本地文件系统读取数据,否则通过与Worker节点建立GRPC通信读取文件。
如果无法通过短路读取数据,客户端会回退到使用GRPC连接与选中的Worker节点通信。首先判断是否可以通过domain socket连接Worker节点,优先选择使用domain socket方式。创建基于GRPC的块输入流代码位于BlockInStream.createGrpcBlockInStream。通过GRPC进行连接时,每次读取一个chunk大小并缓存chunk,减少RPC调用次数提高性能,chunk大小由配置alluxio.user.network.reader.chunk.size.bytes决定。
读取数据块完成后或出现异常终止,Worker节点会自动释放针对该块的写入锁。读取异常处理策略是记录失败的Worker节点,尝试从其他Worker节点读取,直到达到重试次数上限或没有可用的Worker节点。
若无法通过本地Worker节点读取数据,则客户端尝试发起异步缓存请求。若启用了配置alluxio.user.file.passive.cache.enabled且存在本地Worker节点,则向本地Worker节点发起异步缓存请求,否则向负责读取该块数据的Worker节点发起请求。
数据写入流程首先向Master节点发送CreateFile请求,Master验证请求合法性并返回新文件的基本信息。根据不同的写入类型,进行不同操作。如果是THROUGH或CACHE_THROUGH等需要直接写入底层文件系统的写入类型,则选择一个Worker节点处理写入到UFS的数据。对于MUST_CACHE、CACHE_THROUGH、ASYNC_THROUGH等需要缓存数据到Worker节点上的写入类型,则打开另一个流负责将每个写入的块缓存到不同的Worker上。写入worker缓存块流程类似于读取流程,若写入的Worker与客户端在同一个主机上,则使用短路写直接将块数据写入Worker本地,无需通过网络发送到Worker上。数据完成写入后,客户端向Master节点发送completeFile请求,表示文件已写入完成。
写入失败时,取消当前流以及所有使用过的输出流,删除所有缓存的块和底层存储中的数据,与读取流程不同,写入失败后不进行重试。
零拷贝实现用于优化写入和读取流程中WriteRequest和ReadResponse消息体积大的问题,通过配置alluxio.user.streaming.zerocopy.enabled开启零拷贝特性。Alluxio通过实现了GRPC的MethodDescriptor.Marshaller和Drainable接口来实现GRPC零拷贝特性。MethodDescriptor.Marshaller负责对消息序列化和反序列化的抽象,用于自定义消息序列化和反序列化行为。Drainable扩展java.io.InputStream,提供将所有内容转移到OutputStream的方法,避免数据拷贝,优化内容直接写入OutputStream的过程。
总结,阅读客户端代码有助于了解Alluxio体系结构,明白读取和写入数据时的数据流向。深入理解Alluxio客户端实现对于后续阅读其他Alluxio代码非常有帮助。
Hadoop最全八股
分布式系统基础架构,主要解决海量数据存储与分析计算问题。
Hadoop特点:1.x版本MapReduce功能与资源调度耦合性较高,2.x版本引入Yarn,专责资源调度。
Hadoop运行模式包括:HDFS客户端、NameNode(Master)、DataNode(Slave)和Secondary NameNode(备NN)。
Block概念:磁盘读写最小单位,文件系统块为磁盘块整数倍,HDFS同样采用此概念,分解文件为块存储。
HDFS组件详解:包括HDFS客户端、NameNode、DataNode和Secondary NameNode。
HDFS的Block块大小默认在2.7.2版本前为M,版本2.7.3及以上调整为M。
HDFS写流程:文件传输至NameNode,分配Block,DataNode存储Block。
HDFS读流程:从DataNode读取Block,组装成文件。
DN节点数据完整性:通过Secondary NameNode监控和备份。
HDFS HA实现:集群同时运行两个NN,实时同步,故障切换。
HDFS数据一致性:JN节点确保数据同步,避免脑裂。
MapReduce区域:分布式运算框架,整合用户代码和默认组件,实现并行计算。
MapReduce优缺点:高效并行处理数据,但复杂度高,资源管理复杂。
MapReduce进程:包括InputFormat数据输入、切片与并行度机制、Job提交流程、源码详解、切片机制、FileInputFormat、CombineTextInputFormat。
MapReduce工作流程:数据切片、Map处理、Shuffle、Reduce处理、数据输出。
Shuffle机制:数据从MapTask传输至ReduceTask,包括分区、排序、合并。
OutputFormat数据输出:默认格式TextOutputFormat,实现MapReduce输出。
MapTask与ReduceTask:MapTask执行Map阶段,ReduceTask执行Reduce阶段。
MapReduce数据倾斜:数据分布不均,影响计算效率,解决方案包括数据均衡、调整切片策略等。
Yarn区域:资源调度平台,为运算程序提供运算资源。
Yarn组件:包含ResourceManager(RM)、NodeManager(NM)、ApplicationMaster和Container模块。
Yarn工作机制:调度资源,运行MapReduce等运算程序。
Yarn调度器:FIFO、容量(Capacity Scheduler)、公平(Fair Scheduler),默认设置。
Yarn生产环境核心参数:监控与日志聚合,确保系统高效稳定运行。
总结:Hadoop与Yarn是大数据处理的核心技术,涉及分布式存储、计算、资源调度等关键环节,通过优化配置与策略,可实现高效、稳定的数据处理能力。
Hudi 基础入门篇
为了深入理解Hudi这一湖仓一体的流式数据湖平台,本文将提供一个基础入门的步骤指南,从环境准备到编译与测试,再到实际操作。
在开始之前,首先需要准备一个大数据环境。第一步是安装Maven,这是构建和管理Hudi项目的关键工具。在CentOS 7.7版本的位操作系统上,通过下载并解压Maven软件包,然后配置系统环境变量,即可完成Maven的安装。确保使用的Maven版本为3.5.4,仓库目录命名为m2。
接下来,需要下载Hudi的源码包。通过访问Apache软件归档目录并使用wget命令下载Hudi 0.8版本的源码包。下载完成后,按照源码包的说明进行编译。
在编译过程中,将需要添加Maven镜像以确保所有依赖能够正确获取。完成编译后,进入$HUDI_HOME/hudi-cli目录并执行hudi-cli脚本。如果此脚本能够正常运行,说明编译成功。
为了构建一个完整的数据湖环境,需要安装HDFS。从解压软件包开始,配置环境变量,设置bin和sbin目录下的脚本与etc/hadoop下的配置文件。确保正确配置HADOOP_*环境变量,以确保Hadoop的各个组件可以正常运行。
下一步,需要配置hadoop-env.sh文件,以及核心配置文件core-site.xml和HDFS配置文件hdfs-site.xml。这些配置文件中包含了Hadoop Common模块的公共属性、HDFS分布式文件系统相关的属性,以及集群的节点列表。通过执行格式化HDFS和启动HDFS集群的命令,可以确保HDFS服务正常运行。
总结而言,Hudi被广泛应用于国内的大公司中,用于构建数据湖并整合大数据仓库,形成湖仓一体化的平台。这使得数据处理更加高效和灵活。
为了更好地学习Hudi,推荐基于0.9.0版本的资料,从数据湖的概念出发,深入理解如何集成Spark和Flink,并通过实际需求案例来掌握Hudi的使用。这些资料将引导用户从基础到深入,逐步掌握Hudi的核心功能和应用场景。