CN111949424A - Method for realizing queue for processing declarative events - Google Patents
Method for realizing queue for processing declarative events
- Publication number
- CN111949424A (application CN202010988989.7A)
- Authority
- CN
- China
- Prior art keywords
- event
- processing
- queue
- events
- repeater
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
- G06F9/4881—Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Retry When Errors Occur (AREA)
Abstract
The invention relates to the field of cloud platform event processing, and in particular to a method for implementing a queue for processing declarative events, comprising the following steps: S1, creating a queue, wherein each event is checked by a deduplicator before it is added to the queue, and the deduplicator only allows events that are not already present to be added; S2, arranging the events in the queue as a heap ordered by priority; S3, using a rate limiter to control how often the processor obtains events from the queue; S4, the processor performs state transition processing on an event; if processing succeeds, the reference to the event in the deduplicator is cleared and the next event is processed; if processing fails, a back-off device records the number of failures, lowers the priority of the event and delays its processing time, after which the failed event is added to the deduplicator again. This solves the problems of the conventional flow, in which a failed event is returned to the queue and handled like a normal event, leading to repeated processing, overloaded processing, and concurrent processing of failed and normal events.
Description
Technical Field
The invention relates to the technical field of cloud platform event processing, and in particular to a method for implementing a queue for processing declarative events.
Background
At present, the existing flow by which a cloud platform moves a resource from its current state to its desired state is as follows:
Step one: query the current state of a specified resource and put it into a queue to be processed. Step two: obtain the pending event from the queue and compare the current state with the desired state. Step three: change the resource according to the difference. Step four: if processing succeeds, processing of the resource is finished. Step five: if processing fails, continue processing according to the state difference.
The existing flow has the following technical problems:
1. If a state change event for a resource already exists in the queue and another event for the same resource arrives before the first one has been processed, the event is processed repeatedly. When the load on the cloud platform is high, this wastes a large amount of computing resources.
2. The component that fetches events from the queue and the component that processes the resource changes generally run asynchronously. When their rates differ, with events fetched quickly but resource changes processed slowly, a large number of blocked threads accumulate in the system, which further reduces its processing speed.
3. When a resource change event is fetched from the queue but its processing fails, the event is usually put back into the queue. If the cloud platform is heavily loaded, a large number of failed events are repeatedly enqueued, which affects normally enqueued events and reduces the processing efficiency of the whole cloud platform.
4. Because the component that fetches events from the queue and the component that processes the resource changes generally run asynchronously, two events for the same resource may be processed at the same time under high concurrency, which can leave the state of resources in the cloud platform inconsistent and cause errors.
Disclosure of Invention
Based on the above problems, the invention provides a method for implementing a queue for processing declarative events. It solves the problems of the conventional flow, in which a failed event is returned to the queue and handled like a normal event, leading to repeated processing, overloaded processing, and concurrent processing of failed and normal events.
To solve these technical problems, the invention adopts the following technical solution:
A method for implementing a queue for processing declarative events, including the steps of:
S1, creating a queue, wherein each event is checked by a deduplicator before it is added to the queue, and the deduplicator only allows events that are not already present to be added to the queue;
S2, arranging the events in the queue in priority order, using a heap as the data structure;
S3, using a rate limiter to control the time interval at which, and the number of events that, the processor obtains from the queue;
S4, the processor performs state transition processing on an event; if processing succeeds, the reference to the event in the deduplicator is cleared and the next event is processed; if processing fails, the back-off device records the number of failures, lowers the priority and delays the processing time, after which the failed event is added to the deduplicator again.
Further, in step S1, before an event passes through the deduplicator, a generating function produces a unique key identifier for the event, and the deduplicator checks this identifier. If the identifier is not recorded in the deduplicator, the event joins the queue normally and the identifier is recorded in the deduplicator. If the identifier is already recorded, the new event overwrites the old event with the same unique key identifier.
Further, the deduplicator has a thread-safe map data structure.
Further, the rate limiter uses a timer to obtain a preset number of events from the queue at preset time intervals and returns them to the processor.
Further, in step S4, the back-off device counts the failures of a failed event using a binary exponential back-off algorithm; as the number of failures increases, the priority of the failed event is lowered accordingly and its processing time is delayed.
Further, in the binary exponential back-off algorithm, n is the number of times the failed event has passed through the back-off device, the priority is lowered by n, and m is the delay applied to the processing time, where n and m satisfy the formula: m = 2 × n.
Further, once a previously failed event has been processed successfully by the processor, a cleanup function is called to clear the references to that event in the back-off device and the deduplicator.
Compared with the prior art, the invention has the following beneficial effects:
1. The deduplicator reduces repeated processing of the same event and so improves the operating efficiency of the whole cloud platform, and arranging the events in the queue as a heap ordered by priority improves the overall processing speed of events;
2. A rate limiter is added between the queue and the processor, so that, even under asynchronous operation, the number of events taken out of the queue does not exceed the preset number, the system does not run overloaded, and it is not overwhelmed by excessive requests;
3. Failed events are handled by the back-off device, which separates events that join the queue normally from events whose processing has failed, so that failed events do not affect the processing of normal events when events are fetched, and processing efficiency is improved;
4. The rate limiter becomes the component that fetches events from the queue, and the processor and the rate limiter operate asynchronously, which prevents two events for the same resource from being processed under high concurrency and improves the robustness of the system.
Drawings
FIG. 1 is a flow chart of the present invention.
Detailed Description
The invention will be further described with reference to the accompanying drawings. Embodiments of the present invention include, but are not limited to, the following examples.
As shown in FIG. 1, a method for implementing a queue for processing declarative events includes the following steps:
S1, creating a queue, wherein each event is checked by a deduplicator before it is added to the queue, and the deduplicator only allows events that are not already present to be added to the queue;
When a queue is created, its capacity, its rate limit, and the method for generating the unique key identifier of each event must be specified. The generating function produces a unique key identifier for an event. The deduplicator is a piece of check logic that runs before an event is added to the queue and uses the unique key identifier to decide whether the same event already exists in the queue. To speed up lookups of the unique key identifier, the deduplicator has a thread-safe map data structure.
When an event passes through the deduplicator, the deduplicator checks its unique key identifier. If the identifier is not recorded in the deduplicator, the event joins the queue normally and its identifier is recorded in the deduplicator. If the identifier is already recorded, the new event overwrites the old event with the same unique key identifier. The deduplicator thus deduplicates the queue, so that the event of each resource is processed only once in the queue, which improves the operating efficiency of the whole cloud platform.
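The admission check of step S1 can be sketched as follows. This is a minimal illustration rather than the patented implementation: the class name `Deduplicator`, the methods `admit` and `forget`, and the event shape with `kind` and `name` fields are assumptions, and a plain dictionary guarded by a lock stands in for the thread-safe map.

```python
import threading
from typing import Any, Callable, Dict, Hashable


class Deduplicator:
    """Admission check in front of the queue, keyed by a unique event key (hypothetical API)."""

    def __init__(self, key_func: Callable[[Any], Hashable]) -> None:
        self._key_func = key_func            # generating function for the unique key identifier
        self._events: Dict[Hashable, Any] = {}
        self._lock = threading.Lock()        # makes the map safe to share between threads

    def admit(self, event: Any) -> bool:
        """Record the event; return True if its key was new, False if it overwrote an old event."""
        key = self._key_func(event)
        with self._lock:
            is_new = key not in self._events
            self._events[key] = event        # a new event overwrites the old one with the same key
            return is_new

    def forget(self, event: Any) -> None:
        """Clear the reference once the event has been processed successfully."""
        with self._lock:
            self._events.pop(self._key_func(event), None)

    def __len__(self) -> int:
        with self._lock:
            return len(self._events)


# Two events for the same resource share one key, so only one entry is kept.
dedup = Deduplicator(key_func=lambda e: (e["kind"], e["name"]))
first = dedup.admit({"kind": "vm", "name": "a", "desired": "running"})
second = dedup.admit({"kind": "vm", "name": "a", "desired": "stopped"})
```

Here `first` reports a normal enqueue and `second` reports an overwrite, which is the point at which the queue would need to be reordered.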
S2, arranging the events in the queue in priority order, using a heap as the data structure;
The queue is a priority queue implemented with a heap, so an event with high priority sits at the head of the queue. If, in step S1, a new event overwrites an old event with the same unique key identifier, the events in the queue are sorted again so that the event with the highest priority is at the head of the queue.
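One way to picture step S2 is a keyed priority queue built on Python's `heapq`. The sketch below is illustrative only: the name `KeyedPriorityQueue` and its methods are assumptions, a larger number is assumed to mean a higher priority, and the re-sort after an overwrite is done with a full `heapq.heapify` for simplicity.

```python
import heapq
import itertools
from typing import Any, Dict, Hashable, List, Tuple


class KeyedPriorityQueue:
    """Heap-ordered queue holding one entry per unique key (hypothetical API)."""

    def __init__(self) -> None:
        # Each entry is [-priority, seq, key, event]; heapq is a min-heap, so the
        # priority is negated to pop the highest priority first. seq breaks ties
        # in arrival order and keeps comparisons from ever reaching key or event.
        self._heap: List[list] = []
        self._entries: Dict[Hashable, list] = {}
        self._seq = itertools.count()

    def put(self, key: Hashable, event: Any, priority: int) -> None:
        entry = self._entries.get(key)
        if entry is None:
            entry = [-priority, next(self._seq), key, event]
            self._entries[key] = entry
            heapq.heappush(self._heap, entry)
        else:
            # Same key: overwrite the old event in place, then restore heap order.
            entry[0] = -priority
            entry[3] = event
            heapq.heapify(self._heap)

    def pop(self) -> Tuple[Hashable, Any]:
        _, _, key, event = heapq.heappop(self._heap)
        del self._entries[key]
        return key, event

    def __len__(self) -> int:
        return len(self._heap)


q = KeyedPriorityQueue()
q.put("a", "A1", priority=1)
q.put("b", "B1", priority=5)
q.put("c", "C1", priority=3)
q.put("a", "A2", priority=9)   # overwrites A1 and moves key "a" to the head
```

After the overwrite the queue still holds three entries, and key `a` is popped first because its priority is now the highest.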
S3, using a rate limiter to control the time interval at which, and the number of events that, the processor obtains from the queue;
The rate limiter uses a timer to obtain a preset number of events from the queue at preset time intervals and returns them to the processor. The preset interval and the preset number are determined by the rate limit parameter specified when the queue is created in step S1. Because the processor obtains events only indirectly, through the rate limiter, the number of threads used by the component that fetches events and by the component that processes resource changes is bounded. Even under asynchronous operation, the number of events taken out within a given period does not exceed the preset number, so the system does not run overloaded. The rate limiter becomes the component that fetches events from the queue, and the processor and the rate limiter operate asynchronously, which prevents two events for the same resource from being processed under high concurrency and improves the robustness of the system.
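The behaviour described for step S3 can be sketched as a generator that releases at most a fixed number of events per interval. The names `RateLimiter`, `interval_s`, `batch_size` and `batches` are assumptions made for illustration; the sleep function is injectable so the example can run without real waiting.

```python
import time
from collections import deque
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


class RateLimiter:
    """Hands the processor at most batch_size events per interval (hypothetical API)."""

    def __init__(self, interval_s: float, batch_size: int,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.interval_s = interval_s
        self.batch_size = batch_size
        self._sleep = sleep                  # injectable so tests need not wait

    def batches(self, pop: Callable[[], T], size: Callable[[], int]) -> Iterator[List[T]]:
        """Yield batches until the queue is empty, pausing one interval between batches."""
        while size() > 0:
            n = min(self.batch_size, size())
            yield [pop() for _ in range(n)]
            if size() > 0:
                self._sleep(self.interval_s)


# A deque stands in for the priority queue; waits records the pauses instead of sleeping.
pending = deque(range(5))
waits: List[float] = []
limiter = RateLimiter(interval_s=0.5, batch_size=2, sleep=waits.append)
result = list(limiter.batches(pending.popleft, lambda: len(pending)))
```

With five pending events and a batch size of two, the processor receives three batches separated by two pauses, so it never sees more than two events per interval.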
S4, the processor performs state transition processing on an event; if processing succeeds, the reference to the event in the deduplicator is cleared and the next event is processed; if processing fails, the back-off device records the number of failures, lowers the priority and delays the processing time, after which the failed event is added to the deduplicator again;
The newly added back-off device applies back-off handling to events whose processing has failed. This distinguishes events that join the queue normally from events that have failed, so that repeatedly failing events do not affect the processing of normal events, and the processing efficiency of the cloud platform is improved.
The back-off device counts the failures of a failed event using a binary exponential back-off algorithm. As the number of failures increases, the priority of the failed event is lowered accordingly and its processing time is delayed. Specifically, n is the number of times the failed event has passed through the back-off device, the priority is lowered by n, and m is the delay applied to the processing time, where n and m satisfy the formula: m = 2 × n.
For example, when the processor fails to process an event, it returns the failed event to the deduplicator. Before reaching the deduplicator, the failed event passes through the back-off device, which counts the failure: after 1 failure the priority is lowered by 1 and the processing time is delayed by 2 s; after 2 failures the priority is lowered by 2 and the processing time is delayed by 4 s; and so on. The failed event, still carrying its unique key identifier, then enters the deduplicator again with its lowered priority and delayed processing time. The deduplicator finds the same unique key identifier, so the new event with the lowered priority and delayed processing time overwrites the original priority and processing time of the event. In other words, for the same unique key identifier, the new event overwrites the original one. The overwrite triggers a re-heapify of the queue, which reorders the events so that the events with high priority are at the head of the queue.
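The back-off arithmetic above can be checked with a short sketch. It follows the formula exactly as the text states it, m = 2 × n; the two worked values in the example (2 s and 4 s) would fit a doubling schedule equally well, so that reading is not excluded here. The names `Backoff` and `BackoffResult`, and the choice to subtract n from the event's original priority rather than from its current one, are assumptions.

```python
from dataclasses import dataclass
from typing import Dict, Hashable


@dataclass
class BackoffResult:
    failures: int    # n: how many times this event has failed so far
    priority: int    # priority after the reduction
    delay_s: int     # m: delay applied to the processing time, in seconds


class Backoff:
    """Counts failures per unique key and derives the penalty (hypothetical API)."""

    def __init__(self) -> None:
        self._failures: Dict[Hashable, int] = {}

    def record_failure(self, key: Hashable, base_priority: int) -> BackoffResult:
        n = self._failures.get(key, 0) + 1
        self._failures[key] = n
        # Assumption: the priority drops by n relative to the original priority,
        # and the delay follows m = 2 * n as written in the text.
        return BackoffResult(failures=n, priority=base_priority - n, delay_s=2 * n)

    def reset(self, key: Hashable) -> None:
        """Forget the failure count once the event finally succeeds."""
        self._failures.pop(key, None)


backoff = Backoff()
r1 = backoff.record_failure("vm/a", base_priority=10)
r2 = backoff.record_failure("vm/a", base_priority=10)
```

The first failure yields a priority of 9 and a 2 s delay, the second a priority of 8 and a 4 s delay, matching the worked example.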
Once a previously failed event has been processed successfully by the processor, a cleanup function is called to clear the references to that event in the back-off device and the deduplicator.
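The cleanup step can be sketched with plain containers standing in for the two components. The function names `cleanup` and `process_once` are hypothetical, a set of keys stands in for the deduplicator, and a dictionary of failure counts stands in for the back-off device.

```python
from typing import Callable, Dict, Hashable, Set


def cleanup(key: Hashable, dedup_keys: Set[Hashable],
            failure_counts: Dict[Hashable, int]) -> None:
    """Clear every reference to a successfully processed event."""
    dedup_keys.discard(key)
    failure_counts.pop(key, None)


def process_once(key: Hashable, handler: Callable[[Hashable], None],
                 dedup_keys: Set[Hashable],
                 failure_counts: Dict[Hashable, int]) -> bool:
    """Run the handler once; count a failure, or clean up on success."""
    try:
        handler(key)
    except Exception:
        # Failure: keep the deduplicator reference and remember the failure.
        failure_counts[key] = failure_counts.get(key, 0) + 1
        return False
    cleanup(key, dedup_keys, failure_counts)
    return True


attempts = {"n": 0}


def flaky(key: Hashable) -> None:
    """Stand-in handler that fails on the first call and succeeds afterwards."""
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("transient failure")


dedup_keys: Set[Hashable] = {"vm/a"}
failure_counts: Dict[Hashable, int] = {}
first_ok = process_once("vm/a", flaky, dedup_keys, failure_counts)
state_after_failure = (set(dedup_keys), dict(failure_counts))
second_ok = process_once("vm/a", flaky, dedup_keys, failure_counts)
```

After the failed attempt both references remain; after the successful retry both are gone, so a later event for the same resource is admitted as new.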
The above is an embodiment of the present invention. The specific parameters in the above embodiments and examples serve only to illustrate clearly the inventor's verification of the invention and are not intended to limit its scope of protection, which is defined by the claims. All equivalent structural changes made using the contents of the specification and the drawings of the present invention fall within the scope of protection of the present invention.
Claims (7)
1. A method for implementing a queue for processing declarative events, comprising the steps of:
S1, creating a queue, wherein each event is checked by a deduplicator before it is added to the queue, and the deduplicator only allows events that are not already present to be added to the queue;
S2, arranging the events in the queue in priority order, using a heap as the data structure;
S3, using a rate limiter to control the time interval at which, and the number of events that, the processor obtains from the queue;
S4, the processor performs state transition processing on an event; if processing succeeds, the reference to the event in the deduplicator is cleared and the next event is processed; if processing fails, the back-off device records the number of failures, lowers the priority and delays the processing time, after which the failed event is added to the deduplicator again.
2. The method for implementing a queue for processing declarative events according to claim 1, wherein: in step S1, before an event passes through the deduplicator, a generating function is used to produce a unique key identifier for the event, and the deduplicator checks the unique key identifier; if the unique key identifier is not recorded in the deduplicator, the event joins the queue normally and the unique key identifier is recorded in the deduplicator; if the unique key identifier is already recorded in the deduplicator, the new event overwrites the old event with the same unique key identifier.
3. The method for implementing a queue for processing declarative events according to claim 2, wherein: the deduplicator has a thread-safe map data structure.
4. The method for implementing a queue for processing declarative events according to claim 1, wherein: the rate limiter uses a timer to obtain a preset number of events from the queue at preset time intervals and returns them to the processor.
5. The method for implementing a queue for processing declarative events according to claim 1, wherein: in step S4, the back-off device counts the failures of a failed event using a binary exponential back-off algorithm; as the number of failures increases, the priority of the failed event is lowered accordingly and its processing time is delayed.
6. The method for implementing a queue for processing declarative events according to claim 5, wherein: in the binary exponential back-off algorithm, n is the number of times the failed event has passed through the back-off device, the priority is lowered by n, and m is the delay applied to the processing time, where n and m satisfy the formula: m = 2 × n.
7. The method for implementing a queue for processing declarative events according to claim 6, wherein: once a previously failed event has been processed successfully by the processor, a cleanup function is called to clear the references to that event in the back-off device and the deduplicator.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202010988989.7A CN111949424A (en) | 2020-09-18 | 2020-09-18 | Method for realizing queue for processing declarative events |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202010988989.7A CN111949424A (en) | 2020-09-18 | 2020-09-18 | Method for realizing queue for processing declarative events |
Publications (1)
Publication Number | Publication Date |
---|---|
CN111949424A (en) | 2020-11-17 |
Family
ID=73356881
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202010988989.7A Pending CN111949424A (en) | 2020-09-18 | 2020-09-18 | Method for realizing queue for processing declarative events |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN111949424A (en) |
Patent Citations (8)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US9311139B1 (en) * | 2015-05-29 | 2016-04-12 | Flexera Software Llc | Batch processing of oversubscribed system based on subscriber usage patterns |
CN107423120A (en) * | 2017-04-13 | 2017-12-01 | 阿里巴巴集团控股有限公司 | Method for scheduling task and device |
US20190205221A1 (en) * | 2017-12-28 | 2019-07-04 | Lendingclub Corporation | Error handling for services requiring guaranteed ordering of asynchronous operations in a distributed environment |
CN109388503A (en) * | 2018-09-20 | 2019-02-26 | 阿里巴巴集团控股有限公司 | A kind of event-handling method and device |
CN109104336A (en) * | 2018-09-27 | 2018-12-28 | 平安普惠企业管理有限公司 | Service request processing method, device, computer equipment and storage medium |
CN110297711A (en) * | 2019-05-16 | 2019-10-01 | 平安科技(深圳)有限公司 | Batch data processing method, device, computer equipment and storage medium |
CN110290217A (en) * | 2019-07-01 | 2019-09-27 | 腾讯科技(深圳)有限公司 | Processing method and processing device, storage medium and the electronic device of request of data |
CN110427257A (en) * | 2019-07-29 | 2019-11-08 | 招商局金融科技有限公司 | Multi-task scheduling method, device and computer readable storage medium |
Cited By (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112817705A (en) * | 2021-01-22 | 2021-05-18 | 京东方科技集团股份有限公司 | Information release method, terminal equipment and information release system |
CN113010012A (en) * | 2021-03-09 | 2021-06-22 | 湖北亿咖通科技有限公司 | Method for configuring virtual character in vehicle-mounted entertainment system and computer storage medium |
CN116582501A (en) * | 2023-05-10 | 2023-08-11 | 北京志凌海纳科技有限公司 | Method and system for solving slow effect of Vpc-agent network configuration based on dynamic priority |
CN116582501B (en) * | 2023-05-10 | 2024-02-09 | 北京志凌海纳科技有限公司 | Method and system for solving slow effect of Vpc-agent network configuration based on dynamic priority |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
RJ01 | Rejection of invention patent application after publication | ||
Application publication date: 20201117 |