1.golang中使用kafka的综合指南
2.9、NIFI综合应用场景-通过NIFI配置kafka的数据同步
3.kafkaç®ä»
4.sarama 源码解析--Kafka的重平衡
5.kafka原理 消费者偏移量__consumer_offsets_相关解析
golang中使用kafka的综合指南
kafka是一个被广泛使用的分布式、可扩展、高性能、可靠的流处理平台。在处理kafka数据时,答题活动助手源码确保处理效率和可靠性需要采取多种最佳实践,本文将通过介绍这些实践以及使用sarama库实现,来提高kafka消费的效率。
选择合适的提交策略是关键步骤之一。自动提交是sarama默认选择,它会定期提交已成功消费的消息偏移量,确保消费者在重新启动或消费失败时可以从断点继续。手动提交则允许用户更灵活地控制何时提交消息偏移量,提供更高的控制度。
减少kafka的传输次数可以优化数据读取与写入。批量发送消息的效果优于逐个发送,较大的批次能提高kafka数据发送效率。同时,长轮询策略减少无数据时的请求次数,进一步降低传输频率。冰蝎3.0源码
消费者组的使用是另一个关键点。它允许在多个消费者之间分配消息,提供横向扩展能力。在sarama中,通过消费者组接口的三个方法:Setup、Cleanup、ConsumeClaim来实现消费组的创建、清理和消费分区任务。确保处理rebalance事件,可以避免消息处理的混乱和重复。
调整消费者缓冲区大小可以优化内存使用和消息处理速度。增加缓冲区大小可以提高吞吐量,但同时也会消耗更多内存。正确设置缓冲区大小有助于平衡性能与资源使用。
处理rebalance事件是维护消费者组稳定性的关键。在sarama中,正确处理Setup和Cleanup函数,确保在消费者组发生变化时,应用程序能够正常处理消费者离开或加入,避免消息处理的公式源码涨停跌停混乱。
监控消费者状态对确保系统健康和性能至关重要。虽然Golang没有内置对Kafka监控的支持,但可以利用外部库和工具追踪延迟、处理时间和错误率。设置警报可以及时发现和处理问题,确保应用健壮、可靠且高效。
遵循上述实践,结合sarama库,可以在使用kafka时提高处理效率和可靠性,确保应用程序在分布式环境中稳定运行。
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步
本文旨在介绍nifi与kafka的交互过程,即生产数据到kafka中,然后通过nifi消费kafka中的数据。
本文前提是nifi、kafka环境正常。
本文分为三个部分,即处理器说明、生产数据到kafka中以及消费kafka中的易语言BmpOperate源码数据。
一、处理器说明
1、处理器说明
1.1、PublishKafka_0_
描述:使用Kafka 0..x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助NiFi处理器是ConsumeKafka_0_。
2、属性配置
2.1、ConsumeKafka_0_
描述:消耗来自专门针对Kafka 0..x Consumer API构建的Apache Kafka的消息。用于发送消息的辅助NiFi处理器是PublishKafka_0_。
在下面的列表中,列出属性及其默认值,以及属性是否支持NiFi表达式语言。
二、Producer生产
1、创建并配置处理器GenerateFlowFile
创建处理器组kafka,进入组后创建GenerateFlowFile处理器。每1秒生产一次数据。
- 文件大小b - 每次生成个相同文件 - 每次生成的lumnytool画质助手源码流文件内容唯一
2、创建并配置处理器PublishKafka_0_
创建处理器组kafka,进入组后创建PublishKafka_0_处理器。
- Brokers设置为...:,...:,...:。图为示例。
- topic设置为nifi-topic,如果topic不存在,会自动创建
- Delivery Guarantee,对应kafka的acks机制,选择最为安全的Guarantee Replicated Delivery,相当于acks=all
3、配置GenerateFlowFile和PublishKafka_0_连接
连接GenerateFlowFile和PublishKafka_0_
4、负载均衡并发
5、验证
启动并查看监听kafka消费数据,也可以通过 server1:/topic/meta...工具查看生产的数据 在kafka所在服务器执行监听命令:
三、Consumer消费
1、创建并配置ConsumeKafka_0_处理器并连接
2、验证
启动生产者、消费者,验证nifi是否将数据写入kafka、并且kafka的数据是否被消费。以下为模板界面。
以上完成了nifi读取kafka中的数据(消费)。类似的也可以通过nifi将数据写入到nifi中,此处不再赘述。
kafkaç®ä»
ä¸ãkafkaå®ä¹
äºãkafkaçä¼å¿
ä¸ãkafkaçåç
åãkafkaèµ·æº
ä¸ãKafkaæ¯æåç±Linkedinå ¬å¸å¼åï¼æ¯ä¸ä¸ªåå¸å¼ãæ¯æååºçï¼partitionï¼ãå¤å¯æ¬çï¼replicaï¼ï¼åºäºzookeeperåè°çåå¸å¼æ¶æ¯ç³»ç»ï¼å®çæ大çç¹æ§å°±æ¯å¯ä»¥å®æ¶çå¤ç大éæ°æ®ä»¥æ»¡è¶³åç§éæ±åºæ¯ï¼æ¯å¦åºäºhadoopçæ¹å¤çç³»ç»ãä½å»¶è¿çå®æ¶ç³»ç»ãstorm/Sparkæµå¼å¤çå¼æï¼web/nginxæ¥å¿ã访é®æ¥å¿ï¼æ¶æ¯æå¡ççï¼ç¨scalaè¯è¨ç¼åï¼Linkedinäºå¹´è´¡ç®ç»äºApacheåºéä¼å¹¶æ为顶级å¼æºé¡¹ç®ã
äºãkafkaçä¼å¿
é«ååéãä½å»¶è¿ï¼kafkaç¾å¦ä¹å¤æ¯å¯ä»¥å¤çå åä¸æ¡ä¿¡æ¯ï¼å®ç延è¿æä½åªæå 毫ç§ï¼æ¯ä¸ªtopicå¯ä»¥åå¤ä¸ªpartitionï¼consumer
group对partitionè¿è¡consumeæä½ã
å¯æ©å±æ§ï¼kafkaé群æ¯æçæ©å±
æä¹ åãå¯é æ§ï¼æ¶æ¯è¢«æä¹ åå°æ¬å°ç£çï¼å¹¶ä¸æ¯ææ°æ®å¤ä»½é²æ¢æ°æ®ä¸¢å¤±
容éæ§ï¼å 许é群ä¸èç¹å¤±è´¥ï¼è¥å¯æ¬æ°é为nï¼åå 许n-1个èç¹å¤±è´¥ï¼
é«å¹¶åï¼æ¯ææ°å个客æ·ç«¯åæ¶è¯»å
ä¸ãkafkaçåç
kafkaæ¯å¦ä½å®ç°ä»¥ä¸æè¿°è¿å ç¹ï¼æ们éä¸è¯´æï¼
1.é«ååéãä½å»¶è¿
kafkaå¨è®¾è®¡ä¹åå°±æ¯ä¸ºäºé对大æ°æ®éçä¼ è¾å¤çï¼é«ååéãä½å»¶è¿æ主è¦ççå°±æ¯åä½æ¶é´å æè½è¯»åçæ°æ®æ»éï¼æ们å æ¥çç产端ã
kafkaéåäºä¸å®éçæ¹å¤çæºå¶ï¼å³å½ç产æ°æ®è¾¾å°ä¸å®æ°éæè è¾¾å°æ¶é´çªå£åï¼å°ææ¶éå°çæ°æ®ä¸æ¹æ¬¡çæ交å°æå¡å¨ï¼æ们å设å¤çä¸æ¬¡æ°æ®çæ¶é´ä¸º1msï¼é£æ¯ç§éè½å¤çæ¡ï¼å»¶æ¶ä¸º1msï¼å¦ææ¤æ¶å°å¤çé´éåæ9msï¼å³æ¯mså¤çä¸æ¹æ°æ®ï¼å设è¿æ®µæ¶é´æ¥æ¶å°æ¡å¤çï¼é£æ¯ç§åè½å¤çæ¡ï¼ä½æ¯å»¶æ¶åæäºmsã为äºè·å¾æ大çååéï¼éè¦çºç²ä¸å®ç延è¿ï¼ä½æ¯è¿æ ·ççºç²æ¯å¼å¾çãå½ç¡®å®äºè¿ç§å°æ¹éæ¹å¼ä¹åï¼é«éçåååå³äºkafkaèªèº«åç£ççé度äºãèç±äºkafkaæ¬èº«å¯¹æ°æ®ä¸åä»»ä½çå¤çï¼åªç®¡åå ¥æ°æ®ï¼ä¿ç®¡æ°æ®ï¼ååæ°æ®ï¼å æ¤ä¼æ¯ä¸ç§æ¹é顺åºåå ¥æ°æ®çæ åµï¼èç£çç读åé度大éæ¶èå¨å¯»åä¸ï¼ä¹å°±æ¯éæºè¯»åï¼ä½æ¯å¯¹äºé¡ºåºåå ¥çé度æ¯é常快çï¼çè³è½åª²ç¾å åçéæºåå ¥é度ãæ人åè¿ä¸ä¸ªå¯¹æ¯ï¼æ®éç£ç顺åºåå ¥æ¯ç§è½è¾¾å°.2M/sï¼SSDç顺åºåå ¥é度为.2M/sï¼å åç顺åºåå ¥é度为.2M/sãkafkaæ£æ¯å©ç¨äºè¿ä¸ªç¹æ§ï¼é¡ºåºåå ¥ï¼é度ç¸å¯¹è¾å¿«ãèkafkaæ¬èº«è½ç¶ä¹æ¯åå ¥ç£çæä¹ åæ°æ®ï¼ä½å®é ä¸kafkaæ¯å°æ°æ®é¡ºåºåå ¥é¡µç¼åä¸ï¼page cacheï¼ï¼ç¶åç±æä½ç³»ç»èªè¡å³å®ä½æ¶åå°ç£çä¸ï¼å æ¤kafkaçåæä½è½å¨æ¯ç§è½»è½»æ¾æ¾è¾¾å°åå ¥æ°åä¸æ¡è®°å½ã并ä¸åºäºkafkaçå¨ææ©å±ï¼è¿ä¸ªæ°åè¿è½ä¸æå¢å¤§ã
kafkaå¨æ¶è´¹ç«¯ä¹æçé«ååéï¼ç±äºkafkaæ¯å°æ°æ®åå ¥å°é¡µç¼åä¸ï¼åæ¶ç±äºè¯»åç¸é´çé´é并ä¸å¤§ï¼å¾å¤§å¯è½æ§ä¼å¨ç¼åä¸å½ä¸ï¼ä»èä¿è¯é«ååéãå¦å¤kafkaç±äºæ¬èº«ä¸å¯¹æ°æ®åä»»ä½çä¿®æ¹ï¼å®å ¨ä½¿ç¨é¶æ·è´ææ¯ï¼å¤§å¤§æåæ°æ®ç读åè½åã
2.kafkaæ¯ä¸ªèç¹å«åbrokerï¼èæ¯ä¸ä¸ªbrokeré½æ¯ç¬ç«è¿è¡çï¼å¯ä»¥éæ¶å å ¥kafkaé群ï¼é群çå¿è·³ç®¡çæ¯ç±zookeeperè´è´£ï¼æ°å å ¥çbrokeråªè¦broker idä¸ä¸åæçå²çªå°±è½é¡ºå©çå å ¥é群ä¸ï¼å®ç°å¨ææ©å±ã
3.kafkaçæä¹ åå¨ä¸é¢å·²ç»æå°ï¼kafkaç»è¿äºjavaçå å¤çæ°æ®ï¼ç´æ¥å°æ°æ®åå ¥é¡µç¼åï¼ç¶åç±æä½ç³»ç»æ¥ç®¡ç页ç¼ååå ¥ç£çï¼å®ç°æä¹ åãkafkaæ¯ä¸ä¸ªä¸»é¢topicæ¯ä¸ä¸ªä¸å¡æ°æ®ï¼ä»å¯ç±å¤ä¸ªpartitionç»æï¼èæ¯ä¸ªpartitionå¯ä»¥æå¤ä¸ªreplicaå¯æ¬ï¼ç¨äºä¿è¯æ°æ®çå¯é æ§ãreplicaå为两个è§è²ï¼ä¸ä¸ªæ¯leaderï¼ä¸ä¸ªæ¯è¿½éè ï¼åä¸æ¶é´ï¼æ¯ä¸ä¸ªpartitionåªè½æä¸ä¸ªleaderï¼å ¶ä»é½æ¯è¿½é®éè ï¼laederè´è´£æ¥æ¶æ°æ®å¹¶åå ¥logï¼è追éè ä¸è½è¢«ç¨æ·åå ¥æ°æ®ï¼åªæ¯ä»leaderè§è²çreplicaå¯æ¬ä¸åæ¥logåå ¥èªå·±çlogï¼ä¿ææ°æ®åæ¥ãkafkaä¸æä¸ä¸ªæ¦å¿µï¼ISRï¼å ¨ç§°æ¯in-sync
replicaï¼å³ææå¯ç¨çreplicaå¯æ¬ï¼è¿éçISRæ°éåªè¦å¤§äº1ï¼è¿ä¸ªpartitionå°±è½æ£å¸¸è¿ä½ï¼å æ¤å®¹éæ§é常好ï¼å设n个replicaï¼é£æå¤å¯ä»¥ån-1个replicaçæ åµä¸ï¼è¿è½ä¿æç³»ç»æ£å¸¸è¿è¡ãå½replicaè¿æ»å°ä¸å®æ¶é´åï¼ä¼è¢«kafkaä»ISRä¸åé¤ï¼å½å次åæ¥åï¼å¯ä»¥å次å å ¥ISRï¼å¦æè¿æ¶åleaderåºç°é®é¢ï¼ä¼ä»ISRä¸éæ°é举ä¸ä¸ªleaderï¼åå çleaderå次åæ¥æååä¼éæ°å å ¥ISRï¼æ为ä¸ä¸ªflowerã
4.ä¸é¢æå°äºkafkaçISRæºå¶ï¼kafkaç容éæ§å°±æ¯ç±ISRçæºå¶æ¥ä¿è¯çã
5.kafkaé群å¯ä»¥å¨ææ©å±brokerï¼å¤ä¸ªpartitionåæ¶åå ¥æ¶è´¹æ°æ®ï¼å®ç°çæ£çé«å¹¶åã
åãkafkaçèµ·æº
kafkaèµ·æºäºLinkedInå ¬å¸ï¼å½æ¶é¢è±å ¬å¸éè¦æ¶é两大类æ°æ®ï¼ä¸æ¯ä¸å¡ç³»ç»ååºç¨ç¨åºçæ§è½çæ§ææ æ°æ®ï¼èæ¯ç¨æ·çæä½è¡ä¸ºæ°æ®ãå½æ¶ä¸ºäºæ¶éè¿ä¸¤ç±»æ°æ®ï¼é¢è±èªç äºä¸¤å¥ç¸åºçæ°æ®æ¶éç³»ç»ï¼ä½æ¯è¿ä¸¤å¥ç³»ç»é½åå¨ä¸äºå¼ç«¯ï¼æ æ³å®ç°å®æ¶äº¤äºãå®æ¶æ§å·®ãç»´æ¤ææ¬é«ãå æ¤é¢è±çå·¥ç¨å¸å¸ææ¾å°ä¸ä¸ªç»ä¸çç»ä»¶æ¥æ¶éååæ¶è´¹è¿äºå¤§æ¹éçæ°æ®ï¼ActiveMQç±äºæ©å±æ§ä¸è¶³ï¼ä¸è½æ¯æ大æ°æ®éè被æå¼ï¼ä»èå³å®èªç ä¸å¥æ»¡è¶³éæ±çç³»ç»ç»ä»¶ï¼ä¹å°±æ¯kafkaã
kafkaç设计ä¹å主è¦æä¸ä¸ªç®æ ï¼
1.为ç产è åæ¶è´¹è æä¾ä¸å¥ç®åçAPI
2.éä½ç½ç»ä¼ è¾åç£çåå¨å¼é
3.å ·æé«ä¼¸ç¼©æ§æ¶æ
ç®åkafkaå¯ä»¥ç®æ¯è¶ é¢å®æäºç®æ ã
kafkaçå称ç±æ¥ä¹å¾æææï¼å 为kafkaç³»ç»çåæä½æ§è½ç¹å«å¼ºï¼å æ¤æ³ä½¿ç¨ä¸ä¸ªä½å®¶çååæ¥å½åkafkaï¼èJay Krepsï¼kafkaçä¸ä½ä½è ä¹ä¸ï¼å¨ä¸å¤§å¦çæ¶åå¾å欢Franz Kafkaï¼å æ¤èµ·æ¥è¿æ ·ä¸ä¸ªååã
kafkaå¨å¹´å¼æºï¼å¹´7ææ£å¼è¿å ¥Apacheè¿è¡åµåï¼å¹´æ顺å©æ¯ä¸ï¼åæ为Apacheç顶级项ç®ã
sarama 源码解析--Kafka的重平衡
重平衡操作
重平衡是动态调整Consumer Group下的Consumer订阅Topic的分区的一个关键操作。Sarama中的BalanceStrategyRange和BalanceStrategySticky策略具体实施这一操作。
重平衡触发条件之一是成员数变更。这一过程包括以下步骤:
1. 启动一个新的消费者实例。
2. 调用Consume方法。
3. Consume方法初始化连接信息,并启动一个goroutine。程序会阻塞在sess.ctx.Done()上。
4. 在newSession方法中找到协调者信息,并发起join请求和syncgroup请求。Consumer Leader执行一次重平衡。
5. 创建consumer group session,并初始化offset manager和开启心跳goroutine。
6. 当心跳超时或收到coordinator的重平衡通知时,调用cancel()方法取消操作,退出Consume逻辑。
7. 此时,Consume函数优雅退出。由于外层循环的存在,会重新执行Consume,实现一次重平衡。
另一个触发重平衡的条件是订阅主题分区数发生变更。这一过程如下:
1. 在Consume方法中开启心跳goroutine,并将consumer group session传递给它。
2. 分区数发生变化时,调用sess.cancel(),Consume优雅退出并重新执行,实现重平衡。
kafka原理 消费者偏移量__consumer_offsets_相关解析
知流Stream是滴滴开源的Kafka运维管控平台,对于参与开发有兴趣但担心能力的同学,可以联系我,我将作为你的导师带你一起参与开源项目。
Kafka的日志文件中存有以`__consumer_offsets_`为前缀的文件夹,总共有个。新版Kafka推荐将消费者的位移信息保存在内部的`__consumer_offsets`topic中,此topic默认提供了`kafka_consumer_groups.sh`脚本供用户查看消费者信息。
`__consumer_offsets`类似于普通的topic,其主要功能是保存消费者的位移信息。每一项消息采用KV格式,其结构包括group.id、topic、分区号作为键,位移值作为值。
在高并发的消费环境中,多个消费者或消费组同时提交位移信息会增加`__consumer_offsets`的写入负载。因此,Kafka默认为该topic创建个分区,并通过`Math.abs(groupID.hashCode()) % numPartitions`的哈希求模运算将负载均匀分布在不同的分区上。
通常,当集群中首次有消费者消费消息时,会自动创建`__consumer_offsets`topic,其副本因子默认为3,分区数默认为。消费者可以使用消费者命令指定消费组、topic和partition进行消费。
为了演示,我们打开一个session,执行消费者命令,指定消费组`szz1-group`,topic`szz1-test-topic`,然后执行生产消息命令,发送几条消息,可以看到session中的消费者消费了这些消息。通过查看指定消费组的消费位置offset,可以了解到每个partition对应的消费者id。因为只开一个消费者,它同时消费了3个partition。CURRENT-OFFSET表示当前消费组消费到的偏移量,LOG-END-OFFSET表示日志最后的偏移量。当CURRENT-OFFSET等于LOG-END-OFFSET时,说明当前消费组已经全部消费完毕。
关闭session后,发送新的消息,观察CURRENT-OFFSET保持不变,因为此时没有消费者消费这些消息。重新打开一个消费组继续消费,可以看到控制台输出了新发送的消息,并且偏移量更新了。
对于新的消费组,如果不指定`--from-beginning`参数,默认会从最新的偏移量开始消费。如果需要从头开始消费,需要加上此参数。通过`Math.abs(groupID.hashCode()) % numPartitions`确定消费组在哪个`__consumer_offsets-`中。
为了查询特定消费组的偏移量,可以先通过`consume_group`确定分区数,例如对于`szz1-group`,通过哈希求模运算确定消费组的偏移量信息存于哪个分区。可以通过命令查询,其结构为键和值,键由消费组+Topic+分区数确定,值包含了消费组的偏移量信息。
日常运维与问题排查方面,滴滴开源的LogiKM一站式Kafka监控与管控平台能够提供高效的支持。