CN115280740B - 利用分布式消息队列系统提供流数据弹性的技术 - Google Patents
利用分布式消息队列系统提供流数据弹性的技术 Download PDFInfo
- Publication number
- CN115280740B CN115280740B CN202180021072.2A CN202180021072A CN115280740B CN 115280740 B CN115280740 B CN 115280740B CN 202180021072 A CN202180021072 A CN 202180021072A CN 115280740 B CN115280740 B CN 115280740B
- Authority
- CN
- China
- Prior art keywords
- cloud
- data
- data platform
- message
- message queue
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000000034 method Methods 0.000 title claims abstract description 57
- 230000015654 memory Effects 0.000 claims description 9
- 238000001514 detection method Methods 0.000 claims description 8
- 230000004044 response Effects 0.000 claims description 7
- 230000008569 process Effects 0.000 abstract description 35
- 238000012545 processing Methods 0.000 description 21
- 230000006870 function Effects 0.000 description 8
- 230000005540 biological transmission Effects 0.000 description 7
- 238000004891 communication Methods 0.000 description 4
- 239000008186 active pharmaceutical agent Substances 0.000 description 3
- 230000001960 triggered effect Effects 0.000 description 3
- 238000005516 engineering process Methods 0.000 description 2
- 238000012795 verification Methods 0.000 description 2
- 230000006978 adaptation Effects 0.000 description 1
- 238000013502 data validation Methods 0.000 description 1
- 230000001627 detrimental effect Effects 0.000 description 1
- 238000010586 diagram Methods 0.000 description 1
- 230000000694 effects Effects 0.000 description 1
- 239000000203 mixture Substances 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 238000012544 monitoring process Methods 0.000 description 1
- 230000006855 networking Effects 0.000 description 1
- 230000003287 optical effect Effects 0.000 description 1
- 230000002093 peripheral effect Effects 0.000 description 1
- 230000002265 prevention Effects 0.000 description 1
- 238000012546 transfer Methods 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L51/00—User-to-user messaging in packet-switching networks, transmitted according to store-and-forward or real-time protocols, e.g. e-mail
- H04L51/21—Monitoring or handling of messages
- H04L51/214—Monitoring or handling of messages using selective forwarding
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L51/00—User-to-user messaging in packet-switching networks, transmitted according to store-and-forward or real-time protocols, e.g. e-mail
- H04L51/21—Monitoring or handling of messages
- H04L51/23—Reliability checks, e.g. acknowledgments or fault reporting
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L63/00—Network architectures or network communication protocols for network security
- H04L63/08—Network architectures or network communication protocols for network security for authentication of entities
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L63/00—Network architectures or network communication protocols for network security
- H04L63/14—Network architectures or network communication protocols for network security for detecting or protecting against malicious traffic
- H04L63/1408—Network architectures or network communication protocols for network security for detecting or protecting against malicious traffic by monitoring network traffic
- H04L63/1416—Event detection, e.g. attack signature detection
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/10—Protocols in which an application is distributed across nodes in the network
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L69/00—Network arrangements, protocols or services independent of the application payload and not provided for in the other groups of this subclass
- H04L69/40—Network arrangements, protocols or services independent of the application payload and not provided for in the other groups of this subclass for recovering from a failure of a protocol instance or entity, e.g. service redundancy protocols, protocol state redundancy or protocol service redirection
Landscapes
- Engineering & Computer Science (AREA)
- Computer Networks & Wireless Communication (AREA)
- Signal Processing (AREA)
- General Engineering & Computer Science (AREA)
- Computer Security & Cryptography (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Computing Systems (AREA)
- Computer Hardware Design (AREA)
- Physics & Mathematics (AREA)
- General Physics & Mathematics (AREA)
- Information Transfer Between Computers (AREA)
- Computer And Data Communications (AREA)
Abstract
实施例包括接收和处理来自数据服务提供者的数据、将包括数据的多个消息传送到流数据平台以及检测将多个消息向流数据平台的递送失败的技术。实施例还包括将多个消息传送到分布式消息队列系统的分布式消息队列服务、将多个消息中的每一个发布到流数据平台、确定发布到流数据平台的多个消息中的每一个是成功还是不成功、以及重试将多个消息中的每个未成功发布的消息发布到流数据平台的技术。
Description
相关申请
本申请要求于2020年5月22日提交的美国非临时申请号16/881,469和2020年1月14日提交的美国临时申请号62/961,059的优先权,两者的标题均为“TECHNIQUES TOPROVIDE STREAMING DATA RESILIENCY UTILIZING A DISTRIBUTED MESSAGE QUEUESYSTEM”。上述申请的内容通过引用整体并入本文。
背景技术
实时数据处理环境可以包括提供和处理数据的各种类型的数据系统和设备。在实例中,实时数据可能是需要及时处理并提供给其他系统的时间关键和/或任务关键数据。例如,银行系统可能需要处理大量实时数据,以执行各种银行功能,包括处理交易和提供欺诈检测。当发生故障时,数据不丢失以及系统尽快恢复到工作状态至关重要。本文所讨论的实施例旨在解决这些问题和其他问题。
发明内容
本文讨论的实施例旨在以弹性方式提供流数据。例如,实施例可以包括用于提供技术和计算机实施的设备、系统、组件等,包括从数据服务提供者接收数据,将包括数据的一个或多个消息传送到流数据平台,以及检测将一个或多个消息中的消息向流数据平台的递送失败。实施例还包括将消息传送到分布式消息队列服务,其中,基于对递送失败的检测将消息传送到分布式消息队列服务,并存储在分布式消息队列系统的队列中。该系统包括将消息发布到流数据平台,确定发布到流数据平台的消息是成功还是不成功,以及在消息发布不成功时重试将消息发布到流数据平台。
附图说明
图1示出了用于处理数据和提供流数据弹性的系统的示例配置。
图2A示出了经由流数据平台处理数据的示例处理流程。
图2B示出了提供流数据弹性的示例处理流程。
图3A示出了经由流数据平台处理数据的第二示例处理流程。
图3B示出了提供流数据弹性的第二示例处理流程。
图4示出了示例逻辑流程。
图5示出了第二示例逻辑流程。
图6示出了系统架构的示例。
具体实施方式
实施例总体上涉及提供捕获和处理来自一个或多个数据源的流数据的弹性。更具体地,实施例包括利用分布式基于云的消息队列系统来处理和提供包括来自实时数据流的数据的消息。在一种配置中,流系统可以包括接收和处理流数据的应用。应用可以将已处理数据提供给流数据平台,该流数据平台可以被其他消费者应用使用来进一步处理和/或存储数据。例如,流数据平台可以被用于为使用该系统的客户提供智能实时决策。在某些情况下,从应用到流数据平台的数据可能无法被递送。在这些情况下,可以将未递送的数据提供给分布式消息队列系统,以存储在队列中,并且一旦任何问题被解决就提供给流数据平台。
在另一种配置中,流可以包括可以接收数据、处理数据以及利用分布式消息队列系统将所有已处理数据发送到流数据平台的应用。在该配置中,当发生递送失败时,应用可以重试将未递送的发送到流数据平台。这些细节和其他细节将在下面的描述中变得显而易见。
现在参考附图,其中,相同的附图标记自始至终被用于指代相同的元件。在以下描述中,为了解释的目的,阐述了许多具体细节,以便提供对其的透彻理解。然而,显而易见的是,可以在没有这些具体细节的情况下实践新颖的实施例。在其他情况下,以框图形式示出了众所周知的结构和设备,以便于对其进行描述。其意图是涵盖权利要求范围内的所有修改、等同物和替代物。
图1示出了一种系统100,用于实时或近实时处理流数据,同时利用分布式消息队列系统105和服务102提供弹性备份能力。数据可以由一个或多个系统基于任何数量的事件发生来提供。通常,数据由流系统120接收以处理并提供给其他内部系统,诸如消费者系统109。这些系统可以对数据进行额外处理以提供见解。在一个示例中,系统100可以是银行平台的一部分,并且流数据可以与银行功能相关,诸如执行交易、处理信贷或贷款申请以及其他一般银行功能。在实施例中,可能需要数据来执行关键功能,诸如欺诈检测、交易认证、贷款/信贷批准等。分布式消息队列系统105的利用确保了系统100可以通过提供备份数据路径和日志记录能力来继续提供具有容错性的关键功能。
在实施例中,系统100可以包括流数据系统120,其可以接收和处理来自一个或多个内部或外部数据源的流数据,诸如数据提供者系统101。数据提供者系统101可以是任何类型的计算系统,其可以实时或近实时地并且以流方式提供数据。在一个示例中,数据提供者系统101可以是支付处理系统,例如 SynovusFinancial Corp、Fiserv、First data Corp等,并且可以在执行交易时将数据传送到流系统120。数据可以包括交易数据,例如商户识别信息、交易价格、交易时间/日期等。数据还可以包括客户的认证数据,例如用户或卡标识符、帐号或令牌、验证值的卡安全代码、到期日期等。数据提供者系统101可以在交易发生时提供数据,并且可以被一个或多个消费者系统109用于提供见解和作出决定,例如,认证用户、检测欺诈、提供激励、更新模型等等。在实施例中,系统100可以包括多于一个或多个数据提供者系统101,并且不限于这种方式。例如,附加的数据提供者系统101可以包括提供客户服务信息的客户服务系统、提供新闻的新闻服务系统、提供奖励数据的奖励系统、提供语音响应数据的语音响应系统。
在实施例中,流系统120可以通过与一个或多个数据提供者系统101耦合的一个或多个应用编程接口(API)门户来接收数据。在实施例中,数据可以由流系统120的一个或多个服务应用103接收,流系统120被配置为将流数据发送到流数据平台107。服务应用103可以是与一个或多个API耦合的软件引擎或组件,以从数据提供者系统101接收数据、处理数据并将数据提供给消费者系统109。流数据平台107可以处理格式的数据,并且可以利用基于二进制传输控制协议(TCP)的协议。此外,服务应用103可以利用生产者API以发布以Java格式的记录流。在其他情况下,流数据平台107可以利用消费者API并订阅以Java格式的记录流。
在一个示例配置中,如图2A/2B所示,服务应用103可以将一个或多个消息中的已处理数据直接提供给流数据平台107,以发布给消费者系统109。在该配置中,分布式消息队列系统105只能在数据丢失事件期间被用作备份,以确保未递送的消息最终被递送给流数据平台107。在另一种配置中,如图3A/3B所示,服务应用103可以利用分布式消息队列系统105将已处理数据提供给流数据平台107。在该配置中,服务应用103可以处理未递送的数据。更具体地,分布式消息队列系统105可以通知服务应用103未递送的数据,并且服务应用103可以将未递送的已处理数据直接发送到流数据平台107,以发布给消费者系统109。
在实施例中,流系统120可以包括多个不同的服务应用103,以执行各种操作来处理流数据。在一个示例中,服务应用103可以是认证引擎,用于接收和处理用于认证的数据,例如,处理认证请求数据以作出认证决策。在另一个示例中,服务应用103可以是欺诈引擎,用于处理用于欺诈检测的数据,例如交易信息、交易位置、交易金额,例如,进行实时欺诈确定。在第三示例中,服务应用103可以是建模引擎,用于接收和处理用于模型评分的数据,例如,确定模型特征和分数,检测模型错误和警告,以及利用交易数据、分析数据和用于数据验证的其他数据。
在实施例中,流数据平台107可以直接从服务应用103或从分布式消息队列系统105接收已处理的数据,如前所述。流数据平台107可以为一个或多个消费者系统109发布数据,以进一步处理和/或存储数据。例如,消费者系统109可以包括建模系统108,其可以接收包括模型评分的数据以更新模型、生成新模型、执行模型验证并存储在数据存储器中。在另一个示例中,消费者系统109可以包括认证系统110,以进一步处理用于认证的数据,并将认证决策存储在数据存储器中。在第三示例中,消费者系统109可以包括欺诈系统112,以进一步处理数据以执行欺诈防御动作、更新账户状态、应用账户限制、创建欺诈案例和触发警报。
在实施例中,流系统120可以是向客户和提供者提供银行服务的关键系统的一部分,诸如交易处理和欺诈检测。因此,任何数据丢失对利用数据的流系统120和消费者系统109都是有害的。以前的解决方案使用系统和服务器来记录数据,并且在这种情况下有时会受到相同的中断和主系统处理数据的影响。在一个示例中,以前的系统利用配置有的系统(包括APACHE的log4j)来记录故障事件期间的数据,并在主系统恢复联机时提供。然而,在某些情况下,当故障事件发生时,它也会关闭(take down)/>系统本身。在其他情况下,当APACHE的log4j在故障期间记录数据时,它上线很慢,例如,十五分钟多。
本文讨论的实施例旨在提供存储和转发解决方案,以提供弹性并消除故障事件停机时间。更具体地,实施例包括利用具有分布式消息队列服务102、队列104和分布式消息客户端106的分布式消息队列系统105来在故障事件期间存储和转发数据。在一个示例中,分布式消息队列系统105可以是基于云的系统,诸如亚马逊网络服务并且分布式消息队列服务102可以是AWS的简单队列服务/>此外,分布式消息队列客户端106可以是AWS的/>客户端。
在实施例中,如前所述,仅当检测到故障时,才可以利用分布式消息队列系统105。更具体地,服务应用103可以接收流数据,处理流数据,并生成一个或多个消息以将数据发送到流数据平台107。服务应用103可以直接传送(例如,不利用分布式消息队列系统105)流数据到流数据平台107。在一个示例中,SQS的重定向功能可以被设置为真,例如,对于服务应用103,“redirect.sdp.messages.to.sqs=false”。
在某些情况下,服务应用103可以检测到一个或多个消息向流数据平台107的递送失败。例如,服务应用103可以接收或确定递送超时(100毫秒(ms))或确定连接失败。在这些情况下,服务应用103可以被配置为将失败消息重定向到分布式消息队列系统105,以确保与失败消息相对应的流数据不会丢失,并最终被递送到流数据平台107和消费者系统109。在一些实施例中,分布式消息队列系统105可以被配置为当失败消息正在被递送时触发警报。当接收和处理指定数量的“错误”消息时,可能会触发警报。例如,使用度量过滤器进行错误监控的云观察警报–“ERROR”–GUID在5分钟内>10。也可以从分布式消息队列系统中设置QueueDepth CloudWatch警报,在1分钟内>1000,共3次。
在实施例中,服务应用103可以经由API将一个或多个失败消息传送到分布式消息队列系统105。更具体地,服务应用103可以调用或利用分布式消息队列服务102,并发送一个或多个消息以存储在队列104中。在实施例中,队列104可以是加密队列,其中一个或多个主题被配置为在每个区域中针对每个应用进行通知。在一个示例中,服务应用103可以调用SQS的“SendMessageRequest”实例,并包括队列的名称(sqs.queueName=)、队列的区域(sqs.region=)和消息的主体。然后将请求传递给分布式消息队列服务102发送消息方法,该方法可以返回发送消息响应对象。
在实施例中,分布式消息队列系统105可以将消息发布或发送到流数据平台107和/或将它们保持在队列104中,直到它们可以被递送。更具体地,分布式消息队列客户端106可以将一个或多个消息发布到流数据平台107。在一个示例中,分布式消息队列客户端106可以轮询队列104以获取消息,并将消息发送到流数据平台107。队列104可以在分布式消息队列系统105上被配置为客户端106的事件源。当事件发生时,消息或消息的记录是在队列104中,并且该事件可由客户端106触发和检测。客户端106可以从队列104检索一个或多个消息,并将一个或多个消息作为单个消息或成批(例如,五个消息)发送。
在一些实施例中,每个服务应用103可以被配置有相关联的队列104,并且客户端106可以基于哪个队列104具有数据来处理数据和向流数据平台107发送数据。在其他情况下,单个队列104可被用于所有服务应用103,并且客户端106可以基于消息信封自动发现流数据平台107的正确消费者。
分布式消息队列客户端106可以确定发布到流数据平台107的消息是被成功递送还是未成功递送。在某些情况下,流数据平台107可能无法处理和/或接收来自分布式消息队列客户端106的一个或多个消息。例如,流数据平台107可以节流数据,错误可能返回给客户端106,平台107可能不响应等。当一个或多个消息的发布不成功时,分布式消息队列客户端106可以重试将一个或多个消息发布到流数据平台107,并且直到一个或多个消息被成功递送或一段时间到期。在一些情况下,分布式消息客户端106可以将未成功递送的一个或多个消息发送回队列104,直到重试发布之前的某个稍后的时间点。例如,客户端106可以将一个或多个消息发送回队列104,等待1000ms,并且然后重试将一个或多个消息发布到流数据平台107。注意,实施例不限于这种方式,并且时间是可配置的。
在一些实施例中,如前所述,服务应用103可以将所有消息发送到分布式消息队列系统105,以发送到流数据平台107。例如,服务应用103可以将重定向功能设置为真,例如,“redirect.sdp.messages.to.sqs=true”,以将所有消息发送到分布式消息队列系统105并由分布式消息队列服务102处理。例如,服务应用103可以经由包括流数据和已处理的流数据的API消息与分布式消息队列服务102通信。分布式消息队列服务102可以将每个消息存储在队列104以用于发布到流数据平台107。在一个示例中,服务应用103可以调用SQS的“SendMessageRequest”实例,并包括队列104的名称和消息的主体。然后将请求传递给分布式消息队列服务102发送消息方法,该方法可以将发送消息响应对象返回给服务应用103。
分布式消息队列系统105可以利用分布式消息队列客户端106将队列104中存储的一个或多个消息中的每一个发布到流数据平台107。例如,分布式消息队列客户端106可以轮询队列104以获取一个或多个消息,并将一个或多个消息发送到流数据平台107。如上所述,队列104可以在分布式消息队列系统105上被配置为客户端106的事件源。当事件发生时,消息或消息的记录是在队列104中,并且事件可以由客户端106触发和检测。客户端106可以从队列104检索一个或多个消息,并将一个或多个消息作为单个消息或成批(例如五条消息)发送。
在某些情况下,一个或多个消息可能不会被递送到流数据平台107。分布式消息队列客户端106可以确定发布到流数据平台107的消息是被成功递送还是未成功递送。在这些情况下,分布式消息队列客户端106可以将未递送的消息存储回队列104中,并且分布式消息队列服务102可以将发生递送失败的对象返回给服务应用103。在实施例中,服务应用103可以基于从分布式消息队列系统105接收到的信息来确定发生的递送失败。在这些情况下,服务应用103可以将未递送的消息直接传送到流数据平台107。
在实施例中,服务应用103可以重试将未递送的消息发送到流数据平台107。例如,服务应用103可以确定直接传送到流数据平台107的消息是成功还是不成功,并在消息不成功时重试传送消息到流数据平台107。服务应用103可以继续重试以传送消息,直到向流数据平台107的递送成功或者定义的时间段已经到期。
图2A示出了系统100的示例处理流程200,其中除非出现故障,否则服务应用103直接向流数据平台107发送消息。在所示的处理流程200中,消息在没有故障的情况下被递送到流数据平台107。
在处理流程200的线201处,服务应用103可以从一个或多个数据提供者系统101接收流数据。如上所述,流数据可以包括执行银行功能的数据。服务应用103可以接收数据、处理数据、并生成流数据的一个或多个消息以发送到流数据平台107。
在线203处,服务应用103可以向流数据平台107发送一个或多个消息。在线205处,流数据平台107可以向消费者系统109发送消息。在某些情况下,流数据平台107可以发布数据,并且消费者系统109中的一个或多个可以订阅和接收特定数据。例如,建模系统108和认证系统110可以订阅和接收认证数据。在另一个示例中,欺诈系统112可以订阅和接收欺诈数据。在第三个示例中,建模系统108可以订阅和接收模型数据。
图2B示出了与图2A相同配置的示例处理流程220;但是,会处理故障事件。
在处理流程220的线221处,服务应用103可以从一个或多个数据提供者系统101接收流数据。服务应用103可以接收数据、处理数据,并生成流数据的一个或多个消息以发送到流数据平台107。
在线223处,服务应用103可以将一个或多个消息发送到流数据平台107。在某些情况下,服务应用103可以检测到一个或多个消息向流数据平台107的递送失败,如虚线225所示。在这些情况下,服务应用103可以被配置为将失败消息重定向到分布式消息队列系统。
在线227处,服务应用103可以经由API将一个或多个失败消息传送到分布式消息队列系统105。例如,服务应用103可以调用或利用分布式消息队列服务102,并发送一个或多个消息以存储在队列104中。
在实施例中,在线229处,包括分布式消息队列客户端106的分布式消息队列系统105可以将消息发布或发送到流数据平台107。在某些情况下,流数据平台107可能无法接收消息和/或消息可能无法递送到流数据平台107。分布式消息队列客户端106可以确定发布到流数据平台107的消息是被成功递送还是未成功递送。
在虚线231处,分布式消息队列系统105可以确定未被流数据平台107接收到的一个或多个消息。例如,流数据平台107可以节流数据,错误可能返回给客户端106,平台107可能不响应等。此外,在线231处,当一个或多个消息的发布不成功时并且直到一个或多个消息被成功递送或一段时间到期,分布式消息队列客户端106可以重试将一个或多个消息发布到流数据平台107。
在线233处,流数据平台107可以向消费者系统109发送消息。流数据平台107可以发布数据,并且消费者系统109中的一个或多个可以订阅和接收特定数据。
图3A示出了系统100的示例处理流程300,其中服务应用103将所有消息发送到分布式消息队列系统105,以进一步发送到流数据平台107。在所示的处理流程300中,消息在没有故障的情况下被递送到流数据平台107。
在处理流程300的线301处,服务应用103可以从一个或多个数据提供者系统101接收流数据。服务应用103可以接收数据,处理数据,并生成流数据的一个或多个消息以发送到流数据平台107。
在线303处,服务应用103可以向分布式消息队列系统105发送一个或多个消息。例如,服务应用103可以经由包括流数据和已处理的流数据的API消息与分布式消息队列服务102通信。分布式消息队列服务102可以将每个消息存储在队列104,用于发布到流数据平台107的。
在线305处,分布式消息队列系统105可以利用分布式消息队列客户端106将存储在队列104中的一个或多个消息中的每一个发布到流数据平台107。客户端106可以从队列104检索一个或多个消息,并将一个或多个消息作为单个消息或成批发送,如前所述。此外,并且在线307处,流数据平台107可以向消费者系统109发送消息。
图3B示出了类似于图3A的示例处理流程320。然而,在该示例处理流程320中,分布式消息队列系统105可能无法将一个或多个消息递送到流数据平台107。在线321、323和325处,流系统120分别执行与对应的线301、303和305相同的操作。
然而,如所提及的,某些一个或多个消息可能没有被递送到流数据平台107。在线327处,分布式消息队列客户端106可以确定发布到流数据平台107的消息是被成功递送还是未成功递送。在这些情况下,分布式消息队列客户端106可以将未递送的消息存储回队列104中,并且分布式消息队列服务102可以将在线329处发生递送失败的对象返回给服务应用103。在实施例中,服务应用103可以基于从分布式消息队列系统105接收到的信息来确定发生的递送失败。
在实施例中,在线331处,服务应用103可以将未递送的消息直接传送到流数据平台107。如果由服务应用103将消息传送到流数据平台107,则服务应用103可以在线333处重试将未递送的消息发送到流数据平台107。例如,服务应用103可以确定直接传送到流数据平台107的消息是成功还是不成功,并在消息不成功时重试将消息传送到流数据平台107。服务应用103可以继续重试以传送消息,直到向流数据平台107的递送成功或者定义的时间段已经到期。在线335处,流数据平台107可以将接收到的消息传送给消费者系统109,如前所述。
图4示出了逻辑流程400的示例,其可以代表由流系统100执行的一个或多个操作,以提供弹性流能力。
在块410处,逻辑流程400包括从数据服务提供者接收流数据。在一个示例中,消费者系统109可以利用流数据来提供关键功能,诸如银行计算系统中的发现的那些功能。在实施例中,流系统120的服务应用103可以基于服务应用103接收流数据。服务应用103可以对流数据执行操作和处理,以为系统100提供服务。
在块420处,逻辑流程400包括将包括流数据的一个或多个消息传送到流数据平台。该消息可以由服务应用103根据流数据平台107生成。例如,流数据平台107可以处理格式的数据,并且可以利用基于二进制传输控制协议(TCP)的协议。此外,服务应用103可以利用生产者API以Java格式发布记录流。在其他情况下,流数据平台可以利用消费者API并订阅Java格式的记录流。
在块430处,逻辑流程400包括检测向流数据平台的消息的递送失败。例如,服务应用103可以接收消息不能被递送或递送超时的指示。
在块440处,基于故障检测,逻辑流程400包括将消息传送到分布式消息队列服务。例如,服务应用103可以利用由分布式消息队列系统105的服务102提供的API。
在块450处,逻辑流程400包括将消息发布到流数据平台。更具体地,分布式消息队列客户端106可以从队列104检索失败的消息,并将其发布到流数据平台107。
在块460处,逻辑流程400包括确定发布到流数据平台的消息是成功还是不成功。例如,分布式消息队列客户端106可以确定消息是否被递送。响应于确定消息被递送到流数据平台,客户端106可以经由服务102和返回对象通知服务应用103。
在块470处,逻辑流程400包括在消息发布不成功时重试将消息发布到流数据平台。在一些情况下,客户端106可以立即重试发送消息,例如,一旦客户端106知道递送失败。在其他情况下,客户端106可以将消息返回到队列104,并可以在稍后的时间点重试。例如,队列104可以是先进先出队列,并且一旦失败消息通过队列,客户端106就可以重试。
图5示出了逻辑流程500的示例,其可以代表由流系统120执行的一个或多个操作,以提供弹性流能力。
在块510处,逻辑流程500包括从数据服务提供者接收流数据,如关于流程400的块410类似地讨论的。
在块520处,逻辑流程500包括将包括数据的一个或多个消息传送到分布式消息队列服务系统上的分布式消息队列服务。分布式消息队列服务系统可以将一个或多个消息中的每一个存储在队列中,用于发布到流数据平台。
在块530处,逻辑流程500包括将存储在队列中的一个或多个消息中的每一个发布到流数据平台。更具体地,分布式消息队列系统的分布式消息队列服务客户端可以发布存储在队列中的消息,以由流数据平台107接收。
在块540处,逻辑流程500包括确定由分布式消息队列服务客户端发布的一个或多个消息中的消息向流数据平台的递送失败。更具体地,服务应用103可以例如经由返回对象接收以下指示:递送来自分布式消息队列系统105的一个或多个消息失败。在块550处,逻辑流程500包括通过服务应用将消息直接传送到流数据平台。
如图6所示,计算架构600可以包括具有处理单元604、系统存储器606和系统总线608的计算机602。处理单元604可以是各种商用处理器中的任何一种,或者可以是专门设计的处理器。
系统总线608为系统组件提供接口,包括但不限于系统存储器606和处理单元604之间的接口。系统总线608可以是几种类型总线结构中的任何一种,该总线结构可以进一步互连到存储器总线(有或没有存储器控制器)、外围总线,以及使用各种商用总线架构中的任何一种的局部总线。
系统存储器606可以包括任何类型的计算机可读存储介质,包括任何类型的易失性和非易失性存储器。计算机602可以包括任何类型的计算机可读存储介质,包括内部(或外部)硬盘驱动器(HDD)614。在各种实施例中,计算机602可以包括任何其他类型的磁盘驱动器,例如,诸如磁软盘和/或光盘驱动器。HDD 614可以通过HDD接口624被连接到系统总线608。
在各种实施例中,可以将任意数量的程序模块存储在驱动器和系统存储器606和/或614中,例如,诸如操作系统630、一个或多个应用632、其他程序模块634和程序数据636。
用户可以通过一个或多个有线/无线输入设备,例如诸如键盘638和定点设备,诸如鼠标640,将命令和信息输入到计算机602中。这些输入设备和其他输入设备可以通过耦合到系统总线608的输入设备接口642连接到处理单元604。监视器644或其他类型的显示设备也可以经由接口(诸如视频适配器646)连接到系统总线608。监视器644可以在计算机602的内部或外部。
计算机602可以使用经由有线和/或无线通信到一台或多台远程计算机(诸如远程计算机648)的逻辑连接而在联网环境中运行。远程计算机648可以是工作站、服务器计算机、路由器、个人计算机、便携式计算机、基于微处理器的娱乐设备、智能手机、平板电脑、对等设备或其他公共网络节点,并且通常包括相对于计算机602描述的许多或所有元件。所描绘的逻辑连接包括到网络652(例如,诸如局域网(LAN)和/或更大的网络,例如广域网(WAN))的有线和/或无线连接。网络652可以提供到全球通信网络(例如,诸如互联网)的连接。网络适配器656可以促进到网络652的有线和/或无线通信。计算机602可以可操作以根据任何已知的计算机联网技术、标准或协议,通过任何已知的有线或无线通信技术、标准或协议进行通信。
应当注意,本文所描述的方法不必按所描述的顺序或任何特定顺序执行。此外,可以以串行或并行方式执行关于本文所识别的方法描述的各种活动。
尽管本文已经说明和描述了特定实施例,但应当理解,为实现相同目的而计算的任何布置都可以替代所示的特定实施例。本公开旨在涵盖各种实施例的任何和所有修改或变化。应当理解,以上描述是以说明性的方式而不是限制性的方式进行的。在阅读以上描述后,上述实施例的组合和本文中未具体描述的其他实施例对于本领域技术人员来说将是显而易见的。因此,各种实施例的范围包括使用上述组合物、结构和方法的任何其他应用。
需要强调的是,提供本公开的摘要以符合37C.F.R.§1.72(b),要求摘要允许读者快速确定技术公开的性质。提交它是基于这样的理解,它不会被用于解释或限制权利要求的范围或含义。此外,在前面的详细描述中,可以看出,出于简化本公开的目的,在单个实施例中将各种特征组合在一起。该公开方法不应被解释为反映了要求保护的实施例需要比每个权利要求中明确叙述的更多特征的意图。相反,如以下权利要求所反映的,新颖的主题不在于单个公开实施例的所有特征。因此,以下权利要求在此被并入到详细描述中,每个权利要求本身作为单独的优选实施例独立存在。在所附权利要求中,术语“包括”和“其中”分别用作相应术语“包括”和“其中”的纯英语等价物。此外,术语“第一”、“第二”和“第三”等仅用作标签,并不旨在对其对象施加数字要求。
尽管已经以特定于结构特征和/或方法动作的语言描述了主题,但应当理解,所附权利要求中定义的主题不一定局限于上述特定特征或动作。相反,上述具体特征和动作被公开为实施权利要求的示例形式。
Claims (18)
1.一种计算机实施的方法,包括:
由服务应用从数据服务提供者接收数据;
由所述服务应用将包括所述数据的一个或多个消息传送到流数据平台,其中所述服务应用最初将所述一个或多个消息直接传送到所述流数据平台而不使用基于云的队列;
由所述服务应用检测所述一个或多个消息中的消息向所述流数据平台的递送失败;
由所述服务应用并经由应用编程接口(API)将所述消息传送到基于云的分布式消息队列服务,其中,响应于所述递送失败将所述消息传送到所述基于云的分布式消息队列服务并存储在基于云的分布式消息队列系统的基于云的队列中;
由所述基于云的分布式消息队列系统的基于云的分布式消息队列服务客户端将所述消息发布到所述流数据平台;
由所述基于云的分布式消息队列服务客户端确定发布到所述流数据平台的所述消息被所述流数据平台成功接收到还是没有成功接收到;以及
当所述消息的发布不成功时,由所述分布式消息队列服务客户端重试将所述消息发布到所述流数据平台。
2.根据权利要求1所述的计算机实施的方法,其中,检测所述递送失败包括检测递送超时、连接失败或其组合中的至少一个。
3.根据权利要求1所述的计算机实施的方法,其中,重试发布发生,直到向所述流数据平台的递送是成功的或定义的时间段已经到期。
4.根据权利要求1所述的计算机实施的方法,包括由所述基于云的分布式消息队列服务将所述消息存储在所述基于云的分布式消息队列系统的基于云的队列中,直到所述消息被成功发布到所述流数据平台或定义的时间段已经到期。
5.根据权利要求4所述的计算机实施的方法,包括当所述消息的发布不成功时,由所述基于云的分布式消息队列服务客户端将所述消息传送到所述基于云的分布式消息队列系统以存储在所述基于云的队列中。
6.根据权利要求1所述的计算机实施的方法,其中,所述数据包括认证数据、欺诈数据、模型数据或其组合中的至少一个,并且所述服务应用包括认证引擎、欺诈引擎或建模引擎中的一个。
7.根据权利要求6所述的计算机实施的方法,包括由所述流数据平台将所述认证数据发布到认证系统和建模系统中的至少一个。
8.根据权利要求6所述的计算机实施的方法,包括由所述流数据平台将所述欺诈数据发布到欺诈检测系统。
9.根据权利要求6所述的计算机实施的方法,包括由所述流数据平台将所述模型数据发布到建模系统。
10.一种系统,包括:
存储器;以及
与所述存储器耦合的一个或多个处理器,所述一个或多个处理器被配置为:
由服务应用从数据服务提供者接收数据;
由所述服务应用将包括所述数据的多个消息直接传送到流数据平台而不使用基于云的分布式消息队列系统的基于云的队列,除非检测到一个或多个失败;
由所述服务应用检测所述多个消息中的至少一个消息向所述流数据平台的递送失败;
由所述服务应用将所述多个消息中的至少一个消息传送到基于云的分布式消息队列系统的基于云的分布式消息队列服务,其中,所述多个消息中的至少一个消息响应于所述递送失败被传送到所述基于云的分布式消息队列服务,并存储在基于云的所述分布式消息队列系统的基于云的队列中以发布到所述流数据平台;
由所述基于云的分布式消息队列系统的基于云的分布式消息队列服务客户端将所述多个消息中的至少一个消息发布到所述流数据平台;
由所述基于云的分布式消息队列服务客户端确定发布到所述流数据平台的所述多个消息中的至少一个消息中的每个消息被成功发布到还是没有成功发布到所述流数据平台;以及
由所述分布式消息队列服务客户端重试将所述多个消息中的至少一个消息的每个未成功发布的消息发布到所述流数据平台。
11.根据权利要求10所述的系统,其中,所述一个或多个处理器用于重试发布,直到向所述流数据平台的递送成功或定义的时间段已经到期。
12.根据权利要求10所述的系统,其中所述一个或多个处理器被配置为基于检测递送超时、连接失败或其组合中的至少一个来检测所述递送失败。
13.根据权利要求10所述的系统,所述一个或多个处理器被配置为通过所述基于云的分布式消息队列服务将所述多个消息中的至少一个消息中的消息存储在所述基于云的分布式消息队列系统的基于云的队列中,直到所述消息被成功发布到所述流数据平台或定义的时间段已经到期。
14.根据权利要求13所述的系统,所述一个或多个处理器被配置为在所述消息的发布不成功时,由所述基于云的分布式消息队列服务客户端将所述消息传送到所述基于云的分布式消息队列系统以存储在所述基于云的队列中。
15.根据权利要求10所述的系统,其中,所述数据包括认证数据、欺诈数据、模型数据或其组合中的至少一个,并且所述服务应用包括认证引擎、欺诈引擎或建模引擎中的一个。
16.根据权利要求15所述的系统,其中所述一个或多个处理器被配置为通过所述流数据平台将所述认证数据发布到认证系统和建模系统中的至少一个。
17.根据权利要求15所述的系统,其中所述一个或多个处理器被配置为通过所述流数据平台将所述欺诈数据发布到欺诈检测系统。
18.根据权利要求15所述的系统,其中所述一个或多个处理器被配置为通过所述流数据平台将所述模型数据发布到建模系统。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410484862.XA CN118363773A (zh) | 2020-01-14 | 2021-01-14 | 利用分布式消息队列系统提供流数据弹性的技术 |
Applications Claiming Priority (5)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
US202062961059P | 2020-01-14 | 2020-01-14 | |
US62/961,059 | 2020-01-14 | ||
US16/881,469 US11509619B2 (en) | 2020-01-14 | 2020-05-22 | Techniques to provide streaming data resiliency utilizing a distributed message queue system |
US16/881,469 | 2020-05-22 | ||
PCT/US2021/013345 WO2021146363A1 (en) | 2020-01-14 | 2021-01-14 | Techniques to provide streaming data resiliency utilizing a distributed message queue system |
Related Child Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202410484862.XA Division CN118363773A (zh) | 2020-01-14 | 2021-01-14 | 利用分布式消息队列系统提供流数据弹性的技术 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN115280740A CN115280740A (zh) | 2022-11-01 |
CN115280740B true CN115280740B (zh) | 2024-05-10 |
Family
ID=76763436
Family Applications (2)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202410484862.XA Pending CN118363773A (zh) | 2020-01-14 | 2021-01-14 | 利用分布式消息队列系统提供流数据弹性的技术 |
CN202180021072.2A Active CN115280740B (zh) | 2020-01-14 | 2021-01-14 | 利用分布式消息队列系统提供流数据弹性的技术 |
Family Applications Before (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202410484862.XA Pending CN118363773A (zh) | 2020-01-14 | 2021-01-14 | 利用分布式消息队列系统提供流数据弹性的技术 |
Country Status (6)
Country | Link |
---|---|
US (2) | US11509619B2 (zh) |
EP (2) | EP4369684A3 (zh) |
CN (2) | CN118363773A (zh) |
CA (1) | CA3164843A1 (zh) |
PL (1) | PL4091315T3 (zh) |
WO (1) | WO2021146363A1 (zh) |
Families Citing this family (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113506178A (zh) * | 2021-07-27 | 2021-10-15 | 中国工商银行股份有限公司 | 交易引流数据统计分析方法、装置、设备、介质和程序产品 |
CN115001998B (zh) * | 2022-04-26 | 2024-02-23 | 北京贝壳时代网络科技有限公司 | 一种消息服务的容灾方法和装置 |
Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN104753817A (zh) * | 2013-12-25 | 2015-07-01 | 中国移动通信集团公司 | 一种云计算消息队列服务本地模拟方法和系统 |
CN110196885A (zh) * | 2019-06-13 | 2019-09-03 | 东方电子股份有限公司 | 一种云化分布式实时数据库系统 |
Family Cites Families (18)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US6832243B1 (en) * | 2000-08-15 | 2004-12-14 | International Business Machines Corporation | Methods and apparatus for defining, observing and evaluating message delivery outcome on a per-message basis |
US7093025B1 (en) * | 2000-10-04 | 2006-08-15 | International Business Machines Corporation | SMTP extension for email delivery failure |
US7177917B2 (en) * | 2000-12-27 | 2007-02-13 | Softwired Ag | Scaleable message system |
US7337234B2 (en) * | 2002-04-05 | 2008-02-26 | Oracle International Corporation | Retry technique for multi-tier network communication systems |
US7350184B2 (en) * | 2002-05-02 | 2008-03-25 | Bea Systems, Inc. | System and method for enterprise application interactions |
US7185034B2 (en) | 2002-08-01 | 2007-02-27 | Oracle International Corporation | Buffered message queue architecture for database management systems with guaranteed at least once delivery |
US8230019B2 (en) * | 2003-07-17 | 2012-07-24 | International Business Machines Corporation | Alerting electronic mail users of undeliverable recipients |
US7996849B2 (en) * | 2007-02-14 | 2011-08-09 | International Business Machines Corporation | Method, apparatus and software for managing a transactional message queue |
US10841839B2 (en) * | 2009-01-28 | 2020-11-17 | Headwater Research Llc | Security, fraud detection, and fraud mitigation in device-assisted services systems |
US8266290B2 (en) * | 2009-10-26 | 2012-09-11 | Microsoft Corporation | Scalable queues on a scalable structured storage system |
US20110185237A1 (en) * | 2010-01-28 | 2011-07-28 | Futurewei Technologies, Inc. | System and Method for Delivering Messages |
US8595328B2 (en) * | 2010-11-03 | 2013-11-26 | International Business Machines Corporation | Self-updating node controller for an endpoint in a cloud computing environment |
CA3128629A1 (en) | 2015-06-05 | 2016-07-28 | C3.Ai, Inc. | Systems and methods for data processing and enterprise ai applications |
US10212120B2 (en) | 2016-04-21 | 2019-02-19 | Confluent, Inc. | Distributed message queue stream verification |
US20200348662A1 (en) * | 2016-05-09 | 2020-11-05 | Strong Force Iot Portfolio 2016, Llc | Platform for facilitating development of intelligence in an industrial internet of things system |
EP3635915A1 (en) | 2017-06-09 | 2020-04-15 | Equinix, Inc. | Near real-time messaging service for data center infrastructure monitoring data |
US10277524B1 (en) | 2018-02-23 | 2019-04-30 | Capital One Services, Llc | Monitoring data streams and scaling computing resources based on the data streams |
US11863470B2 (en) * | 2020-11-11 | 2024-01-02 | Musarubra Us Llc | Systems and methods for delivery using a message queue |
-
2020
- 2020-05-22 US US16/881,469 patent/US11509619B2/en active Active
-
2021
- 2021-01-14 PL PL21704123.5T patent/PL4091315T3/pl unknown
- 2021-01-14 EP EP24163977.2A patent/EP4369684A3/en active Pending
- 2021-01-14 CA CA3164843A patent/CA3164843A1/en active Pending
- 2021-01-14 EP EP21704123.5A patent/EP4091315B1/en active Active
- 2021-01-14 WO PCT/US2021/013345 patent/WO2021146363A1/en unknown
- 2021-01-14 CN CN202410484862.XA patent/CN118363773A/zh active Pending
- 2021-01-14 CN CN202180021072.2A patent/CN115280740B/zh active Active
-
2022
- 2022-10-17 US US17/967,060 patent/US12074840B2/en active Active
Patent Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN104753817A (zh) * | 2013-12-25 | 2015-07-01 | 中国移动通信集团公司 | 一种云计算消息队列服务本地模拟方法和系统 |
CN110196885A (zh) * | 2019-06-13 | 2019-09-03 | 东方电子股份有限公司 | 一种云化分布式实时数据库系统 |
Also Published As
Publication number | Publication date |
---|---|
US20210218699A1 (en) | 2021-07-15 |
WO2021146363A1 (en) | 2021-07-22 |
US11509619B2 (en) | 2022-11-22 |
EP4369684A2 (en) | 2024-05-15 |
EP4369684A3 (en) | 2024-08-07 |
EP4091315B1 (en) | 2024-05-01 |
EP4091315A1 (en) | 2022-11-23 |
CN118363773A (zh) | 2024-07-19 |
CA3164843A1 (en) | 2021-07-22 |
US12074840B2 (en) | 2024-08-27 |
EP4091315C0 (en) | 2024-05-01 |
US20230037390A1 (en) | 2023-02-09 |
CN115280740A (zh) | 2022-11-01 |
PL4091315T3 (pl) | 2024-08-26 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
JP6831412B2 (ja) | 電子メッセージの転送を制御するためのインターフェース、システム、方法及びコンピュータプログラム製品 | |
US6760861B2 (en) | System, method and apparatus for data processing and storage to provide continuous operations independent of device failure or disaster | |
CN111371892A (zh) | 高并发分布式消息推送系统及方法 | |
US8868461B2 (en) | Electronic trading platform and method thereof | |
US8825798B1 (en) | Business event tracking system | |
US12074840B2 (en) | Techniques to provide streaming data resiliency utilizing a distributed message queue system | |
US9015731B2 (en) | Event handling system and method | |
US10936591B2 (en) | Idempotent command execution | |
SA517381770B1 (ar) | جهاز، نظام، طريقة ومنتج برنامج كومبيوتر لمعالجة طلبات معالجة إلكترونية | |
US7257611B1 (en) | Distributed nonstop architecture for an event processing system | |
WO2023207146A1 (zh) | Esop系统的服务模拟方法、装置、设备及存储介质 | |
US11677618B2 (en) | Systems and methods for real-time processing and transmitting of high-priority notifications | |
US20150371327A1 (en) | System for dynamically selecting a communications fabric | |
US8429451B2 (en) | Method of handling a message | |
CN113312649A (zh) | 报文处理方法、装置、设备、系统及存储介质 | |
CN109412890A (zh) | 基于dds的联合试验平台中间件节点状态检测方法 | |
CN113010379B (zh) | 电子设备监控系统 | |
CN113177052A (zh) | 一种分布式系统业务数据一致性处理方法、装置 | |
CN111552907A (zh) | 消息处理方法、装置、设备和存储介质 | |
CN115775148B (zh) | 基于消息机的医院信息系统支付处理方法、装置及设备 | |
US11836550B2 (en) | Systems and methods for moving, reconciling, and aggregating data from mainframe computers to hybrid cloud | |
CN116962513B (zh) | 一种金融行情合约数据接收方法及装置 | |
US20240054487A1 (en) | Method and system for event notification | |
JP2018181012A (ja) | 業務連携システムおよび業務連携方法 | |
CN115496583A (zh) | 异步账务数据处理方法、装置、设备及存储介质 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
REG | Reference to a national code |
Ref country code: HK Ref legal event code: DE Ref document number: 40083650 Country of ref document: HK |
|
GR01 | Patent grant | ||
GR01 | Patent grant |