CN114884975B - 业务消息的处理方法和装置、存储介质及电子装置 - Google Patents
业务消息的处理方法和装置、存储介质及电子装置 Download PDFInfo
- Publication number
- CN114884975B CN114884975B CN202210468874.4A CN202210468874A CN114884975B CN 114884975 B CN114884975 B CN 114884975B CN 202210468874 A CN202210468874 A CN 202210468874A CN 114884975 B CN114884975 B CN 114884975B
- Authority
- CN
- China
- Prior art keywords
- message
- incremental
- data
- service
- messages
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000003860 storage Methods 0.000 title claims abstract description 18
- 238000003672 processing method Methods 0.000 title description 18
- 230000001360 synchronised effect Effects 0.000 claims abstract description 118
- 238000012545 processing Methods 0.000 claims abstract description 79
- 238000000034 method Methods 0.000 claims abstract description 53
- 238000004519 manufacturing process Methods 0.000 claims abstract description 25
- 230000004044 response Effects 0.000 claims description 33
- 238000004590 computer program Methods 0.000 claims description 8
- 230000008569 process Effects 0.000 description 15
- 238000004891 communication Methods 0.000 description 11
- 238000010586 diagram Methods 0.000 description 9
- 238000007689 inspection Methods 0.000 description 6
- 230000007246 mechanism Effects 0.000 description 5
- 238000012795 verification Methods 0.000 description 4
- 230000008859 change Effects 0.000 description 3
- 230000003287 optical effect Effects 0.000 description 3
- 230000002159 abnormal effect Effects 0.000 description 2
- 230000009471 action Effects 0.000 description 2
- 230000005540 biological transmission Effects 0.000 description 2
- 230000008878 coupling Effects 0.000 description 2
- 238000010168 coupling process Methods 0.000 description 2
- 238000005859 coupling reaction Methods 0.000 description 2
- 238000005516 engineering process Methods 0.000 description 2
- 230000002093 peripheral effect Effects 0.000 description 2
- 238000005406 washing Methods 0.000 description 2
- XLYOFNOQVPJJNP-UHFFFAOYSA-N water Substances O XLYOFNOQVPJJNP-UHFFFAOYSA-N 0.000 description 2
- 241001178520 Stomatepia mongo Species 0.000 description 1
- 230000005856 abnormality Effects 0.000 description 1
- 230000006978 adaptation Effects 0.000 description 1
- 230000032683 aging Effects 0.000 description 1
- 230000000903 blocking effect Effects 0.000 description 1
- 238000004140 cleaning Methods 0.000 description 1
- 238000010411 cooking Methods 0.000 description 1
- 238000013500 data storage Methods 0.000 description 1
- 230000001934 delay Effects 0.000 description 1
- 230000000694 effects Effects 0.000 description 1
- 230000006870 function Effects 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 230000002035 prolonged effect Effects 0.000 description 1
- 239000000779 smoke Substances 0.000 description 1
- 238000010408 sweeping Methods 0.000 description 1
- 239000002699 waste material Substances 0.000 description 1
Classifications
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/10—Protocols in which an application is distributed across nodes in the network
- H04L67/1095—Replication or mirroring of data, e.g. scheduling or transport for data synchronisation between network nodes
Landscapes
- Engineering & Computer Science (AREA)
- Computer Networks & Wireless Communication (AREA)
- Signal Processing (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本申请公开了一种业务消息的处理方法和装置、存储介质及电子装置,涉及智能家居/智慧家庭技术领域,该方法包括:从消息中间件获取第一设备生产的一组第一增量消息,每个第一增量消息携带有用于标识每个第一增量消息生产顺序的消息标识;当第一消息标识和一组第一增量消息的消息标识不连续时,从目标数据库中拉取与目标业务对应的第一待同步数据,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。
Description
技术领域
本申请涉及计算机领域,具体而言,涉及一种业务消息的处理方法和装置、存储介质及电子装置。
背景技术
消息中间件是在分布式系统中完成消息的发送和接收的软件,其广泛应用于系统解耦、异步消息、消息同步等场景。在一些业务场景中,通过消息中间件存储的可以是通过增量消息所发送的消息的增量数据。为了保障业务能够进行,需要保障消息中间件发送的增量数据的可靠性。
在进行增量消息处理时,为了保证消息处理(比如,对于消息查询请求的处理)的可靠性,可以在使用消息中间件的同时,采用定时缓存失效策略:通过为消息中间件缓存的增量数据设置过期时间,并在确定缓存的增量数据过期之后,通过调用实时接口从保存有消息的全量数据的数据库中获取消息的增量数据并更新缓存的内容,可以避免由于缓存的增量数据丢失导致业务无法准确执行。
然而,频繁调用实时接口会导致实时接口压力增大,接口响应时间变长,而不停更新过期时间,有可能会导致消息处理的延迟。由此可见,相关技术中使用消息中间件处理消息的方式,存在由于需要频繁调用实时接口导致的消息处理的响应速度慢的问题。
发明内容
本申请实施例提供了一种业务消息的处理方法和装置、存储介质及电子装置,以至少解决相关技术中使用消息中间件处理消息的方式存在由于需要频繁调用实时接口导致的消息处理的响应速度慢的技术问题。
根据本申请实施例的一个方面,提供了一种业务消息的处理方法,包括:从消息中间件获取第一设备生产的一组第一增量消息,其中,所述一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,所述每个第一增量消息携带有用于标识所述每个第一增量消息的生产顺序的消息标识;在第一消息标识和所述一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与所述目标业务对应的第一待同步数据,其中,所述第一消息标识为第二设备上当前已同步的、所述目标业务的增量消息的消息标识,所述目标数据库中存储有与所述目标业务对应的第一全量数据,所述第一待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第一全量数据;使用所述第一待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第一全量数据。
根据本申请实施例的另一个方面,还提供了一种业务消息的处理装置,包括:第一获取单元,用于从消息中间件获取第一设备生产的一组第一增量消息,其中,所述一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,所述每个第一增量消息携带有用于标识所述每个第一增量消息的生产顺序的消息标识;第一拉取单元,用于在第一消息标识和所述一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与所述目标业务对应的第一待同步数据,其中,所述第一消息标识为第二设备上当前已同步的、所述目标业务的增量消息的消息标识,所述目标数据库中存储有与所述目标业务对应的第一全量数据,所述第一待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第一全量数据;第一同步单元,用于使用所述第一待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第一全量数据。
在一个示例性实施例中,所述装置还包括:发送单元,用于在所述从消息中间件获取第一设备生产的一组第一增量消息之前,响应于所述目标业务的业务处理指令,向所述消息中间件发送第一查询请求,其中,所述第一查询请求用于请求查询所述第一设备生产的、与所述目标业务对应的增量消息,所述一组第一增量消息是所述消息中间件响应于所述第一查询请求返回的增量消息。
在一个示例性实施例中,所述发送单元包括:获取模块,用于响应于所述目标业务的业务处理指令,获取所述第一消息标识,其中,所述第一消息标识为所述第二设备上已同步的、与所述目标业务对应的增量消息的消息标识中的最大消息标识;第一发送模块,用于向所述消息中间件发送所述第一查询请求,其中,所述第一查询请求携带有所述第一设备的设备标识和所述第一消息标识,所述第一查询请求用于请求查询所述第一设备生产的、与所述目标业务对应的增量消息中,消息标识大于所述第一消息标识的增量消息。
在一个示例性实施例中,所述装置还包括:第一确定单元,用于在所述从消息中间件获取第一设备生产的一组第一增量消息之后,确定所述一组第一增量消息的消息标识中大于所述第一消息标识的消息标识,得到一组消息标识,其中,所述第一消息标识为所述第二设备上已同步的增量消息中的最大消息标识;第二确定单元,用于在所述第一消息标识和所述一组消息标识不满足预设连续条件的情况下,确定所述第一消息标识和所述一组第一增量消息的消息标识不连续;第三确定单元,用于在所述第一消息标识和所述一组消息标识满足预设连续条件的情况下,确定所述第一消息标识和所述一组第一增量消息的消息标识连续;其中,所述预设连续条件为所述一组消息标识中的消息标识依次连续、且所述一组消息标识中的最小消息标识与所述第一消息标识连续。
在一个示例性实施例中,所述第一拉取单元包括:第二发送模块,用于向所述目标数据库发送业务数据请求,其中,所述业务数据请求用于请求所述第一设备生产的、与所述目标业务对应的全量数据;第一接收模块,用于接收所述目标数据库响应于所述业务数据请求返回的所述第一全量数据,其中,所述第一待同步数据为所述第一全量数据;或者,第一确定模块,用于确定所述一组第一增量消息的消息标识中大于所述第一消息标识的消息标识,得到一组消息标识,其中,所述第一消息标识为所述第二设备上已同步的增量消息中的最大消息标识;第二确定模块,用于确定所述一组消息标识和所述第一消息标识中缺失的消息标识,得到一组缺失消息标识;第三发送模块,用于向所述目标数据库发送的增量消息请求,其中,所述增量消息请求用于请求所述第一设备生产的、消息标识为所述一组缺失消息标识中的缺失消息标识的增量消息;第二接收模块,用于接收所述目标数据库响应于所述增量消息请求返回的一组缺失增量消息,其中,所述一组缺失增量消息与所述一组缺失消息标识一一对应,所述第一待同步数据为所述一组缺失增量消息中包含的、与所述目标业务对应的增量数据。
在一个示例性实施例中,所述装置还包括:第二同步单元,用于在所述从消息中间件获取第一设备生产的一组第一增量消息之后,在所述第一消息标识和所述一组第一增量消息的消息标识连续的情况下,使用所述每个第一增量消息中包含的、与所述目标业务对应的增量数据,对所述第二设备上与所述目标业务对应的业务数据进行同步,得到与所述目标业务对应的已同步业务消息。
在一个示例性实施例中,所述装置还包括:第二获取单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之后,在目标核查时间到达的情况下,从所述消息中间件获取所述第一设备生产的一组第二增量消息,其中,所述一组第二增量消息中的每个第二增量消息包含与所述目标业务对应的增量数据,所述每个第二增量消息携带有用于标识所述每个第二增量消息的生产顺序的消息标识;第二拉取单元,用于在第二消息标识和所述一组第二增量消息的消息标识不连续的情况下,从所述目标数据库中拉取与所述目标业务对应的第二待同步数据,其中,所述第二消息标识为所述第二设备上当前已同步的、所述目标业务的增量消息的消息标识,所述目标数据库中存储有与所述目标业务对应的第二全量数据,所述第二待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第二全量数据;第三同步单元,用于使用所述第二待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第二全量数据。
根据本申请实施例的又一方面,还提供了一种计算机可读的存储介质,该计算机可读的存储介质中存储有计算机程序,其中,该计算机程序被设置为运行时执行上述业务消息的处理方法。
根据本申请实施例的又一方面,还提供了一种电子装置,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,其中,上述处理器通过计算机程序执行上述的业务消息的处理方法。
在本申请实施例中,采用基于业务消息的消息标识确定业务消息异常、并在业务消息异常时从数据库拉取全量数据进行数据同步的方式,通过从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据,由于在消费增量消息时,通过增量消息的消息标识是否连续确定增量消息是否被全部接收到,并在未全部接收到时从数据库拉取数据进行数据同步,在保证数据同步的完整性的同时,仅在确定消费增量消息出现异常时才会调用与数据库之间的通信接口拉取数据,可以实现减少实时接口调用次数的目的,达到提高消息中间件处理消息的响应速度的技术效果,进而解决了相关技术中使用消息中间件处理消息的方式存在由于需要频繁调用实时接口导致的消息处理的响应速度慢的问题。
附图说明
此处的附图被并入说明书中并构成本说明书的一部分,示出了符合本申请的实施例,并与说明书一起用于解释本申请的原理。
为了更清楚地说明本申请实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,对于本领域普通技术人员而言,在不付出创造性劳动性的前提下,还可以根据这些附图获得其他的附图。
图1是根据本申请实施例的一种可选的业务消息的处理方法的硬件环境的示意图;
图2是根据本申请实施例的一种可选的业务消息的处理方法的流程示意图;
图3是根据本申请实施例的另一种可选的业务消息的处理方法的流程示意图;
图4是根据本申请实施例的一种可选的业务消息的处理方法的示意图;
图5是根据本申请实施例的又一种可选的业务消息的处理方法的流程示意图;
图6是根据本申请实施例的一种可选的业务消息的处理装置的结构框图;
图7是根据本申请实施例的一种可选的电子装置的结构框图。
具体实施方式
为了使本技术领域的人员更好地理解本申请方案,下面将结合本申请实施例中的附图,对本申请实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本申请一部分的实施例,而不是全部的实施例。基于本申请中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都应当属于本申请保护的范围。
需要说明的是,本申请的说明书和权利要求书及上述附图中的术语“第一”、“第二”等是用于区别类似的对象,而不必用于描述特定的顺序或先后次序。应该理解这样使用的数据在适当情况下可以互换,以便这里描述的本申请的实施例能够以除了在这里图示或描述的那些以外的顺序实施。此外,术语“包括”和“具有”以及他们的任何变形,意图在于覆盖不排他的包含,例如,包含了一系列步骤或单元的过程、方法、系统、产品或设备不必限于清楚地列出的那些步骤或单元,而是可包括没有清楚地列出的或对于这些过程、方法、产品或设备固有的其它步骤或单元。
根据本申请实施例的一个方面,提供了一种业务消息的处理方法。该业务消息的处理方法广泛应用于智慧家庭(Smart Home)、智能家居、智能家用设备生态、智慧住宅(Intelligence House)生态等全屋智能数字化控制应用场景。可选地,在本实施例中,上述业务消息的处理方法可以应用于如图1所示的由终端设备102和服务器104所构成的硬件环境中。如图1所示,服务器104通过网络与终端设备102进行连接,可用于为终端或终端上安装的客户端提供服务(如应用服务等),可在服务器上或独立于服务器设置数据库,用于为服务器104提供数据存储服务,可在服务器上或独立于服务器配置云计算和/或边缘计算服务,用于为服务器104提供数据运算服务。
上述网络可以包括但不限于以下至少之一:有线网络,无线网络。上述有线网络可以包括但不限于以下至少之一:广域网,城域网,局域网,上述无线网络可以包括但不限于以下至少之一:WIFI(Wireless Fidelity,无线保真),蓝牙。终端设备102可以并不限定于为PC、手机、平板电脑、智能空调、智能烟机、智能冰箱、智能烤箱、智能炉灶、智能洗衣机、智能热水器、智能洗涤设备、智能洗碗机、智能投影设备、智能电视、智能晾衣架、智能窗帘、智能影音、智能插座、智能音响、智能音箱、智能新风设备、智能厨卫设备、智能卫浴设备、智能扫地机器人、智能擦窗机器人、智能拖地机器人、智能空气净化设备、智能蒸箱、智能微波炉、智能厨宝、智能净化器、智能饮水机、智能门锁等。可选地,终端设备可以是用于生产业务消息的设备(消息的发送方),例如,消息生产者(Producer),也可以是用于消费业务消息的设备(消息的接收方),例如,消息消费者(Consumer)。
本申请实施例的业务消息的处理方法可以由服务器104来执行,也可以由终端设备102来执行,还可以是由服务器104和终端设备102共同执行。其中,终端设备102执行本申请实施例的业务消息的处理方法也可以是由安装在其上的客户端来执行。
以由终端设备102来执行本实施例中的业务消息的处理方法为例,图2是根据本申请实施例的一种可选的业务消息的处理方法的流程示意图,如图2所示,该方法的流程可以包括以下步骤:
步骤S202,从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识。
本实施例中的业务消息的处理方法可以应用到分布式系统中,应用在从业务消息的生产设备将生产的业务消息发送给从消息中间件、并由业务消息的消费设备从消息中间件获取(消费)业务消息的场景。上述的预设业务可以是允许通过消息中间件进行业务消息(或者说,业务信息)同步的业务,例如,可以是云平台中用于进行设备状态信息同步的业务。上述的消息中间件是在分布式系统中完成消息的发送和接收的基础软件,这里,消息中间件通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程通信,例如,消息中间件可以是RabbitMQ、RocketMQ、Kafka等,但不限于此,本实施例中对于消息中间件不做限定。
对于一个消息中间件,其可以对应于消息生产者和消息消费者。消息生产者可以生产出某一业务(例如,目标业务)的增量消息,并将生产的增量消息缓存到消息中间件对应的主题(Topic)中,消息消费者可以通过订阅对应的主题,从消息中间件中获取到消息生产者生产的、与该业务对应的增量消息。
以Kafka为例,相关技术中,在分布式系统中,设备列表相关信息调用频繁,是基于多个业务场景的基础接口。然而,直接请求IOT(Internet of Things,物联网)接口不能达到性能要求,需要做缓存。但与此同时,这些信息又在频繁变更之中,通过Kafka消息订阅存在消息中间件有少量消息丢失的可能,导致无法完全信任消息中间件的可靠性做业务处理。
为了保证消息处理的可靠性,可以在使用消息中间件的同时采用定时时效策略,通过为消息中间件缓存的增量数据设置过期时间,并在确定缓存的增量数据过期之后,通过调用实时接口从保存有消息的全量数据的数据库中获取消息的增量数据并更新缓存的内容,可以避免由于缓存的增量数据丢失导致业务无法准确执行。
例如,如图3所示,图3是根据本申请实施例的另一种可选的业务消息的处理方法的流程示意图,该业务消息的处理方法的流程可以包括以下步骤:
步骤S302,消息中间件接收消息消费者的查询请求。
步骤S304,响应于接收到的查收请求,消息中间件判断缓存的增量数据是否过期,如果是,执行步骤S308,否则,执行步骤S306。
步骤S306,当消息中间件中的增量数据没有过期的情况下,消息中间件可以直接从缓存中获取查询请求所请求的内容(即,增量数据),并异步更新缓存中增量数据的过期时间。
步骤S308,当消息中间件中的增量数据过期的情况下,消息中间件可以请求实时接口,从数据库中获取查询请求所请求的增量数据。
步骤S310,消息中间件可以将从实时接口中获取到的增量消息异步更新至缓存中。
步骤S312,消息中间件可以将获取结果(即,获取到的增量数据)发送给消费者。
然而,对于上述增量消息更新方案,在进行增量数据查询时会不断更新缓存内容,导致缓存操作频繁,而实际上信息变更频率会远远低于更新频率,从而造成资源浪费;并且,频繁调用实时接口会导致实时接口压力增大,接口响应时间变长;而不停更新过期时间,有可能会导致消息延迟。
为了解决上述问题中的至少部分,在本实施例中,可以为每个增量消息设置用于标识每个增量消息的生产顺序的消息标识。上述消息标识可以是消息生产者进行消息生产时为消息添加的,例如,消息生产者在发送增量消息时,可以对增量消息进行该消息唯一地、严格递增地编号,得到该消息的消息标识,生产的增量消息可以先发送给消息中间件,消息中间件可以根据消息的订阅信息进行消息的转发,上述转发可以是直接转发,也可以是基于消息消费者的查询请求进行的被动转发,本实施例中对此不做限定。
对于目标业务,为了提高消息的传输效率,第一设备可以将生产的目标业务的增量消息缓存到消息中间件中,这里,第一设备可以是与目标业务对应的消息生产者。终端设备可以从消息中间件中获取第一设备(对应地,终端设备可以为第二设备)生产的增量消息,得到一组第一增量消息。终端设备获取到上述一组第一增量消息的方式可以有一种或多种,可以是按照预设的时间间隔(例如,3s,5s),周期性地从消息中间件中获取到目标业务的增量消息,也可以是在满足增量消息获取条件(例如,接收到获取目标业务的增量消息的指令)时,从消息中间件中获取到目标业务的增量消息。本实施例中对于获取目标业务增量消息的方式不做限定。
一组第一增量消息可以是与第一设备对应的消息,例如,状态消息、变更消息、选项消息、属性消息等。由于在业务处理中,终端设备需要根据第一设备生产的最新增量消息来处理相关的业务,为了保证业务处理的可靠性,终端设备可以获取每个第一增量消息携带的、用于标识每个第一增量消息的生产顺序的消息标识,得到每个第一增量消息的消息标识。这里,每个第一增量消息的消息标识可以是第一设备根据每个第一增量消息生产的先后顺序,对每个第一增量消息进行编号,得到每个第一增量消息的消息标识,其可以是第一设备所维护的消息编号(或者说,设备号)。
可选地,对每个第一增量消息进行编号的方式可以有一种或多种,可以是使用全局唯一编号对每个第一增量消息按照生产时间的先后顺序进行编号,也可以是根据每个第一增量消息的消息类型对同一消息类型的第一增量消息按照生产时间的先后顺序进行编号,还可以是通过其他方式按照生产时间的先后顺序对每个第一增量消息进行编号,本实施例中此不做限定。
步骤S204,在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据。
在本实施例中,终端设备(即,第二设备)当前已同步的、目标业务的增量消息的消息标识为第一消息标识(可以是当前已同步的、目标业务的增量消息的最大消息标识),终端设备可以判断第一消息标识和一组第一增量消息的消息标识是否连续。在第一消息标识和一组第一增量消息的消息标识连续的情况下,表示增量消息已被正确接收,则可以直接进行目标业务的业务数据的增量同步。
在第一消息标识和一组第一增量消息的消息标识不连续的情况下,终端设备可以从目标数据库中拉取与目标业务对应的第一待同步数据,上述的目标数据库中存储有与目标业务对应的第一全量数据,该第一全量数据包含有目标业务对应的所有数据。上述的第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据,其可以是第一全量数据,也可以是按照缺失的消息标识所获取的、缺失的增量数据,本实施例中对此不做限定。
终端设备从目标数据库中拉取第一待同步数据的方式可以有一种或多种,可以是通过与目标业务对应的数据接口从目标数据库中拉取与目标业务对应的第一待同步数据,也可以是通过对应的指令从目标数据库中拉取与目标业务对应的第一待同步数据,还可以是通过其他方式从目标数据库中拉取与目标业务对应的第一待同步数据,本实施例中对此不做限定。
步骤S206,使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。
在从目标数据库中拉取到第一待同步数据之后,终端设备可以使用第一待同步数据执行数据同步操作,即,使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。获取待同步数据的方式不同,执行上述数据同步操作的方式可以是不同的,例如,可以直接将第二设备上与目标业务对应的业务数据同步为第一待同步数据(在此情况下,第一待同步数据为第一全量数据),也可以是按照第一待同步数据所对应的消息标识以及一组第一增量消息的消息标识,使用第一待同步数据和一组第一增量消息对第二设备上与目标业务对应的业务数据进行更新(在此情况下,第一待同步数据为与缺失的消息标识对应的增量数据),还可以是其他执行数据同步操作的方式,本实施例中对此不做限定。
通过上述步骤S202至步骤S206,从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据,解决了相关技术中的使用消息中间件处理消息的方式存在由于需要频繁调用实时接口导致的消息处理的响应速度慢的问题的技术问题,提高了消息中间件处理消息的响应速度。
在一个示例性实施例中,在从消息中间件获取第一设备生产的一组第一增量消息之前,上述方法还包括:
S11,响应于目标业务的业务处理指令,向消息中间件发送第一查询请求,其中,第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息,一组第一增量消息是消息中间件响应于第一查询请求返回的增量消息。
在本实施例中,用户可以对终端设备执行与目标业务对应的业务处理操作,触发生成目标业务的业务处理指令。终端设备可以响应于目标业务的业务处理指令,向消息中间件发送第一查询请求。第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息。一组第一增量消息是消息中间件响应于第一查询请求返回的增量消息。
执行业务处理操作的方式可以有一种或多种,例如,用户可以对终端设备的设备操作页面中的特定区域执行与目标业务对应的业务处理操作,触发生成上述业务处理指令。又例如,用户可以通过对终端设备上的按钮或者对应的区域执行与目标业务对应的业务处理操作,触发生成上述业务处理指令。本实施例中对于触发生成业务处理指令的方式不做限定。
消息中间件可以接收到第一查询请求,并响应于接收到的第一查询请求,按照目标业务的业务标识、或者其他标识查询到与目标业务对应的增量消息,得到一组第一增量消息,并将得到的一组第一增量消息发送给终端设备。终端设备可以接收消息中间件发送的一组第一增量消息,从而获取到一组第一增量消息。
可选地,上述业务处理操作可以是一个操作,或者多个操作的组合,可以包括但不限于以下至少之一:点击操作,双击操作,滑动操作,还可以是其他操作,本实施例中对于业务处理操作不做限定。
通过本实施例,基于业务处理指令向消息中间件发送业务的增量消息的查询请求,从而获取特定业务的增量数据,可以提高业务处理的高效性和便捷性。
在一个示例性实施例中,响应于目标业务的业务处理指令,向消息中间件发送第一查询请求,包括:
S21,响应于目标业务的业务处理指令,获取第一消息标识,其中,第一消息标识为第二设备上已同步的、与目标业务对应的增量消息的消息标识中的最大消息标识;
S22,向消息中间件发送第一查询请求,其中,第一查询请求携带有第一设备的设备标识和第一消息标识,第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息。
在本实施例中,为了减少传输的增量消息的数据量,响应于目标业务的业务处理指令,终端设备可以首先获取到第一消息标识。上述的第一消息标识为第二设备上已同步的、与目标业务对应的增量消息的消息标识中的最大消息标识,即,第二设备上最近一次同步成功的消息标识。在获取到第一消息标识后,终端设备可以向消息中间件发送第一查询请求,第一查询请求携带有第一设备的设备标识和第一消息标识。第一设备的设备标识可以是用于唯一标识第一设备,例如,幂等编号,其可以是第一设备的设备编号,例如,MAC(Media Access Control,媒体访问控制)地址,对应地,第一查询请求可以用于请求查询第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息。
在接收到第一查询请求之后,消息中间件可以首先提取出第一设备的设备标识和第一消息标识,然后可以使用第一设备的设备标识和第一消息标识执行查询操作,查询第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息,得到一组第一增量消息。消息中间件执行查询操作的方法可以有一种或多种,可以是根据第一设备的设备标识和第一消息标识,查询到第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息,也可以是先根据第一设备的设备标识查询到第一设备生产的、与目标业务对应的增量消息,然后再根据第一消息标识查询到这些增量消息中,消息标识大于第一消息标识的增量消息,还可以是通过其他方式查询到第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息,本实施例中对此不做限定。
通过本实施例,通过携带有消息生产者的设备标识和已成功不同的增量消息的消息标识查询到预定业务的增量消息,可以提高增量消息获取的便捷性和高效性,同时可以减少对网络传输资源的占用。
在一个示例性实施例中,在从消息中间件获取第一设备生产的一组第一增量消息之后,上述方法还包括:
S31,确定一组第一增量消息的消息标识中大于第一消息标识的消息标识,得到一组消息标识,其中,第一消息标识为第二设备上已同步的增量消息中的最大消息标识;
S32,在第一消息标识和一组消息标识不满足预设连续条件的情况下,确定第一消息标识和一组第一增量消息的消息标识不连续;
S33,在第一消息标识和一组消息标识满足预设连续条件的情况下,确定第一消息标识和一组第一增量消息的消息标识连续。
在本实施例中,在获取到一组第一增量消息之后,终端设备可以从一组第一增量消息的消息标识中确定出大于第一消息标识的消息标识,得到一组消息标识,这里,第一消息标识为第二设备上已同步的增量消息中的最大消息标识。在得到一组消息标识之后,终端设备可以确定第一消息标识和一组消息标识是否满足预设连续条件,这里,预设连续条件为一组消息标识中的消息标识依次连续、且一组消息标识中的最小消息标识与第一消息标识连续。
可选地,在确定一组消息标识时,可以按照从小到大的顺序、或者从大到小的顺序对一组消息标识进行排序,得到消息标识队列,该消息标识队列中的消息标识可以是依次连续,也可以不是依次连续的。根据消息标识队列中的消息标识,终端设备可以确定消息标识队列中是否存在第一消息标识,如果存在,将大于第一消息标识方向上的所有消息标识,确定为一组消息标识,如果不存在,可以确定消息标识队列中大于第一消息标识的最小消息标识,并将大于最小消息标识方向上的所有消息标识,确定为一组消息标识。
可选地,判断第一消息标识和一组第一增量消息的消息标识是否连续可以是判断第一消息标识与一组消息标识是否连续。如果第一消息标识与一组消息标识中的最小消息标识连续、且一组消息标识内的消息标识也连续(一组第一增量消息和当前已同步的增量消息互为连续的消息),则可以认为满足预设连续条件,否则,可以认为不满足预设连续条件。
如果确定第一消息标识和一组第一增量消息的消息标识不连续,终端设备可以向终端设备的使用对象发送提示消息,以提示消息中间件中目标业务对应的增量消息有缺失,或者终端设备出现获取增量消息失败的情况。
例如,对于消息消费者,当前已同步的增量消息的最大消息标识可以为101。在获取到的一组增量消息的消息标识为102-107时,可以认为已同步的增量消息的消息标识和获取到的增量消息的消息标识是连续的。
又例如,对于消息消费者,当前已同步的增量消息的最大消息标识可以为101,在获取到的一组增量消息的消息标识为105-107时,终端设备可以认为已同步的增量消息的消息标识和获取到的一组增量消息的消息标识是不连续的,即,获取到的一组增量消息中有丢失的增量消息,或者终端设备出现获取增量消息失败的情况。在此情况下,可以从数据库拉取全量数据,还可以通过终端设备的扬声器播放增量数据同步异常的提醒消息。
通过本实施例,通过判断已同步的增量消息的最大消息标识与新获取的、消息标识大于最大消息标识的消息标识确定获取到的增量数据是否有丢失,可以提高业务处理的准确性和高效性。
在一个示例性实施例中,可以采用多种方式获取第一待同步数据,可以是获取目标业务对应的全量数据,也可以是获取缺失的增量消息中的增量数据。
作为一种可选的实施方式,从目标数据库中拉取与目标业务对应的第一待同步数据,包括:
S41,向目标数据库发送业务数据请求,其中,业务数据请求用于请求第一设备生产的、与目标业务对应的全量数据;
S42,接收目标数据库响应于业务数据请求返回的第一全量数据,其中,第一待同步数据为第一全量数据。
终端设备可以向目标数据库发送业务数据请求(例如,通过调用与目标业务对应的数据接口,发送业务数据请求),该业务数据请求携带有第一设备的设备标识,以此来请求获取第一设备生产的、与目标业务对应的全量数据。
目标数据库可以接收到上述业务数据请求,并响应于接收到的业务数据请求,拉取第一设备生产的、与目标业务对应的全量数据,得到第一全量数据,并将第一全量数据发送给终端设备。在此情况下,第一待同步数据为第一全量数据。终端设备可以接收到第一全量数据,并使用第一全量数据进行数据同步。
作为另一种可选的实施方式,从目标数据库中拉取与目标业务对应的第一待同步数据,包括:
S43,确定一组第一增量消息的消息标识中大于第一消息标识的消息标识,得到一组消息标识,其中,第一消息标识为第二设备上已同步的增量消息中的最大消息标识;
S44,确定一组消息标识和第一消息标识中缺失的消息标识,得到一组缺失消息标识;
S45,向目标数据库发送的增量消息请求,其中,增量消息请求用于请求第一设备生产的、消息标识为一组缺失消息标识中的缺失消息标识的增量消息;
S46,接收目标数据库响应于增量消息请求返回的一组缺失增量消息,其中,一组缺失增量消息与一组缺失消息标识一一对应,第一待同步数据为一组缺失增量消息中包含的、与目标业务对应的增量数据。
终端设备可以确定出一组第一增量消息中,消息标识中大于第一消息标识的消息标识,得到一组消息标识,然后确定出一组消息标识和第一消息标识中缺失的消息标识,得到一组缺失消息标识,可以是第一消息标识和一组消息标识中的最大消息标识之间缺失的消息标识。在得到一组缺失消息标识后,终端设备可以向目标数据库发送增量消息请求,该增量消息请求携带有第一设备的设备标识以及上述的一组缺失的消息标识,以此来请求第一设备生产的、消息标识为一组缺失消息标识中的缺失消息标识的增量消息。
目标数据库可以接收到上述的增量消息请求,并响应于接收到的增量消息请求,拉取第一设备生产的、消息标识为一组缺失消息标识中的缺失消息标识的增量消息,得到一组缺失增量消息,并将得到的一组缺失增量消息发送给终端设备。在此情况下,第一待同步数据为一组缺失增量消息。终端设备可以接收到一组缺失增量消息,并使用一组缺失增量消息进行数据同步。
例如,消息消费者收到增量消息之后,处理成功则将成功的消息编号(即,消息标识)保存在消费端的轻量级缓存中。这样,如果收到的消息编号出现跳跃的情况,比如,从101变到了105,则说明中间消息有丢失或者处理上有失败的情况,可以通过幂等性操作,拉取全量消息进行同步,从而保障本条消息完整性的高度可靠性。
当消息编号从101变到105,消息消费者可以从数据库拉取全量消息,也可以确定出缺失的消息编号为102-104,并使用消息编号102-104拉取与这些消息编号对应的增量消息,从而使用拉取的增量消息和已接收到的消息编号为105以及之后的增量消息进行消息同步。
通过本实施例,通过从数据库拉取全量消息或者缺失的增量消息,可以提高数据同步的灵活性,提升数据同步的完整性。
在一个示例性实施例中,在从消息中间件获取第一设备生产的一组第一增量消息之后,上述方法还包括:
S51,在第一消息标识和一组第一增量消息的消息标识连续的情况下,使用每个第一增量消息中包含的、与目标业务对应的增量数据,对第二设备上与目标业务对应的业务数据进行同步,得到与目标业务对应的已同步业务消息。
在本实施例中,在确定第一消息标识和一组第一增量消息的消息标识连续时,可以确定未发现缺失的增量消息,可以使用每个第一增量消息中包含的、与目标业务对应的增量数据,对第二设备上与目标业务对应的业务数据进行同步。同步完成后,终端设备可以得到与目标业务对应的已同步业务消息。
在进行数据同步时,可以按照每个第一增量消息的消息标识由小到大的顺序,使用每个第一增量消息中包含、与目标业务对应的增量数据,依次对第二设备上与目标业务对应的业务数据进行同步,从而实现业务数据同步。
通过本实施例,确定增量消息未缺失时使用获取到的增量消息进行数据同步,可以提高业务处理的高效性和便捷性。
在一个示例性实施例中,在从消息中间件获取第一设备生产的一组第一增量消息之后,上述方法还包括:
S61,在目标核查时间到达的情况下,从消息中间件获取第一设备生产的一组第二增量消息,其中,一组第二增量消息中的每个第二增量消息包含与目标业务对应的增量数据,每个第二增量消息携带有用于标识每个第二增量消息的生产顺序的消息标识;
S62,在第二消息标识和一组第二增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第二待同步数据,其中,第二消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第二全量数据,第二待同步数据用于将第二设备上与目标业务对应的业务数据同步为第二全量数据;
S63,使用第二待同步数据将第二设备上与目标业务对应的业务数据同步为第二全量数据。
为了进一步保证消息中间件的可靠性,避免消息中间件中有未处理的目标业务的增量消息,同时检验第二设备中已同步的、目标业务的业务数据是否有缺失,可以按照时间戳进行增量消息消费,辅助批量检查成功的消息编号,直到全部成功,才做点前检查点(checkpoint)的提交(对应于分布式事务处理的场景),从而保障所有消息可达性的高度可靠性。
在目标核查时间到达的情况下,终端设备可以从消息中间件拉取第一设备生产的一组第二增量消息,一组第二增量消息可以是基于时间戳获取的,例如,基于当前已同步的、目标业务的增量消息的最大消息标识获取的,又例如,基于当前已同步的、目标业务的最新增量消息的生产时间、接收时间获取的。上述的目标核查时间可以是用户预设的时间,例如,在每天的13点拉取第一设备生产的、目标业务的增量消息,在每天的15点拉取第一设备生产的、目标业务的增量消息,也可以是用户预设的时间周期,例如,每隔1小时拉取一次,每隔半小时拉取一次,还可以是满足核查条件的时间,例如,在业务处理完成(例如,已经基于一组第一增量数据完成数据同步)之后拉取一次,本实施例中对此不做限定。
对于一组第二增量消息,终端设备可以采用与前述实施例中相同或者类似的方式获取一组第二增量消息的消息标识,并确定第二消息标识和一组第二增量消息的消息标识是否连续,并基于第二消息标识和一组第二增量消息的消息标识是否连续采用不同的方式进行数据同步,已经进行过描述的,在此不做赘述。
通过本实施例,采用基于时间戳的检查机制,可以保证全量消息同步的完整性,提高业务处理的便捷性和准确性。
下面结合可选示例对本申请实施例中的业务消息的处理方法进行解释说明。在本可选示例中,第一设备为消息的发送方(消息生产者),第二设备为消息的消费方(消息消费者),发送方的设备标识为幂等ID(Identifier,标识),增量消息的消息标识为全局唯一编号,即,消息编号。
本可选示例中提供了一种消息中间件增量同步高可靠性的方案,可以应用到通过基于Java实现的分布式系统,采用Kafka、Mongo和Spring-Boot框架来保障缓存同步,基于消息全局递增的唯一编号机制、基于严格递增和跳跃递增的处理方式保障本条信息的可靠性;基于时间戳的检查机制,保障消息的完整性,从而保障消息中间件的可靠性,使得接口查询可以直接查询消息中间件的缓存,无需调用实时接口(消息中间件和数据库的实时接口),从而提升接口的整体响应速度。
结合图4和图5所示,本可选示例中的业务消息的处理方法的流程可以包括以下步骤:
步骤S502,发送方可以生产与目标业务对应的增量消息。
步骤S504,发送方为生成的增量消息设置本消息的全局唯一编号、幂等ID。全局唯一编号用于表示增量消息生产的时间顺序,增量消息的幂等ID为发送方的设备标识。
步骤S506,发送方将增量消息发送到消息中间件Kafka中。
步骤S508,消息中间件Kafka接收到增量消息并进行缓存。
步骤S510,基于offset(偏移量)消费处理消息分组。
消费方在消费消息时,可以设置两个消费分组,其中一个基于offset进行消费用于业务处理,另一个基于时间戳进行消费用于消息检查及补偿处理。
业务处理时,阻塞消息检查进程,消费方基于offset消费分组消费消息。消费方可以通过第二设备向Kafka发送携带有幂等ID的查询请求,消息中间件根据幂等ID,使用Offset从缓存中拉取与幂等ID相关的增量信息。
步骤S512,基于编号判断进行业务处理。消费方可以基于消息的全局唯一编号,确定出增量消息是否为需要进行同步的消息。
步骤S514,消费方比对编号。若当前的消息的全局唯一编号小于上次消息消费者成功同步的编号,执行步骤S516,若当前的消息的全局唯一编号大于上次消费方成功同步的编号(即,B1),且该编号严格+1递增,执行步骤S518,若当前的消息的全局唯一编号大于上次消息消费者成功同步的编号,而该编号不是严格+1递增时,执行步骤S528。
步骤S516,消费方丢弃本增量消息。消费方的业务处理进程如果发现当前消息编号小于上次成功的消息编号,则丢弃。
步骤S518,如果严格+1递增则增量同步,此时,业务处理进程将本增量消息进行增量同步。
步骤S520,基于时间戳,消费增量消息,处理消息分组。基于时间戳,消息检查消费分组消费增量消息。消费方可以设置用于消息检查的时间戳,在设置的时间戳对应的时间到达时,使用幂等ID从消息中间件中拉取增量消息并进行保存到消息检查消费分组中,增量消息的消息编号为B2。
步骤S522,消费方处理增量消息。业务处理完毕一定时间(比如,30秒)没有消息时,停止对消息检查进程的阻塞,消息检查进行继续处理。
步骤S524,判断B2是否大于B1,若是,执行步骤S526,否则,执行步骤S528。
消息检查进程判断消息检查消费分组当前已同步增量消息的消息编号B2是否大于消息处理消费分组的当前已同步增量消息的消息编号B1,即,B2是否大于B1。
步骤S526,消息检查进程丢弃增量消息。
步骤S528,基于幂等ID,全量同步本消息。跳跃递增则幂等同步,基于发送方的幂等ID从数据库中拉取本消息的全量数据并进行同步。
步骤S530,消息同步成功后,更新本消息成功的编号B1。
通过本可选示例,基于消息全局递增的唯一编号机制对本条消息中间是否有丢失进行快速判断,由于通过跳跃递增检查机制保障了消息丢失及时发现,及时进行幂等处理保障了消息的可靠性,同时,由于多数情况下,消息属于+1严格递增,无需幂等同步,从而提升了消息的性能,这两者保障了单条消息的性能和保障性;如果消息消费过程中出现丢失的情况,则可以通过基于时间戳消费的检查进程进行查缺补漏,保障消息消费的完整性。
需要说明的是,对于前述的各方法实施例,为了简单描述,故将其都表述为一系列的动作组合,但是本领域技术人员应该知悉,本申请并不受所描述的动作顺序的限制,因为依据本申请,某些步骤可以采用其他顺序或者同时进行。其次,本领域技术人员也应该知悉,说明书中所描述的实施例均属于优选实施例,所涉及的动作和模块并不一定是本申请所必须的。
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到根据上述实施例的方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质(如ROM(Read-Only Memory,只读存储器)/RAM(Random Access Memory,随机存取存储器)、磁碟、光盘)中,包括若干指令用以使得一台终端设备(可以是手机,计算机,服务器,或者网络设备等)执行本申请各个实施例所述的方法。
根据本申请实施例的另一个方面,还提供了一种用于实施上述业务消息的处理方法的业务消息的处理装置。图6是根据本申请实施例的一种可选的业务消息的处理装置的结构框图,如图6所示,该装置可以包括:
第一获取单元602,用于从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;
第一拉取单元604,与第一获取单元602相连,用于在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;
第一同步单元606,与第一拉取单元604相连,用于使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。
需要说明的是,该实施例中的第一获取单元602可以用于执行上述步骤S202,该实施例中的第一拉取单元604可以用于执行上述步骤S204,该实施例中的第一同步单元606可以用于执行上述步骤S206。
通过上述模块,从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据,解决了相关技术中的使用消息中间件处理消息的方式存在由于需要频繁调用实时接口导致的消息处理的响应速度慢的问题的技术问题,提高了消息中间件处理消息的响应速度。
在一个示例性实施例中,上述装置还包括:
发送单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之前,响应于目标业务的业务处理指令,向消息中间件发送第一查询请求,其中,第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息,一组第一增量消息是消息中间件响应于第一查询请求返回的增量消息。
在一个示例性实施例中,发送单元包括:
获取模块,用于响应于目标业务的业务处理指令,获取第一消息标识,其中,第一消息标识为第二设备上已同步的、与目标业务对应的增量消息的消息标识中的最大消息标识;
第一发送模块,用于向消息中间件发送第一查询请求,其中,第一查询请求携带有第一设备的设备标识和第一消息标识,第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息。
在一个示例性实施例中,上述装置还包括:
第一确定单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之后,确定一组第一增量消息的消息标识中大于第一消息标识的消息标识,得到一组消息标识,其中,第一消息标识为第二设备上已同步的增量消息中的最大消息标识;
第二确定单元,用于在第一消息标识和一组消息标识不满足预设连续条件的情况下,确定第一消息标识和一组第一增量消息的消息标识不连续;
第三确定单元,用于在第一消息标识和一组消息标识满足预设连续条件的情况下,确定第一消息标识和一组第一增量消息的消息标识连续;
其中,预设连续条件为一组消息标识中的消息标识依次连续、且一组消息标识中的最小消息标识与第一消息标识连续。
在一个示例性实施例中,第一拉取单元包括:
第二发送模块,用于向目标数据库发送业务数据请求,其中,业务数据请求用于请求第一设备生产的、与目标业务对应的全量数据;
第一接收模块,用于接收目标数据库响应于业务数据请求返回的第一全量数据,其中,第一待同步数据为第一全量数据。
在一个示例性实施例中,第一拉取单元包括:
第一确定模块,用于确定一组第一增量消息的消息标识中大于第一消息标识的消息标识,得到一组消息标识,其中,第一消息标识为第二设备上已同步的增量消息中的最大消息标识;
第二确定模块,用于确定一组消息标识和第一消息标识中缺失的消息标识,得到一组缺失消息标识;
第三发送模块,用于向目标数据库发送的增量消息请求,其中,增量消息请求用于请求第一设备生产的、消息标识为一组缺失消息标识中的缺失消息标识的增量消息;
第二接收模块,用于接收目标数据库响应于增量消息请求返回的一组缺失增量消息,其中,一组缺失增量消息与一组缺失消息标识一一对应,第一待同步数据为一组缺失增量消息中包含的、与目标业务对应的增量数据。
在一个示例性实施例中,上述装置还包括:
第二同步单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之后,在第一消息标识和一组第一增量消息的消息标识连续的情况下,使用每个第一增量消息中包含的、与目标业务对应的增量数据,对第二设备上与目标业务对应的业务数据进行同步,得到与目标业务对应的已同步业务消息。
在一个示例性实施例中,上述装置还包括:
第二获取单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之后,在目标核查时间到达的情况下,从消息中间件获取第一设备生产的一组第二增量消息,其中,一组第二增量消息中的每个第二增量消息包含与目标业务对应的增量数据,每个第二增量消息携带有用于标识每个第二增量消息的生产顺序的消息标识;
第二拉取单元,用于在第二消息标识和一组第二增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第二待同步数据,其中,第二消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第二全量数据,第二待同步数据用于将第二设备上与目标业务对应的业务数据同步为第二全量数据;
第三同步单元,用于使用第二待同步数据将第二设备上与目标业务对应的业务数据同步为第二全量数据。
此处需要说明的是,上述模块与对应的步骤所实现的示例和应用场景相同,但不限于上述实施例所公开的内容。需要说明的是,上述模块作为装置的一部分可以运行在如图1所示的硬件环境中,可以通过软件实现,也可以通过硬件实现,其中,硬件环境包括网络环境。
根据本申请实施例的又一个方面,还提供了一种存储介质。可选地,在本实施例中,上述存储介质可以用于执行本申请实施例中上述任一项业务消息的处理方法的程序代码。
可选地,在本实施例中,上述存储介质可以位于上述实施例所示的网络中的多个网络设备中的至少一个网络设备上。
可选地,在本实施例中,存储介质被设置为存储用于执行以下步骤的程序代码:
S1,从消息中间件获取第一设备生产的一组第一增量消息,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;
S2,在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;
S3,使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。
可选地,本实施例中的具体示例可以参考上述实施例中所描述的示例,本实施例中对此不再赘述。
可选地,在本实施例中,上述存储介质可以包括但不限于:U盘、ROM、RAM、移动硬盘、磁碟或者光盘等各种可以存储程序代码的介质。
根据本申请实施例的又一个方面,还提供了一种用于实施上述业务消息的处理方法的电子装置,该电子装置可以是服务器、终端、或者其组合。
图7是根据本申请实施例的一种可选的电子装置的结构框图,如图7所示,包括处理器702、通信接口704、存储器706和通信总线708,其中,处理器702、通信接口704和存储器706通过通信总线708完成相互间的通信,其中,
存储器706,用于存储计算机程序;
处理器702,用于执行存储器706上所存放的计算机程序时,实现如下步骤:
S1,从消息中间件获取第一设备生产的一组第一增量消息,其中,一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,每个第一增量消息携带有用于标识每个第一增量消息的生产顺序的消息标识;
S2,在第一消息标识和一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与目标业务对应的第一待同步数据,其中,第一消息标识为第二设备上当前已同步的、目标业务的增量消息的消息标识,目标数据库中存储有与目标业务对应的第一全量数据,第一待同步数据用于将第二设备上与目标业务对应的业务数据同步为第一全量数据;
S3,使用第一待同步数据将第二设备上与目标业务对应的业务数据同步为第一全量数据。
可选地,通信总线可以是PCI(Peripheral Component Interconnect,外设部件互连标准)总线、或EISA(Extended Industry Standard Architecture,扩展工业标准结构)总线等。该通信总线可以分为地址总线、数据总线、控制总线等。为便于表示,图7中仅用一条粗线表示,但并不表示仅有一根总线或一种类型的总线。通信接口用于上述电子装置与其他设备之间的通信。
存储器可以包括RAM,也可以包括非易失性存储器(non-volatile memory),例如,至少一个磁盘存储器。可选地,存储器还可以是至少一个位于远离前述处理器的存储装置。
作为一种示例,上述存储器706中可以但不限于包括上述业务消息的处理装置中的第一获取单元602、第一拉取单元604、以及第一同步单元606。此外,还可以包括但不限于上述业务消息的处理装置中的其他模块单元,本示例中不再赘述。
上述处理器可以是通用处理器,可以包含但不限于:CPU(Central ProcessingUnit,中央处理器)、NP(Network Processor,网络处理器)等;还可以是DSP(DigitalSignal Processing,数字信号处理器)、ASIC(Application Specific IntegratedCircuit,专用集成电路)、FPGA(Field-Programmable Gate Array,现场可编程门阵列)或者其他可编程逻辑器件、分立门或者晶体管逻辑器件、分立硬件组件。
可选地,本实施例中的具体示例可以参考上述实施例中所描述的示例,本实施例在此不再赘述。
本领域普通技术人员可以理解,图7所示的结构仅为示意,实施上述业务消息的处理方法的设备可以是终端设备,该终端设备可以是智能手机(如Android手机、iOS手机等)、平板电脑、掌上电脑以及移动互联网设备(Mobile Internet Devices,MID)、PAD等终端设备。图7其并不对上述电子装置的结构造成限定。例如,电子装置还可包括比图7中所示更多或者更少的组件(如网络接口、显示装置等),或者具有与图7所示的不同的配置。
本领域普通技术人员可以理解上述实施例的各种方法中的全部或部分步骤是可以通过程序来指令终端设备相关的硬件来完成,该程序可以存储于一计算机可读存储介质中,存储介质可以包括:闪存盘、ROM、RAM、磁盘或光盘等。
上述本申请实施例序号仅仅为了描述,不代表实施例的优劣。
上述实施例中的集成的单元如果以软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在上述计算机可读取的存储介质中。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的全部或部分可以以软件产品的形式体现出来,该计算机软件产品存储在存储介质中,包括若干指令用以使得一台或多台计算机设备(可为个人计算机、服务器或者网络设备等)执行本申请各个实施例所述方法的全部或部分步骤。
在本申请的上述实施例中,对各个实施例的描述都各有侧重,某个实施例中没有详述的部分,可以参见其他实施例的相关描述。
在本申请所提供的几个实施例中,应该理解到,所揭露的客户端,可通过其它的方式实现。其中,以上所描述的装置实施例仅仅是示意性的,例如所述单元的划分,仅仅为一种逻辑功能划分,实际实现时可以有另外的划分方式,例如多个单元或组件可以结合或者可以集成到另一个系统,或一些特征可以忽略,或不执行。另一点,所显示或讨论的相互之间的耦合或直接耦合或通信连接可以是通过一些接口,单元或模块的间接耦合或通信连接,可以是电性或其它的形式。
所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部单元来实现本实施例中所提供的方案的目的。
另外,在本申请各个实施例中的各功能单元可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以至少两个单元集成在一个单元中。上述集成的单元既可以采用硬件的形式实现,也可以采用软件功能单元的形式实现。
以上所述仅是本申请的优选实施方式,应当指出,对于本技术领域的普通技术人员来说,在不脱离本申请原理的前提下,还可以做出若干改进和润饰,这些改进和润饰也应视为本申请的保护范围。
Claims (8)
1.一种业务消息的处理方法,其特征在于,包括:
从消息中间件获取第一设备生产的一组第一增量消息,其中,所述一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,所述每个第一增量消息携带有用于标识所述每个第一增量消息的生产顺序的消息标识;
在第一消息标识和所述一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与所述目标业务对应的第一待同步数据,其中,所述第一消息标识为第二设备上当前已同步的、所述目标业务的增量消息的最大消息标识,所述目标数据库中存储有与所述目标业务对应的第一全量数据,所述第一待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第一全量数据;
使用所述第一待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第一全量数据;
其中,所述从消息中间件获取第一设备生产的一组第一增量消息之前,所述方法还包括:
响应于所述目标业务的业务处理指令,向所述消息中间件发送第一查询请求,获取所述第一消息标识;
向所述消息中间件发送所述第一查询请求,其中,所述第一查询请求携带有所述第一设备的设备标识和所述第一消息标识,所述第一查询请求用于请求查询所述第一设备生产的、与所述目标业务对应的增量消息中,消息标识大于所述第一消息标识的增量消息,所述一组第一增量消息是所述消息中间件响应于所述第一查询请求返回的增量消息。
2.根据权利要求1所述的方法,其特征在于,在所述从消息中间件获取第一设备生产的一组第一增量消息之后,所述方法还包括:
确定所述一组第一增量消息的消息标识中大于所述第一消息标识的消息标识,得到一组消息标识,其中,所述第一消息标识为所述第二设备上已同步的增量消息中的最大消息标识;
在所述第一消息标识和所述一组消息标识不满足预设连续条件的情况下,确定所述第一消息标识和所述一组第一增量消息的消息标识不连续;
在所述第一消息标识和所述一组消息标识满足预设连续条件的情况下,确定所述第一消息标识和所述一组第一增量消息的消息标识连续;
其中,所述预设连续条件为所述一组消息标识中的消息标识依次连续、且所述一组消息标识中的最小消息标识与所述第一消息标识连续。
3.根据权利要求1所述的方法,其特征在于,所述从目标数据库中拉取与所述目标业务对应的第一待同步数据,包括:
向所述目标数据库发送业务数据请求,其中,所述业务数据请求用于请求所述第一设备生产的、与所述目标业务对应的全量数据;接收所述目标数据库响应于所述业务数据请求返回的所述第一全量数据,其中,所述第一待同步数据为所述第一全量数据;或者,
确定所述一组第一增量消息的消息标识中大于所述第一消息标识的消息标识,得到一组消息标识,其中,所述第一消息标识为所述第二设备上已同步的增量消息中的最大消息标识;确定所述一组消息标识和所述第一消息标识中缺失的消息标识,得到一组缺失消息标识;向所述目标数据库发送的增量消息请求,其中,所述增量消息请求用于请求所述第一设备生产的、消息标识为所述一组缺失消息标识中的缺失消息标识的增量消息;接收所述目标数据库响应于所述增量消息请求返回的一组缺失增量消息,其中,所述一组缺失增量消息与所述一组缺失消息标识一一对应,所述第一待同步数据为所述一组缺失增量消息中包含的、与所述目标业务对应的增量数据。
4.根据权利要求1所述的方法,其特征在于,在所述从消息中间件获取第一设备生产的一组第一增量消息之后,所述方法还包括:
在所述第一消息标识和所述一组第一增量消息的消息标识连续的情况下,使用所述每个第一增量消息中包含的、与所述目标业务对应的增量数据,对所述第二设备上与所述目标业务对应的业务数据进行同步,得到与所述目标业务对应的已同步业务消息。
5.根据权利要求1至4中任一项所述的方法,其特征在于,在从消息中间件获取第一设备生产的一组第一增量消息之后,所述方法还包括:
在目标核查时间到达的情况下,从所述消息中间件获取所述第一设备生产的一组第二增量消息,其中,所述一组第二增量消息中的每个第二增量消息包含与所述目标业务对应的增量数据,所述每个第二增量消息携带有用于标识所述每个第二增量消息的生产顺序的消息标识;
在第二消息标识和所述一组第二增量消息的消息标识不连续的情况下,从所述目标数据库中拉取与所述目标业务对应的第二待同步数据,其中,所述第二消息标识为所述第二设备上当前已同步的、所述目标业务的增量消息的消息标识,所述目标数据库中存储有与所述目标业务对应的第二全量数据,所述第二待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第二全量数据;
使用所述第二待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第二全量数据。
6.一种业务消息的处理装置,其特征在于,包括:
第一获取单元,用于从消息中间件获取第一设备生产的一组第一增量消息,其中,所述一组第一增量消息中的每个第一增量消息包含与目标业务对应的增量数据,所述每个第一增量消息携带有用于标识所述每个第一增量消息的生产顺序的消息标识;
第一拉取单元,用于在第一消息标识和所述一组第一增量消息的消息标识不连续的情况下,从目标数据库中拉取与所述目标业务对应的第一待同步数据,其中,所述第一消息标识为第二设备上当前已同步的、所述目标业务的增量消息的最大消息标识,所述目标数据库中存储有与所述目标业务对应的第一全量数据,所述第一待同步数据用于将所述第二设备上与目标业务对应的业务数据同步为所述第一全量数据;
第一同步单元,用于使用所述第一待同步数据将所述第二设备上与所述目标业务对应的业务数据同步为所述第一全量数据;
所述装置还包括:发送单元,用于在从消息中间件获取第一设备生产的一组第一增量消息之前,响应于目标业务的业务处理指令,向消息中间件发送第一查询请求;
所述发送单元包括:获取模块,用于响应于目标业务的业务处理指令,获取第一消息标识,
第一发送模块,用于向消息中间件发送第一查询请求,其中,第一查询请求携带有第一设备的设备标识和第一消息标识,第一查询请求用于请求查询第一设备生产的、与目标业务对应的增量消息中,消息标识大于第一消息标识的增量消息,所述一组第一增量消息是所述消息中间件响应于所述第一查询请求返回的增量消息。
7.一种计算机可读的存储介质,其特征在于,所述计算机可读的存储介质包括存储的程序,其中,所述程序运行时执行权利要求1至5中任一项所述的方法。
8.一种电子装置,包括存储器和处理器,其特征在于,所述存储器中存储有计算机程序,所述处理器被设置为通过所述计算机程序执行权利要求1至5中任一项所述的方法。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202210468874.4A CN114884975B (zh) | 2022-04-29 | 2022-04-29 | 业务消息的处理方法和装置、存储介质及电子装置 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202210468874.4A CN114884975B (zh) | 2022-04-29 | 2022-04-29 | 业务消息的处理方法和装置、存储介质及电子装置 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN114884975A CN114884975A (zh) | 2022-08-09 |
CN114884975B true CN114884975B (zh) | 2024-03-22 |
Family
ID=82672799
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202210468874.4A Active CN114884975B (zh) | 2022-04-29 | 2022-04-29 | 业务消息的处理方法和装置、存储介质及电子装置 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN114884975B (zh) |
Families Citing this family (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN115695532B (zh) * | 2023-01-04 | 2023-03-10 | 深圳竹云科技股份有限公司 | 利用消息中间件处理消息的方法、装置、计算机设备 |
Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN110765204A (zh) * | 2019-09-30 | 2020-02-07 | 武汉达梦数据库有限公司 | 一种对增量同步异常中断情况的处理方法和装置 |
CN111090699A (zh) * | 2019-12-13 | 2020-05-01 | 北京奇艺世纪科技有限公司 | 业务数据的同步方法和装置、存储介质、电子装置 |
CN111190747A (zh) * | 2019-12-20 | 2020-05-22 | 北京金山云网络技术有限公司 | 用于消息队列的消息丢失检测方法和装置 |
CN112631798A (zh) * | 2020-12-21 | 2021-04-09 | 平安普惠企业管理有限公司 | 消息同步方法、装置、计算机设备及存储介质 |
CN112835918A (zh) * | 2021-02-19 | 2021-05-25 | 浪潮云信息技术股份公司 | 一种MySQL数据库增量同步实现方法 |
-
2022
- 2022-04-29 CN CN202210468874.4A patent/CN114884975B/zh active Active
Patent Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN110765204A (zh) * | 2019-09-30 | 2020-02-07 | 武汉达梦数据库有限公司 | 一种对增量同步异常中断情况的处理方法和装置 |
CN111090699A (zh) * | 2019-12-13 | 2020-05-01 | 北京奇艺世纪科技有限公司 | 业务数据的同步方法和装置、存储介质、电子装置 |
CN111190747A (zh) * | 2019-12-20 | 2020-05-22 | 北京金山云网络技术有限公司 | 用于消息队列的消息丢失检测方法和装置 |
CN112631798A (zh) * | 2020-12-21 | 2021-04-09 | 平安普惠企业管理有限公司 | 消息同步方法、装置、计算机设备及存储介质 |
CN112835918A (zh) * | 2021-02-19 | 2021-05-25 | 浪潮云信息技术股份公司 | 一种MySQL数据库增量同步实现方法 |
Also Published As
Publication number | Publication date |
---|---|
CN114884975A (zh) | 2022-08-09 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN110113381B (zh) | 一种区块链中订阅主题的方法及装置 | |
EP3229420A1 (en) | Method for establishing persistent connection between multiple smart devices and server, and smart device | |
WO2021151312A1 (zh) | 一种确定服务间依赖关系的方法及相关装置 | |
CN111083180B (zh) | 物联网系统、物联网设备联动方法及装置 | |
CN112286698B (zh) | 远程过程调用方法、装置以及远程过程调用执行方法 | |
CN114884975B (zh) | 业务消息的处理方法和装置、存储介质及电子装置 | |
CN111245677A (zh) | 通信异常上报方法、装置、电子设备及存储介质 | |
CN114143299A (zh) | 基于物联网的数据同步方法及系统 | |
CN107517236B (zh) | 一种用于物联网的事件处理方法、装置和设备 | |
CN115296948B (zh) | 场景信息的更新方法和装置、存储介质及电子装置 | |
CN115309062B (zh) | 设备的控制方法、装置、存储介质及电子装置 | |
CN116418613A (zh) | 消息的回流处理方法、装置、存储介质及电子装置 | |
CN114157725B (zh) | 设备联动的方法、装置、服务器、电子设备以及存储介质 | |
CN111479161B (zh) | 一种直播的质量数据上报方法和装置 | |
JP2014216764A (ja) | ゲートウェイ装置及びプログラム | |
CN114501347A (zh) | 异构系统间信息交互方法、装置及系统 | |
CN112765212A (zh) | 中转设备数据处理方法及装置 | |
CN115312050B (zh) | 指令的响应方法、存储介质及电子装置 | |
CN114048017B (zh) | 一种物联网设备协同联动方法和装置 | |
CN116761264B (zh) | 上行资源分配方法、装置、基站及中继终端 | |
CN115002206B (zh) | 智能设备的控制方法和装置、存储介质及电子装置 | |
CN115314331B (zh) | 智能终端的控制方法和装置、存储介质及电子装置 | |
CN114697345B (zh) | 信息更新方法、装置和存储介质及电子装置 | |
CN115695594B (zh) | 物联网数据通信方法和装置 | |
CN116436861A (zh) | 消息发送方法及装置、存储介质及电子装置 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |