CN115495265A - 一种基于hadoop提高kafka消费能力的方法 - Google Patents
一种基于hadoop提高kafka消费能力的方法 Download PDFInfo
- Publication number
- CN115495265A CN115495265A CN202211181457.8A CN202211181457A CN115495265A CN 115495265 A CN115495265 A CN 115495265A CN 202211181457 A CN202211181457 A CN 202211181457A CN 115495265 A CN115495265 A CN 115495265A
- Authority
- CN
- China
- Prior art keywords
- consumption
- offset
- kafka
- segment
- hadoop
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
- 238000000034 method Methods 0.000 title claims abstract description 30
- 238000005192 partition Methods 0.000 claims abstract description 49
- 230000007246 mechanism Effects 0.000 abstract description 2
- 238000000638 solvent extraction Methods 0.000 description 2
- 230000009286 beneficial effect Effects 0.000 description 1
- 238000010586 diagram Methods 0.000 description 1
- 238000012544 monitoring process Methods 0.000 description 1
- 239000002994 raw material Substances 0.000 description 1
- 230000011218 segmentation Effects 0.000 description 1
- 238000006467 substitution reaction Methods 0.000 description 1
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/548—Queue
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明公开了一种基于hadoop提高kafka消费能力的方法,使用Hadoop平台的MapReduce框架,在map阶段把消费任务分成若干个子任务,每个子任务记录自己的消费开始偏移量和结束偏移量,借助MR框架的失败重试等机制能最大限度的保证每次任务顺序完成,并记录整个任务的执行情况,全部子任务成功,整个消费才算成功,如果本次消费失败,下次消费从上次失败的消费偏移量开始消费。本发明将基于Hadoop的多任务分发及并行计算能力,多任务并行消费,并记录每个任务的消费情况,脱离了对现有分区服务的依赖,实现消费能力的大幅度提升。
Description
技术领域
本发明涉及软件技术领域,尤其涉及一种基于hadoop提高kafka消费能力的方法。
背景技术
Kafka是一种高吞吐量的分布式发布订阅消息系统。现在它已被很多公司作为多种类型的数据管道和消息系统使用。
Kafka基础概念:
A.生产者与消费者
对于Kafka来说客户端有两种基本类型:生产者(Producer)、消费者(Consumer),生产者(也称为发布者)创建消息,而消费者(也称为订阅者)负责消费消息。
B.主题(Topic)与分区(Partition)
在Kafka中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。
但是如果把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。
使用一个生活中的例子来说明:现在A城市生产的某商品需要运输到B城市,走的是公路。那么单通道的高速公路不论是在「A城市商品增多」还是「现在C城市也要往B城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。
所以现在引入分区(Partition)的概念,类似“允许多修几条道”的方式对的主题完成了水平扩展。
C.Broker和集群(Cluster)
一个Kafka服务器也称为Broker,它接受生产者发送的消息并存入磁盘,Broker同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个Broker每秒可以处理成千上万的分区和百万量级的消息。若干个Broker组成一个集群(Cluster),其中集群内某个Broker会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到Broker、监控Broker故障等。在集群内,一个分区由一个Broker负责,这个Broker也称为这个分区的Leader。
当然一个分区可以被复制到多个Broker上来实现冗余,这样当存在Broker故障时可以将其分区重新分配到其他Broker来负责。
在官方提供的CLIENT API中,不同的groupId区分不同的消费分组,同组(groupId相同)的消费者消费的数据的和集等于topic中的数据总和,如果一个分组内的消费者数据总和大于topic的分区数,那么其中的某些消费者是消费不到数据的。
如果同一分组内的消费者数量小于topic的分区数,那么其中的某些消费者会消费多个分区内的数据。
如果同一分组内的消费者数量刚好等于topic的分区数,那么消费者会和分区一一对应。
所以,要提高消费端的消费效率,只有通过增加topic的分区数量,但是分区数量从服务器硬件环境来说是有限的。
发明内容
本发明的目的就在于为了解决上述问题而提供一种基于hadoop提高kafka消费能力的方法,本发明在有限的topic分区数量下,解决通过增加分区数量提高Kafka消费能力和Kafka服务资源是有限的分区数量不能无限增加这个问题,通过脱离对Kafka分区资源的依赖,基于Hadoop提供的多任务能力,划分消费区间,记录每个任务的消费情况,从而实现并行消费的能力,成倍提高消费效率。
要提高同一个组下的并行消费能力,必须是增加Topic下的Partition数量;但是一个Kafka节点的资源是有限的,不能无止境的增加Partition。就需要一种多线程消费方式,在同一个组下,让一个Partition能被多个Consumer消费,并能保证数据的完整性,这种高效的消费方式,有效的增加了应用机器的利用率,而不是一味的给Kafka服务器本身增加压力。
要实现上面的技术方案,有两个关键点,一是当开始一次新消费时,需要对topic的每一个分区(partition),在这个时间点,记录它的开始偏移量(S)和结束偏移量(E),然后以固定步长(T)把(E-S)区间内的数据分成N份,每一份数据由一个独立的线程读取,M个Partition就需要M*N个线程来完成整个消费;二是对当前组下已经消费过一次或者N次的情况,需要知道上一次每个Partition的最后消费的偏移量,拿到这个偏移量,作为此次消费的开始偏移量。因此记录每次消费的结束偏移量以及每个分段的开始和结束偏移量是保证整个消费数据完整性的必要条件。
使用Hadoop平台的MapReduce框架,在map阶段把消费任务分成若干个子任务,每个子任务记录自己的消费开始偏移量和结束偏移量,借助MR框架的失败重试等机制能最大限度的保证每次任务顺序完成,并记录整个任务的执行情况,全部子任务成功,整个消费才算成功,如果本次消费失败,下次消费从上次失败的消费偏移量开始消费。
本发明通过以下技术方案来实现上述目的:
一种基于hadoop提高kafka消费能力的方法,包括以下步骤:
步骤1,读取上一次记录的日志信息,获取本次消费的开始偏移量;
步骤2,建立与Kafka服务的连接;
步骤3,为每一个分区划分数据段;
设定分段的的步长为T,的数据分段数量N使用公式N=(endOffset-beginOffset)/T,当N不为整数时,需要在N的基础上再加1,计算出N1,N2……Nn;
步骤4,为每个分区的每个分段设置开始偏移量和结束偏移量;
步骤5,根据前面计算的分段数量以及每个分段的开始和结束偏移量为每个Consumer指定消费区间;
步骤6,完成消费过程,得到消费数据;
当N1+N2+……+Nn个Map任务都正确结束后,的整个消费过程也就顺利完成;
步骤7,记录消费结果的日志;
把消费成功的结果信息记录在HDFS的特定目录中,供下次任务启动时读取,获取消费开始偏移量。
进一步方案为,所述步骤1中,读取HDFS(Hadoop集群的存储系统)中记录的上次消费的结束偏移量,作为本次消费的开始偏移量。
进一步方案为,所述步骤2中,通过Kafka Client提供的KafkaConsumer建立和Kafka连接。
进一步方案为,所述步骤4中,在知道每个分区的分段数量、开始偏移量和结束偏移量以及设定的步长T后,根据分段开始偏移量计算公式SN=beginOffset+(N-1)*T+1其中N为当前第几个分段,第一个分段的开始偏移量S1=beginOffset,第二个分段的开始偏移量S2=beginOffset+(2-1)*T+1,计算出Partition1的每一个分段的开始偏移量S1N;根据分段的结束偏移量公式EN=beginOffset+N*T,第一个分段的结束偏移量E1=beginOffset+1*T,最后一个分段的偏移量EN=endOffset,计算出Partition1的每个分段的结束偏移量E1N。
进一步方案为,所述步骤5中,使用Kafka提供的API建立Kafka的消费端consumer,并使用assign方法指定消费的Partition以及使用seek方法指定消费的开始偏移量。
本发明的有益效果在于:
本发明将基于Hadoop的多任务分发及并行计算能力,多任务并行消费,并记录每个任务的消费情况,脱离了对现有分区服务的依赖,实现消费能力的大幅度提升。
本发明脱离了传统方法需要依赖Kafka服务的分区能力,基于Hadoop的多任务能力提高数据消费的并行度,减轻了Kafka服务本身的资源访问压力,增加了应用机器的利用率。
附图说明
为了更清楚地说明本发明实施例中的技术方案,下面将对实施例或现有技术描述中所需要实用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
图1为本发明基于Hadoop的MapReduce框架在Map阶段并行消费示意图。
具体实施方式
为使本发明的目的、技术方案和优点更加清楚,下面将对本发明的技术方案进行详细的描述。显然,所描述的实施例仅仅是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动的前提下所得到的所有其它实施方式,都属于本发明所保护的范围。
在任一实施例中,如图1所示,本发明的一种基于hadoop提高kafka消费能力的方法,包括:
本次实施是在Hadoop集群已经部署,Kafka集群已经部署,并且网络和基础环境能够正常工作的情况下,假设已经知道了本次消费的Topic有三个Partition分别为Partition1、Partition2和Partition3,他们分别对应的开始偏移量为beginOffsets1、beginOffsets2、beginOffsets3,结束偏移量为endOffsets1、endOffsets2、endOffsets3。
1、读取上一次记录的日志信息,获取本次消费的开始偏移量;
读取HDFS(Hadoop集群的存储系统)中记录的上次消费的结束偏移量,作为本次消费的开始偏移量。
2、建立与Kafka服务的连接;
通过Kafka Client提供的KafkaConsumer建立和Kafka连接。
3、为每一个分区(Partition)化分数据段;
设定分段的的步长为T,的数据分段数量N使用公式N=(endOffset-beginOffset)/T,当N不为整数时,需要在N的基础上再加1,那么的Partition1的划分的数据段N1的数量如下:
依次类推,分别计算出Partition2和Partition3的分段数量N2和N3。
4、为每个分区的每个分段设置开始偏移量和结束偏移量;
在知道每个分区的分段数量、开始偏移量和结束偏移量以及设定的步长T后,根据分段开始偏移量计算公式SN=beginOffset+(N-1)*T+1其中N为当前第几个分段,第一个分段的开始偏移量S1=beginOffset,第二个分段的开始偏移量S2=beginOffset+(2-1)*T+1,可以计算出Partition1的每一个分段的开始偏移量S1N。根据分段的结束偏移量公式EN=beginOffset+N*T,第一个分段的结束偏移量E1=beginOffset+1*T,最后一个分段的偏移量EN=endOffset,可以计算出Partition1的每个分段的结束偏移量E1N。具体计算过程如下:
其中_beginOffset就是Partition1分区的每个分段的开始偏移量,_endOffset就是Partition1分区的每个分段的结束偏移量。把每个分段的开始和结束偏移量以及每个分区的开始和结束偏移量都记录下来,为后面的每个分段消费的失败重试和下一次消费需要的每个分区的开始消费偏移量做数据准备。
5、根据前面计算的分段数量以及每个分段的开始和结束偏移量为每个Consumer指定消费区间;
使用Kafka提供的API建立Kafka的消费端consumer,并使用assign方法指定消费的Partition以及使用seek方法指定消费的开始偏移量,比如Partition1的某一个分段,如下。
consumer.assign(partition1)
consumer.seek(partition1,_beginOffset)
Kafka消费数据是一个轮询的方式,逐条消费的,当消费到设定的endOffset时,就需要停止消费进程,以免过度消费,导致数据重复,伪代码如下:
其中record表示Kafka中的每一个数据。
6、完成消费过程,得到消费数据;
当N1+N2+N3个Map任务都正确结束后,的整个消费过程也就顺利完成。
7、记录消费结果的日志;
把消费成功的结果信息记录在HDFS的特定目录中,供下次任务启动时读取,获取消费开始偏移量。
以上所述,仅为本发明的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明揭露的技术范围内,可轻易想到变化或替换,都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应以所述权利要求的保护范围为准。另外需要说明的是,在上述具体实施方式中所描述的各个具体技术特征,在不矛盾的情况下,可以通过任何合适的方式进行组合,为了避免不必要的重复,本发明对各种可能的组合方式不再另行说明。此外,本发明的各种不同的实施方式之间也可以进行任意组合,只要其不违背本发明的思想,其同样应当视为本发明所公开的内容。
Claims (5)
1.一种基于hadoop提高kafka消费能力的方法,其特征在于,包括以下步骤:
步骤1,读取上一次记录的日志信息,获取本次消费的开始偏移量;
步骤2,建立与Kafka服务的连接;
步骤3,为每一个分区划分数据段;
设定分段的的步长为T,的数据分段数量N使用公式N=(endOffset-beginOffset)/T,当N不为整数时,需要在N的基础上再加1,计算出N1,N2……Nn;
步骤4,为每个分区的每个分段设置开始偏移量和结束偏移量;
步骤5,根据前面计算的分段数量以及每个分段的开始和结束偏移量为每个Consumer指定消费区间;
步骤6,完成消费过程,得到消费数据;
当N1+N2+……+Nn个Map任务都正确结束后,的整个消费过程也就顺利完成;
步骤7,记录消费结果的日志;
把消费成功的结果信息记录在HDFS的特定目录中,供下次任务启动时读取,获取消费开始偏移量。
2.如权利要求1所述的一种基于hadoop提高kafka消费能力的方法,其特征在于,所述步骤1中,读取HDFS中记录的上次消费的结束偏移量,作为本次消费的开始偏移量。
3.如权利要求1所述的一种基于hadoop提高kafka消费能力的方法,其特征在于,所述步骤2中,通过Kafka Client提供的KafkaConsumer建立和Kafka连接。
4.如权利要求1所述的一种基于hadoop提高kafka消费能力的方法,其特征在于,所述步骤4中,在知道每个分区的分段数量、开始偏移量和结束偏移量以及设定的步长T后,根据分段开始偏移量计算公式SN=beginOffset+(N–1)*T+1其中N为当前第几个分段,第一个分段的开始偏移量S1=beginOffset,第二个分段的开始偏移量S2=beginOffset+(2-1)*T+1,计算出Partition1的每一个分段的开始偏移量S1N;根据分段的结束偏移量公式EN=beginOffset+N*T,第一个分段的结束偏移量E1=beginOffset+1*T,最后一个分段的偏移量EN=endOffset,计算出Partition1的每个分段的结束偏移量E1N。
5.如权利要求1所述的一种基于hadoop提高kafka消费能力的方法,其特征在于,所述步骤5中,使用Kafka提供的API建立Kafka的消费端consumer,并使用assign方法指定消费的Partition以及使用seek方法指定消费的开始偏移量。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202211181457.8A CN115495265A (zh) | 2022-09-27 | 2022-09-27 | 一种基于hadoop提高kafka消费能力的方法 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202211181457.8A CN115495265A (zh) | 2022-09-27 | 2022-09-27 | 一种基于hadoop提高kafka消费能力的方法 |
Publications (1)
Publication Number | Publication Date |
---|---|
CN115495265A true CN115495265A (zh) | 2022-12-20 |
Family
ID=84472280
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202211181457.8A Pending CN115495265A (zh) | 2022-09-27 | 2022-09-27 | 一种基于hadoop提高kafka消费能力的方法 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN115495265A (zh) |
Cited By (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN115866017A (zh) * | 2023-02-27 | 2023-03-28 | 天翼云科技有限公司 | 消息处理方法、装置、通信设备及存储介质 |
-
2022
- 2022-09-27 CN CN202211181457.8A patent/CN115495265A/zh active Pending
Cited By (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN115866017A (zh) * | 2023-02-27 | 2023-03-28 | 天翼云科技有限公司 | 消息处理方法、装置、通信设备及存储介质 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN110648178A (zh) | 一种增加kafka消费能力的方法 | |
CN109739929B (zh) | 数据同步方法、装置及系统 | |
US9590915B2 (en) | Transmission of Map/Reduce data in a data center | |
US8627024B2 (en) | Snapshot based replication | |
US8843632B2 (en) | Allocation of resources between web services in a composite service | |
US20100325473A1 (en) | Reducing recovery time for business organizations in case of disasters | |
US8930501B2 (en) | Distributed data storage system and method | |
US20150222525A1 (en) | Dynamic Rerouting of Service Requests Between Service Endpoints for Web Services in a Composite Service | |
US20120278817A1 (en) | Event distribution pattern for use with a distributed data grid | |
CN108574645B (zh) | 一种队列调度方法及装置 | |
CN111221653B (zh) | 一种服务处理方法、装置及计算机可读存储介质 | |
CN111866045A (zh) | 信息处理方法及其装置、计算机系统及计算机可读介质 | |
CN115292414A (zh) | 一种业务数据同步到数仓的方法 | |
CN116302574B (zh) | 一种基于MapReduce的并发处理方法 | |
US20110131288A1 (en) | Load-Balancing In Replication Engine of Directory Server | |
CN115495265A (zh) | 一种基于hadoop提高kafka消费能力的方法 | |
CN113448757B (zh) | 消息处理方法、装置、设备、存储介质和系统 | |
CN111931105A (zh) | 一种kafka消费指定推送时间数据处理方法 | |
CN111522792A (zh) | 文件迁移方法及装置 | |
CN111435329A (zh) | 一种自动化测试方法和装置 | |
US20220129303A1 (en) | Managing failures in edge computing environments | |
CN110298031B (zh) | 一种词典服务系统及模型版本一致性配送方法 | |
CN112764679A (zh) | 一种动态扩容的方法及终端 | |
CN116467086A (zh) | 数据处理方法、装置、电子设备及存储介质 | |
CN112965805B (zh) | 基于内存映射文件的跨进程异步任务处理方法及系统 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination |