CN114138520B - 分布式消息处理方法及系统 - Google Patents
分布式消息处理方法及系统 Download PDFInfo
- Publication number
- CN114138520B CN114138520B CN202111463171.4A CN202111463171A CN114138520B CN 114138520 B CN114138520 B CN 114138520B CN 202111463171 A CN202111463171 A CN 202111463171A CN 114138520 B CN114138520 B CN 114138520B
- Authority
- CN
- China
- Prior art keywords
- message
- messages
- overtime
- timestamp
- receiving
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/541—Client-server
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/547—Messaging middleware
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/548—Queue
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computer And Data Communications (AREA)
Abstract
本发明公开一种分布式消息处理方法及系统,其中方法包括以下步骤:接收客户端发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳在所述消息队列中具有唯一性,基于所述发送时间戳对消息队列中的消息进行排列;判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统;按照预设的清除时间,周期性的删除消息队列中的超时消息。本发明通过对发送时间戳的设计保证消息传递的顺序;通过对超时消息进行判断和清除步骤的设计,能够令分布式文件系统和消息队列相互配合,便于客户端查询消息的同时减轻消息队列带来的数据存储压力。
Description
技术领域
本发明涉及计算机软件技术领域,尤其涉及一种分布式消息处理方法及系统。
背景技术
Hadoop平台是最著名的大数据平台,现今大数据处理技术栈都基于Hadoop构建,而基于大数据平台的大多应用程序都需要处理分布式的消息;
然而现有消息处理系统过于专业化/复杂化或过于简单,如微软的MSMQ为具有强大功能和硬交付保证的工业级消息处理系统,但其部署需要微软的专有系统,成本高昂;如亚马逊提供了一种低成本的简单队列服务(Simple Queue Service,SQS),它仅具有基本的消息传递操作(发送/接收),无法保证消息传递的顺序。
故现今为Hadoop用户提供保证消息传递顺序的消息队列功能时,往往需要额外部署现今过于专业化/复杂化的消息处理系统,成本过高且将提高对应应用程序的复杂性。
发明内容
本发明针对现有技术中的缺点,提供了能够按序传递消息且部署成本低的一种分布式消息处理方法及系统。
为了解决上述技术问题,本发明通过下述技术方案得以解决:
一种分布式消息处理方法,包括以下步骤:
接收客户端发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳在所述消息队列中具有唯一性,基于所述发送时间戳对消息队列中的消息进行排列;
判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统;
按照预设的清除时间,周期性的删除消息队列中的超时消息。
作为一种可实施方式,将所述消息加入相应的消息队列后,还包括消息传递步骤,具体步骤为:
接收客户端发送的消息接收请求,基于消息接收请求扫描消息队列和/或分布式文件系统中的消息,获得第一待传递消息;
基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端。
作为一种可实施方式:
接收客户端发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
基于所述消息接收请求获取当前接收时间戳并反馈,使客户端根据所述当前接收时间戳更新其最近接收时间戳;
基于最近接收时间戳计算待扫描时间,基于待扫描时间判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统中的消息,获得第二待传递消息;
从第二待传递消息中提取第一待传递消息。
作为一种可实施方式:
当最近接收时间戳为空时,对消息队列和分布式文件系统中的所有消息进行扫描。
作为一种可实施方式:
消息发送请求包括传递保证;
获取第二待传递消息的传递保证,并检测所述第二待传递消息的传递情况,获得检测结果;
基于所述传递保证和所述检测结果从第二待传递消息中提取第一待传递消息。
作为一种可实施方式:
发送时间戳包括时间、随机数和对应时间戳管理节点的ID。
作为一种可实施方式:
消息队列中的消息分布式存储在嵌入式数据库中。
本发明还提出一种分布式消息处理系统,与分布式文件系统相连,包括消息管理节点、时间戳管理节点和客户端;
所述消息管理节点包括:
消息发布模块,用于接收客户端发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳由对应的时间戳管理节点发放,所述发送时间戳在所述消息队列中具有唯一性;所述消息发布模块还用于基于所述发送时间戳对消息队列中的消息进行排列;
超时判断模块,用于判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统;
超时清除模块,用于按照预设的清除时间,周期性的删除消息队列中的超时消息。
作为一种可实施方式:
所述消息管理节点还包括消息传递模块;
所述消息传递模块,用于接收客户端发送的消息接收请求,基于消息接收请求扫描消息队列和/或分布式文件系统中的消息,获得第一待传递消息;还用于基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端。
作为一种可实施方式:
所述消息传递模块包括接收单元、时间戳更新单元、扫描单元、提取单元和传递单元;
所述接收单元,用于接收客户端发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
所述时间戳更新单元,用于基于所述消息接收请求从相应的时间戳管理节点处获取当前接收时间戳并反馈,使客户端根据所述当前接收时间戳更新其最近接收时间戳;
所述扫描单元,用于基于最近接收时间戳计算待扫描时间,基于待扫描时间判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统中的消息,获得第二待传递消息;
所述提取单元,用于从第二待传递消息中提取第一待传递消息。
本发明由于采用了以上技术方案,具有显著的技术效果:
本发明中根据所述消息发送请求生成包含发送时间戳的消息,且所述发送时间戳在所述消息队列中具有唯一性,从而可基于所述发送时间戳对消息队列中的消息进行排列,令客户端在接收消息时,根据发送时间戳指定的总顺序接消息,从而保证消息传递的顺序;同时通过对超时信息的判断和清除的步骤,能够令分布式文件系统和消息队列相互配合,便于客户端查询消息的同时减轻消息队列带来的数据存储压力;与现有技术相比,基于本发明所提出的方法进行消息处理,在能够确保消息按顺序传递的前提下仅需在现有分布式文件系统的集群中部署嵌入式数据库即可,无需巨大的成本且不改变分布式文件系统本身的属性。
附图说明
为了更清楚地说明本发明实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
图1是本发明一种分布式消息处理方法的流程示意图;
图2是本发明一种分布式消息处理系统的模块连接示意图。
具体实施方式
下面结合实施例对本发明做进一步的详细说明,以下实施例是对本发明的解释而本发明并不局限于以下实施例。
实施例1、一种分布式消息处理方法,如图1所示,包括以下步骤:
S100、接收客户端200发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳在所述消息队列中具有唯一性,基于所述发送时间戳对消息队列中的消息进行排列;
本实施例中,预先基于消息发送请求在相应的消息队列中写入消息行,再向对应的时间戳管理节点300请求发送时间戳,将所得发送时间戳插入消息行后完成消息的生成,此时将其标记为已发送,从而基于消息发送请求完成消息的发送,此过程基于原子方法实现。
在实际使用过程中,客户端200预先创建或加入至消息队列中,从而可向所述消息队列发送消息或接收所述消息队列中的消息。
本实施例中消息发送请求包括客户端200ID、队列ID、消息体和消息类型,还可包括接收者的客户端200ID,其中客户端200ID和队列ID均具有唯一性;所述消息体为消息内容或指向文件的指针,只有发送者/接收者知道如何获取文件;所述消息类型为消息体对应的数据类型信息。上述发送者/接收者通过其客户端200实现收发消息。
注:基于客户端200的请求创建消息队列、将客户端200加入已有消息队列、发送消息、接收消息等为现有常规技术,故本实施例不对其具体实现步骤进行详细介绍。
由上可知,本实施例中根据所述消息发送请求生成包含发送时间戳的消息,且所述发送时间戳在所述消息队列中具有唯一性,从而可基于所述发送时间戳对消息队列中的消息进行排列,令客户端200在接收消息时,根据发送时间戳指定的总顺序接消息,从而保证消息传递的顺序。
S200、判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统400;
本实施例中分布式文件系统400采用Hadoop文件系统(HDFS);
判断消息是否超时的具体方法为:
基于所述消息的发送时间戳获取所述消息的等待时间,当所述等待时间达到预设的超时阈值(2min)时,判定所述消息超时,将该消息标记为超时消息并将所述超时消息备份至分布式文件系统400;
上述消息队列由以队列命名的Hadoop HDFS系统表表示,即,每一个消息队列为一个HDFS文件。本实施例中将消息队列中的消息分布式存储在嵌入式数据库中。
S300、按照预设的清除时间,周期性的删除消息队列中的超时消息。
本领域的技术人员可自行设定清除时间,本实施例中清除时间为2min,即,每隔2min清除一次消息队列中的超时消息。
由此可见,消息队列实际为一个临时表,用于实现对近期活跃消息(超时阈值内的消息)的实时读写,当消息超时后,将所述消息备份至分布式文件系统400,便于后续对消息的查询。
由于分布式文件系统400中往往存储有海量消息,如直接从分布式文件系统400中扫描客户端200待接收的消息,其扫描时间过长,而所有消息均保留在消息队列中,将导致储存消息队列的压力过大,故本实施例对超时消息的判断和清除能够令分布式文件系统400和消息队列相互配合,便于客户端200查询消息的同时减轻消息队列带来的数据存储压力。
与现有技术相比,基于本实施例所提出的方法进行消息处理,在能够确保消息按顺序传递的前提下仅需在现有分布式文件系统400的集群中部署嵌入式数据库即可,无需巨大的成本且不改变Hadoop文件系统本身的属性,例如可伸缩性和容错性等。
进一步地,步骤S100将所述消息加入相应的消息队列后,还包括消息传递步骤,具体步骤为:
S400、接收客户端200发送的消息接收请求,基于消息接收请求扫描消息队列和/或分布式文件系统400中的消息,获得第一待传递消息;
具体实现步骤为:
S410、接收客户端200发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
消息接收请求还包括客户端200ID、队列ID等信息,此属于现有常规技术,故不再本说明书额外告知。
所述最近接收时间戳用于指示所述客户端200上一次发起消息接收请求的时间。
S420、基于所述消息接收请求获取当前接收时间戳并反馈,使客户端200根据所述当前接收时间戳更新其最近接收时间戳;
即,将当前接收时间戳作为所述客户端200下次发起消息接收请求时上报的最近接收时间戳。
S430、基于最近接收时间戳计算待扫描时间,基于待扫描时间判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统400中的消息,获得第二待传递消息;
本实施例中通过对最近接收时间戳的设计,仅扫描客户端200上次接收消息至今的时间段内的消息,无需对消息队列和/或分布式文件系统400中所有消息进行扫描,加快扫描速度。
由于本实施例中消息队列相当于临时表,仅储存近期活跃的消息(即,暂时未被清除的消息),超时消息则作为具有持久性的HDFS副本保存至分布式文件系统400中,便于用户通过客户端200接收或查询;故本实施例中通过根据最近接收时间戳计算客户端200上次接收消息至今的时间长度(待扫描时间),从而判断其是否需要扫描超时消息,基于判断结果扫描消息队列和/或分布式文件系统400中对应时段的消息。
具体实现步骤为:
当待扫描时间小于预设的超时阈值,即表明其需扫描的信息均未超时,故判定不需要扫描超时消息,此时基于最近接收时间戳扫描消息队列中的消息(即,从所述客户端200上次接收消息的时间点起至今的消息)。
当待扫描时间大于等于预设的超时阈值,即表明其需扫描超时消息,此时基于最近接收时间戳扫描分布式文件系统400和消息队列中的消息。
在实际处理过程中,最近接收时间戳可为空,当最近接收时间戳为空时,即对消息队列和分布式文件系统400中的所有消息进行扫描。
将扫描所得与所述消息接收请求相对应的消息作为第二待传递消息。
S440、从第二待传递消息中提取第一待传递消息。
由于存在已超时但未清除的消息,从而导致第二待传递消息中存在重复消息,故本实施例对扫描获得的第二待传递消息去重后获得第一待传递消息。
进一步地,本实施例中消息接收请求包括传递保证,所述传递保证用于指定消息的传递次数,本实施例中用于指定消息应当被递送“最少一次”或“最多一次”。
步骤S440从第二待传递消息中提取第一待传递消息还包括检查传递保证步骤,具体步骤如下:
获取第二待传递消息的传递保证,并检测所述第二待传递消息的传递情况,获得检测结果;
基于所述传递保证和所述检测结果从第二待传递消息中提取第一待传递消息。
即,当传递保证为“最多一次”时,仅将未被标记为已传递的第二待传递消息作为第一待传递消息进行传递。
S500、基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端200,即,在同一消息队列中,严格按照发送时间戳的顺序向客户端200传递消息。
本实施例先将第一待传递消息标记为已传递再将其发送至客户端200,能够有效保持“最多一次”的消息保证,否则,如先将消息传递给客户端200,再根据客户端200反馈的传递结果将消息标记为已传递时,存在客户端200获取消息后出现故障,需等待客户端200复活后才能将消息标记为已传递,而在等待过程中相同的消息可能会被再次传递,从而破坏“最多一次”的传递保证。
进一步地,发送时间戳包括时间(单位为ms)、随机数和对应时间戳管理节点300的ID。
本实施例中送达时间戳和当前接收时间戳的格式与发送时间戳一致。各时间戳管理节点300基于NTP协议与外部的时间服务器同步时间,并且保证同步时间准确性在200ms误差范围内。
由于本实施例中要求发送时间戳在所处消息队列中具有唯一性,才能对消息队列的消息按照发送时间戳的顺序进行排列和处理,而在实际使用时,不同客户端200可在同一时间发布消息,即,客户端200写入消息的时间是相同的,如直接为消息增加指示当前时间的时间戳,无法使发送时间戳达到唯一且严格排序的要求。
故本实施例对发送时间戳的格式做设计,其格式为NTP时间+对应时间戳管理节点300的ID+随机数,上述NTP时间、对应时间戳管理节点300的ID和随机数的长度固定,即使多个客户端200在同一时间发布消息,且为消息分配时间戳的时间戳管理节点300为同一个时,也能保证发送时间戳的唯一性,并能使消息基于发送时间戳进行严格排序,且发送时间戳为一个long数字,便于优化表达。
实施例2、一种分布式消息处理系统,如图2所示,与分布式文件系统400相连,包括若干个消息管理节点100、若干个时间戳管理节点300和若干个客户端200;
所述消息管理节点100包括:
消息发布模块,用于接收客户端200发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳由对应的时间戳管理节点300发放,所述发送时间戳在所述消息队列中具有唯一性;所述消息发布模块110还用于基于所述发送时间戳对消息队列中的消息进行排列;
超时判断模块,用于判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统400;
超时清除模块,用于按照预设的清除时间,周期性的删除消息队列中的超时消息。
进一步地,所述消息管理节点100还包括消息传递模块;
所述消息传递模块,用于接收客户端200发送的消息接收请求,基于消息接收请求扫描消息队列和/或分布式文件系统400中的消息,获得第一待传递消息;还用于基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端200。
进一步地,所述消息传递模块包括接收单元、时间戳更新单元、扫描单元、提取单元和传递单元;
所述接收单元,用于接收客户端200发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
所述时间戳更新单元,用于基于所述消息接收请求从相应的时间戳管理节点300处获取当前接收时间戳并反馈,使客户端200根据所述当前接收时间戳更新其最近接收时间戳;
所述扫描单元,用于基于最近接收时间戳计算待扫描时间,基于待扫描时间判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统400中的消息,获得第二待传递消息;
所述提取单元,用于从第二待传递消息中提取第一待传递消息。
所述传递单元,用于基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端200。
进一步地,所述提取单元被配置为:
获取第二待传递消息的传递保证,并检测所述第二待传递消息的传递情况,获得检测结果;
基于所述传递保证和所述检测结果从第二待传递消息中提取第一待传递消息。
本实施例中分布式文件系统400采用Hadoop文件系统(HDFS);
消息管理节点100还用于储存各消息队列,即,各消息管理节点100构成包含嵌入式数据库(H2关系型数据库的嵌入式模式)的分布式消息处理器。
各时间戳管理节点300构成分布式时间戳管理器。
上述消息管理节点100、时间戳管理节点300均部署在分布式文件系统400所在集群的机器中。
本实施例中基于嵌入式数据库利用多层数据索引架构和消息数据分片方法储存各消息队列,即,集群的机器包括第一机器和第二机器,其中第一机器为被选举的meta机器,用于负责索引全局数据并告知每个查询每个对应消息的数据存在哪个第二机器中,第二机器用于储存消息。
本实施例中采用现有的哈希算法实现对消息队列的切分,即,按照集群中第二机器的数量确定将各消息队列切分成相应数量的哈希块进行保存。
且本实施例中清除超时消息的设计,在每清除一次过期消息后基于集群中第一机器和第二机器的分布,令储存的消息进行自动均衡,故当集群中机器数量变化的时候,无需进行实时均衡,仅需增加新的哈希块,无需调配原有哈希块,等待清除超时消息后统一进行数据均衡即可。
上述时间戳管理节点300可采用时间戳管理代理(Agent),相关领域的技术人员可根据实际情况,静态/动态的部署时间戳管理节点300,保证集群中每个大区(或rack)具有相同数量的时间戳管理节点300,并保证每个大区(或rack)内的每个子域中具有相同数量的时间戳管理节点300。其中静态部署在人工指定的机器中部署时间戳管理代理(Agent)作为时间戳管理节点300,动态部署指根据各子域内机器的负载情况,自动根据指定数量启动负载最低的的机器中的时间戳管理代理(Agent),并关闭其他机器中的时间戳管理代理(Agent)。
本实施例所提出的分布式消息处理系统在应用时存在以下四种故障,服务器端计算机故障、网络故障、客户端200故障和可疑故障。本系统可依靠Hadoop分布式文件系统400(Hadoop Distributed File System,HDFS)来处理前服务器端计算机故障和网络故障故障。针对客户端200故障和可疑故障,本领域的技术人员能够自行设置消息发送失败的处理机制,如利用原子方法来确保消息被发送或永不发送。
综上,本实施例仅需部署分布式消息处理器和分布式时间戳管理器,即可按序传递消息队列中的消息,与现有如微软的MSMQ等消息处理系统相比,部署成本低,且当使用微软的MSMQ进行消息处理时,要求对应的应用程序必须能够同时处理由MSMQ和/或Hadoop两者/任一者引起的故障,而本实施例可基于Hadoop本身的功能解决故障,无需增加对应的应用程序的复杂度。
对于装置实施例而言,由于其与方法实施例基本相似,所以描述的比较简单,相关之处参见方法实施例的部分说明即可。
本说明书中的各个实施例均采用递进的方式描述,每个实施例重点说明的都是与其他实施例的不同之处,各个实施例之间相同相似的部分互相参见即可。
本领域内的技术人员应明白,本发明的实施例可提供为方法、装置、或计算机程序产品。因此,本发明可采用完全硬件实施例、完全软件实施例、或结合软件和硬件方面的实施例的形式。而且,本发明可采用在一个或多个其中包含有计算机可用程序代码的计算机可用存储介质(包括但不限于磁盘存储器、CD-ROM、光学存储器等)上实施的计算机程序产品的形式。
本发明是参照根据本发明的方法、终端设备(系统)、和计算机程序产品的流程图和/或方框图来描述的。应理解可由计算机程序指令实现流程图和/或方框图中的每一流程和/或方框、以及流程图和/或方框图中的流程和/或方框的结合。可提供这些计算机程序指令到通用计算机、专用计算机、嵌入式处理机或其他可编程数据处理终端设备的处理器以产生一个机器,使得通过计算机或其他可编程数据处理终端设备的处理器执行的指令产生用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的装置。
这些计算机程序指令也可存储在能引导计算机或其他可编程数据处理终端设备以特定方式工作的计算机可读存储器中,使得存储在该计算机可读存储器中的指令产生包括指令装置的制造品,该指令装置实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能。
这些计算机程序指令也可装载到计算机或其他可编程数据处理终端设备上,使得在计算机或其他可编程终端设备上执行一系列操作步骤以产生计算机实现的处理,从而在计算机或其他可编程终端设备上执行的指令提供用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的步骤。
需要说明的是:
说明书中提到的“一个实施例”或“实施例”意指结合实施例描述的特定特征、结构或特性包括在本发明的至少一个实施例中。因此,说明书通篇各个地方出现的短语“一个实施例”或“实施例”并不一定均指同一个实施例。
尽管已描述了本发明的优选实施例,但本领域内的技术人员一旦得知了基本创造性概念,则可对这些实施例做出另外的变更和修改。所以,所附权利要求意欲解释为包括优选实施例以及落入本发明范围的所有变更和修改。
此外,需要说明的是,本说明书中所描述的具体实施例,其零、部件的形状、所取名称等可以不同。凡依本发明专利构思所述的构造、特征及原理所做的等效或简单变化,均包括于本发明专利的保护范围内。本发明所属技术领域的技术人员可以对所描述的具体实施例做各种各样的修改或补充或采用类似的方式替代,只要不偏离本发明的结构或者超越本权利要求书所定义的范围,均应属于本发明的保护范围。
Claims (10)
1.一种分布式消息处理方法,其特征在于,包括以下步骤:
接收客户端发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳在所述消息队列中具有唯一性,基于所述发送时间戳对消息队列中的消息进行排列;
判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统;
按照预设的清除时间,周期性的删除消息队列中的超时消息;
还包括消息传递步骤,具体步骤为:
接收客户端发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
基于所述最近接收时间戳判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统中的消息,获得第二待传递消息;
从第二待传递消息中提取第一待传递消息,具体为,对扫描获得的第二待传递消息去重后获得第一待传递消息;
基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端。
2.根据权利要求1所述的分布式消息处理方法,其特征在于:
所述最近接收时间戳用于指示所述客户端上一次发起消息接收请求的时间。
3.根据权利要求2所述的分布式消息处理方法,其特征在于:
基于最近接收时间戳计算待扫描时间,基于待扫描时间判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统中的消息,获得第二待传递消息。
4.根据权利要求3所述的分布式消息处理方法,其特征在于:
当待扫描时间小于预设的超时阈值,判定不需要扫描超时消息;
当待扫描时间大于等于预设的超时阈值,判定需扫描超时消息。
5.根据权利要求3所述的分布式消息处理方法,其特征在于:
当最近接收时间戳为空时,对消息队列和分布式文件系统中的所有消息进行扫描。
6.根据权利要求1至5任一所述的分布式消息处理方法,其特征在于:
消息发送请求包括传递保证,所述传递保证用于指定相应消息的传递次数;
获取第二待传递消息的传递保证,并检测所述第二待传递消息是否被标记为已传递,获得检测结果;
基于所述传递保证和所述检测结果从第二待传递消息中提取第一待传递消息。
7.根据权利要求1至5任一所述的分布式消息处理方法,其特征在于:
发送时间戳包括时间、随机数和对应时间戳管理节点的ID。
8.根据权利要求1至5任一所述的分布式消息处理方法,其特征在于:
消息队列中的消息分布式存储在嵌入式数据库中。
9.根据权利要求1至5任一所述的分布式消息处理方法,其特征在于:
分布式文件系统采用Hadoop文件系统。
10.一种分布式消息处理系统,与分布式文件系统相连,其特征在于,包括消息管理节点、时间戳管理节点和客户端;
所述消息管理节点包括:
消息发布模块,用于接收客户端发送的消息发送请求,根据所述消息发送请求生成包含发送时间戳的消息,将所述消息加入相应的消息队列,所述发送时间戳由对应的时间戳管理节点发放,所述发送时间戳在所述消息队列中具有唯一性;所述消息发布模块还用于基于所述发送时间戳对消息队列中的消息进行排列;
超时判断模块,用于判断消息是否超时,根据判断结果标记超时消息并将所述超时消息备份至分布式文件系统;
超时清除模块,用于按照预设的清除时间,周期性的删除消息队列中的超时消息;
消息传递模块,包括接收单元、扫描单元、提取单元和传递单元;
所述接收单元,用于接收客户端发送的消息接收请求,所述消息接收请求包括最近接收时间戳;
所述扫描单元,用于基于最近接收时间戳判断是否需要扫描超时消息,当判定不需要扫描超时消息时,基于最近接收时间戳扫描消息队列的消息,否则基于最近接收时间戳扫描消息队列和分布式文件系统中的消息,获得第二待传递消息;
所述提取单元,用于从第二待传递消息中提取第一待传递消息,具体为,对扫描获得的第二待传递消息去重后获得第一待传递消息;
所述传递单元,用于基于发送时间戳按序为第一待传递消息添加送达时间戳,并标记所述第一待传递消息为已传递后将其发送至客户端。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202111463171.4A CN114138520B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
Applications Claiming Priority (2)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202010401681.8A CN111611090B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
CN202111463171.4A CN114138520B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
Related Parent Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202010401681.8A Division CN111611090B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN114138520A CN114138520A (zh) | 2022-03-04 |
CN114138520B true CN114138520B (zh) | 2022-06-28 |
Family
ID=72196925
Family Applications (2)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202111463171.4A Active CN114138520B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
CN202010401681.8A Active CN111611090B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
Family Applications After (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202010401681.8A Active CN111611090B (zh) | 2020-05-13 | 2020-05-13 | 分布式消息处理方法及系统 |
Country Status (1)
Country | Link |
---|---|
CN (2) | CN114138520B (zh) |
Families Citing this family (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112596920A (zh) * | 2020-12-15 | 2021-04-02 | 中国建设银行股份有限公司 | 一种消息处理的方法、装置、电子设备和存储介质 |
CN113778701B (zh) * | 2021-01-07 | 2024-06-18 | 北京沃东天骏信息技术有限公司 | 消息处理方法和装置以及电子设备和介质 |
CN113886329A (zh) * | 2021-10-13 | 2022-01-04 | 北京达佳互联信息技术有限公司 | 一种数据获取方法、装置、系统、设备及存储介质 |
CN115878639B (zh) * | 2022-09-07 | 2023-10-24 | 贝壳找房(北京)科技有限公司 | 二级缓存的一致性处理方法及分布式服务系统 |
Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN108959660A (zh) * | 2018-08-15 | 2018-12-07 | 东北大学 | 一种基于hdfs分布式文件系统的存储方法及使用方法 |
US10581751B1 (en) * | 2015-12-16 | 2020-03-03 | EMC IP Holding Company LLC | Multi-queue based system for throttling backup agent save processes |
Family Cites Families (24)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
WO2007024120A1 (en) * | 2005-08-26 | 2007-03-01 | Electronics And Telecommunications Research Institute | Method for requesting resource and scheduling for uplink traffic in mobile communication and apparatus thereof |
CN101459627B (zh) * | 2008-04-07 | 2012-09-05 | 中兴通讯股份有限公司 | 消息管理方法 |
US10362131B1 (en) * | 2008-06-18 | 2019-07-23 | Amazon Technologies, Inc. | Fault tolerant message delivery |
EP2614484A4 (en) * | 2010-09-10 | 2014-05-07 | Visible Technologies Inc | SYSTEMS AND METHODS FOR MANAGING USER-PRODUCED MEDIA REPUTATIONS |
CN103019866B (zh) * | 2012-10-24 | 2017-02-08 | 北京京东世纪贸易有限公司 | 基于消息队列的分布式方法和系统 |
US9823951B2 (en) * | 2013-02-27 | 2017-11-21 | International Business Machines Corporation | Link optimization for callout request messages |
US9894143B1 (en) * | 2013-11-06 | 2018-02-13 | Amazon Technologies, Inc. | Pre-processing and processing pipeline for queue client |
US9614939B2 (en) * | 2014-05-08 | 2017-04-04 | Google Inc. | Network timeouts using intentionally delayed transmissions |
CN104301203B (zh) * | 2014-09-10 | 2016-04-27 | 腾讯科技(深圳)有限公司 | 一种消息推送方法和设备 |
US10230670B1 (en) * | 2014-11-10 | 2019-03-12 | Google Llc | Watermark-based message queue |
CN107547605B (zh) * | 2016-06-29 | 2020-01-31 | 华为技术有限公司 | 一种基于节点队列的消息读写方法及节点设备 |
CN106850397A (zh) * | 2016-12-13 | 2017-06-13 | 深圳市智物联网络有限公司 | 物联网中消息传递方法和装置 |
CN106789431B (zh) * | 2016-12-26 | 2019-12-06 | 中国银联股份有限公司 | 一种超时监控方法及装置 |
CN106878473B (zh) * | 2017-04-20 | 2021-03-30 | 腾讯科技(深圳)有限公司 | 一种消息处理方法、服务器集群及系统 |
CN107181674A (zh) * | 2017-06-16 | 2017-09-19 | 深圳市盛路物联通讯技术有限公司 | 物联网中消息传递方法和装置 |
CN108009022A (zh) * | 2017-11-06 | 2018-05-08 | 联动优势科技有限公司 | 一种消息处理方法及服务器 |
CN108737208B (zh) * | 2018-03-21 | 2020-09-22 | 北京天融信网络安全技术有限公司 | 基于安全网关深度包检测的连接同步方法、装置及计算机 |
CN109460438B (zh) * | 2018-09-26 | 2024-04-12 | 中国平安人寿保险股份有限公司 | 消息数据存储方法、装置、计算机设备和存储介质 |
US10805094B2 (en) * | 2018-10-08 | 2020-10-13 | International Business Machines Corporation | Blockchain timestamp agreement |
CN109495375B (zh) * | 2018-11-02 | 2021-04-13 | 广州小鹏汽车科技有限公司 | Mqtt消息的处理方法、装置、电子设备及存储介质 |
CN109558425A (zh) * | 2018-11-19 | 2019-04-02 | 郑州云海信息技术有限公司 | 一种缓存的备份方法及装置 |
CN109743137B (zh) * | 2019-01-10 | 2022-01-14 | 浙江小泰科技有限公司 | 一种支持更新的分布式延迟消息队列处理系统 |
CN110392120B (zh) * | 2019-08-15 | 2022-06-21 | 锐捷网络股份有限公司 | 一种消息推送过程中故障的恢复方法及装置 |
CN111104257A (zh) * | 2019-11-30 | 2020-05-05 | 浪潮(北京)电子信息产业有限公司 | 一种备份日志数据的防超时方法、装置、设备及介质 |
-
2020
- 2020-05-13 CN CN202111463171.4A patent/CN114138520B/zh active Active
- 2020-05-13 CN CN202010401681.8A patent/CN111611090B/zh active Active
Patent Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US10581751B1 (en) * | 2015-12-16 | 2020-03-03 | EMC IP Holding Company LLC | Multi-queue based system for throttling backup agent save processes |
CN108959660A (zh) * | 2018-08-15 | 2018-12-07 | 东北大学 | 一种基于hdfs分布式文件系统的存储方法及使用方法 |
Also Published As
Publication number | Publication date |
---|---|
CN111611090B (zh) | 2021-12-28 |
CN111611090A (zh) | 2020-09-01 |
CN114138520A (zh) | 2022-03-04 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN114138520B (zh) | 分布式消息处理方法及系统 | |
CN107465767B (zh) | 一种数据同步的方法和系统 | |
CN103581230B (zh) | 文件传输系统和方法、接收端、发送端 | |
CN111818112B (zh) | 一种基于Kafka系统的发送消息的方法和装置 | |
JP2002501254A (ja) | ネットワークを介したコンテンツをアドレス可能なデータに対するアクセス | |
CN109558065B (zh) | 数据删除方法及分布式存储系统 | |
CN109905479B (zh) | 文件传输方法和装置 | |
CN111427670A (zh) | 任务调度方法和系统 | |
CN110324406B (zh) | 一种获取业务数据的方法和云服务系统 | |
CN107040576A (zh) | 信息推送方法及装置、通讯系统 | |
US20080052341A1 (en) | System and method for processing data associated with a transmission in a data communication system | |
CN111526185B (zh) | 数据下载方法、装置、系统及存储介质 | |
CN110958150B (zh) | 一种服务动态配置的管理方法及装置 | |
CN111245887B (zh) | Hbase连接动态保持方法、设备、存储介质及系统 | |
CN116405547A (zh) | 消息推送方法、装置及处理器、电子设备、存储介质 | |
EP3868071B1 (en) | Distributed state recovery in a system having dynamic reconfiguration of participating nodes | |
CN111756780B (zh) | 一种同步连接信息的方法和负载均衡系统 | |
CN113268540B (zh) | 一种数据同步的方法及装置 | |
CN103248636A (zh) | 离线下载的系统及方法 | |
CN103118045B (zh) | 一种离线下载的方法及系统 | |
CN102546734B (zh) | 数据信息处理系统及方法 | |
CN107741994B (zh) | 一种数据更新方法及装置 | |
WO2007055867A1 (en) | Independent message stores and message transport agents | |
CN115023929A (zh) | 数据同步方法、装置、系统、电子设备及存储介质 | |
CN112600943B (zh) | 高并发数据状态下异构系统的消息同步方法 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |