CN116841753B - Stream processing and batch processing switching method and switching device - Google Patents
Stream processing and batch processing switching method and switching device Download PDFInfo
- Publication number
- CN116841753B CN116841753B CN202311109939.7A CN202311109939A CN116841753B CN 116841753 B CN116841753 B CN 116841753B CN 202311109939 A CN202311109939 A CN 202311109939A CN 116841753 B CN116841753 B CN 116841753B
- Authority
- CN
- China
- Prior art keywords
- processing
- average
- data input
- stream
- batch
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000012545 processing Methods 0.000 title claims abstract description 405
- 238000000034 method Methods 0.000 title claims abstract description 55
- 230000008901 benefit Effects 0.000 claims abstract description 57
- 238000011217 control strategy Methods 0.000 claims abstract description 39
- 238000004364 calculation method Methods 0.000 claims abstract description 24
- 238000005111 flow chemistry technique Methods 0.000 claims abstract description 19
- 239000013598 vector Substances 0.000 claims description 84
- 239000011159 matrix material Substances 0.000 claims description 83
- 238000005070 sampling Methods 0.000 claims description 34
- 230000009466 transformation Effects 0.000 claims description 28
- 238000004590 computer program Methods 0.000 claims description 24
- 238000003860 storage Methods 0.000 claims description 13
- 238000000513 principal component analysis Methods 0.000 claims description 11
- 238000000354 decomposition reaction Methods 0.000 claims description 9
- 238000004422 calculation algorithm Methods 0.000 claims description 8
- 238000009826 distribution Methods 0.000 claims description 4
- 230000008569 process Effects 0.000 description 12
- 230000006870 function Effects 0.000 description 7
- 238000010586 diagram Methods 0.000 description 5
- 238000005516 engineering process Methods 0.000 description 5
- 238000012544 monitoring process Methods 0.000 description 4
- 230000004044 response Effects 0.000 description 4
- 230000008878 coupling Effects 0.000 description 3
- 238000010168 coupling process Methods 0.000 description 3
- 238000005859 coupling reaction Methods 0.000 description 3
- 230000010354 integration Effects 0.000 description 3
- 238000004458 analytical method Methods 0.000 description 2
- 230000008859 change Effects 0.000 description 2
- 238000004891 communication Methods 0.000 description 2
- 238000013461 design Methods 0.000 description 2
- 238000013507 mapping Methods 0.000 description 2
- 230000009467 reduction Effects 0.000 description 2
- 238000012795 verification Methods 0.000 description 2
- 238000003491 array Methods 0.000 description 1
- 238000010923 batch production Methods 0.000 description 1
- 230000009286 beneficial effect Effects 0.000 description 1
- 230000003139 buffering effect Effects 0.000 description 1
- 238000001514 detection method Methods 0.000 description 1
- 235000019800 disodium phosphate Nutrition 0.000 description 1
- 230000000694 effects Effects 0.000 description 1
- 238000011156 evaluation Methods 0.000 description 1
- 230000008571 general function Effects 0.000 description 1
- 230000003993 interaction Effects 0.000 description 1
- 230000007246 mechanism Effects 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 230000003287 optical effect Effects 0.000 description 1
- 230000002085 persistent effect Effects 0.000 description 1
- 238000012216 screening Methods 0.000 description 1
- 230000011218 segmentation Effects 0.000 description 1
- 238000006467 substitution reaction Methods 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5027—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F18/00—Pattern recognition
- G06F18/20—Analysing
- G06F18/22—Matching criteria, e.g. proximity measures
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F18/00—Pattern recognition
- G06F18/20—Analysing
- G06F18/23—Clustering techniques
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5011—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resources being hardware resources other than CPUs, Servers and Terminals
- G06F9/5016—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resources being hardware resources other than CPUs, Servers and Terminals the resource being the memory
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Data Mining & Analysis (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Physics & Mathematics (AREA)
- Software Systems (AREA)
- Life Sciences & Earth Sciences (AREA)
- Artificial Intelligence (AREA)
- Bioinformatics & Cheminformatics (AREA)
- Bioinformatics & Computational Biology (AREA)
- Computer Vision & Pattern Recognition (AREA)
- Evolutionary Biology (AREA)
- Evolutionary Computation (AREA)
- Management, Administration, Business Operations System, And Electronic Commerce (AREA)
Abstract
The invention is suitable for the technical field of financial data processing, and provides a switching method and a switching device for stream processing and batch processing, wherein the switching method comprises the following steps: and calculating a stream processing benefit score according to the average processing time, the average waiting time, the data input quantity, the average CPU utilization rate and the average memory occupancy rate. And further determining whether to switch the stream processing mode to the batch processing mode based on the processing benefit score. When the batch processing mode is determined to be switched, matching a batch processing control strategy according to the flow processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor calculation power parameter, the maximum data input amount, the minimum data input amount and the average data input amount so as to realize batch processing switching. By the switching logic, the stream processing is switched to batch processing, so that overload condition of the stream processing is avoided when burst data stream is encountered.
Description
Technical Field
The invention belongs to the technical field of financial data processing, and particularly relates to a switching method and a switching device for stream processing and batch processing.
Background
In the field of financial data processing, streaming and batch processing are two distinct computing modes, which typically require processing using different tools and techniques. However, as real-time demands increase, business systems often need to be able to process real-time streaming data and historical batch data simultaneously.
FLINK (Apache Flink) is a distributed stream processing and batch computing engine that fuses stream processing and batch processing capabilities in a unified framework. The principle of FLINK Stream batch integration is achieved by dividing the data Stream into Bounded streams (bound streams) and Unbounded streams (Unbounded streams). The feature of the flank stream batch integration is that it provides a consistent programming model and API interface that can seamlessly switch between stream processing and offline batch processing. This allows developers and data engineers to share the same set of code and logic to meet both real-time processing and batch analysis requirements. This flexibility enables users to perform real-time streaming and offline batch processing tasks on one cluster, thereby reducing management costs and increasing computational efficiency.
However, the conventional flank stream batch integration technology is directed to the switching logic of stream processing and batch processing, which is simply to switch stream processing and batch processing based on the transaction period and the closed-market period, but if a burst data stream is encountered during the transaction period, the data is processed one by one according to the arrival sequence as the stream processing processes the data stream in a real-time continuous manner. Therefore, when a burst data stream is encountered, overload condition often occurs in stream processing, and thus a server is down, which is a technical problem to be solved urgently.
Disclosure of Invention
In view of this, the embodiments of the present invention provide a method, a device, a terminal device, and a computer readable storage medium for switching stream processing and batch processing, so as to solve the technical problem that when a burst data stream is encountered, overload condition often occurs in stream processing, and thus a server is down.
A first aspect of an embodiment of the present invention provides a method for switching between stream processing and batch processing, where the switching method includes:
switching a stream processing mode and a batch processing mode based on the Flink;
if the current processing mode is a stream processing mode, acquiring average processing time corresponding to a plurality of processing requests, average waiting time corresponding to a plurality of processing requests, data input quantity in a plurality of sampling times, average CPU utilization rate and average memory occupancy rate;
calculating a stream processing benefit score according to the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate;
if the flow processing benefit score is smaller than a first threshold value, obtaining a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount and an average data input amount which are input by a user;
Determining a matching batch control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
and switching the stream processing mode into a batch processing mode, and carrying out batch processing on the data to be processed based on the batch processing control strategy.
Further, the step of calculating a stream processing benefit score according to the average processing time corresponding to the plurality of processing requests, the average waiting time corresponding to the plurality of processing requests, the data input amount in the plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate includes:
substituting the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate into the following formula I to obtain the flow processing benefit score;
The first formula is:
wherein,representing stream processing benefit score,/->Representing the data input in the ith said sampling time,/th>Representing said maximum data input amount, +.>Representing the prime number average data input, k representing the number of samples of said sampling time, +.>Representing said average processing time corresponding to the jth processing request,/for>And representing the average waiting time corresponding to the jth processing request, n represents the number of the processing requests, L represents the average CPU utilization rate, and R represents the average memory occupancy rate.
Further, the step of determining a matching batch control policy based on the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor power parameter, the maximum data input, the minimum data input, and the average data input comprises:
constructing the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount into a current state vector;
Acquiring a plurality of pre-stored clustering center vectors and weight coefficients corresponding to the clustering center vectors respectively; the plurality of clustering center vectors are calculated in a plurality of historical state vectors based on a clustering algorithm;
respectively calculating respective corresponding vector distances between the current state vector and a plurality of clustering center vectors;
the vector distances and the weight coefficients corresponding to the clustering center vectors are weighted and summed to obtain the target parameters;
and matching the batch processing control strategy corresponding to the numerical range according to the numerical range of the target parameter.
Further, before the step of obtaining the pre-stored plurality of cluster center vectors and the weight coefficients corresponding to the plurality of cluster center vectors, the method further includes:
acquiring the historical state vectors corresponding to a plurality of historical sample data, calculating the similarity among the plurality of historical state vectors, and constructing a similarity matrix composed of a plurality of similarities;
calculating a transformation matrix according to the similarity matrix, and calculating a normalized target matrix according to the transformation matrix and the similarity matrix;
performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of initial center vectors;
And carrying out clustering calculation based on the initial center vector and the historical state vectors to obtain a plurality of clustering center vectors.
Further, after the step of performing cluster computation based on the initial center vector and the plurality of history state vectors to obtain a plurality of cluster center vectors, the method further includes:
acquiring prior probabilities corresponding to the clustering center vectors respectively;
substituting the clustering center vector and the prior probability into a formula II to obtain weight coefficients corresponding to the clustering center vectors;
the formula II is as follows:
wherein,weight coefficient representing the j-th cluster center,/->Gaussian distribution representing the ith historical state vector in the jth cluster center, +.>Mean vector representing the j-th cluster center, < ->Covariance matrix representing jth cluster center, k representing number of cluster centers, +.>Representing the prior probability of the jth cluster center,representing the prior probability of the kth cluster center, a represents the number of historical state vectors.
Further, the step of performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of initial center vectors includes:
Performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of feature values and feature vectors corresponding to the feature values;
and taking the eigenvectors corresponding to the first k eigenvalues as the initial center vector according to the numerical value.
Further, the step of calculating a transformation matrix according to the similarity matrix and calculating a normalized target matrix according to the transformation matrix and the similarity matrix includes:
substituting the similarity matrix into the following formula III to obtain the transformation matrix;
the formula III is:
wherein,representing the transformation matrix->A numerical value representing an ith row and a jth column in the similarity matrix;
subtracting the transformation matrix from the similarity matrix to obtain an unnormalized target matrix;
dividing the non-normalized target matrix by the transformation matrix to obtain the normalized target matrix.
A second aspect of an embodiment of the present invention provides a switching device for stream processing and batch processing, including:
the first switching unit is used for switching the stream processing mode and the batch processing mode based on the Flink;
the first acquisition unit is used for acquiring average processing time corresponding to the plurality of processing requests, average waiting time corresponding to the plurality of processing requests, data input quantity in the plurality of sampling times, average CPU utilization rate and average memory occupancy rate if the current processing mode is a stream processing mode;
The first calculation unit is used for calculating a stream processing benefit score according to the average processing time corresponding to the plurality of processing requests, the average waiting time corresponding to the plurality of processing requests, the data input quantity in the plurality of sampling times, the CPU average utilization rate and the memory average occupancy rate;
the second obtaining unit is used for obtaining the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount which are input by a user if the flow processing benefit score is smaller than the first threshold value;
the second calculation unit is used for determining a matched batch processing control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
and the second switching unit is used for switching the stream processing mode into a batch processing mode and carrying out batch processing on the data to be processed based on the batch processing control strategy.
A third aspect of an embodiment of the present invention provides a terminal device comprising a memory, a processor and a computer program stored in the memory and executable on the processor, the processor implementing the steps of the method of the first aspect when executing the computer program.
A fourth aspect of the embodiments of the present invention provides a computer-readable storage medium storing a computer program which, when executed by a processor, implements the steps of the method of the first aspect.
Compared with the prior art, the embodiment of the invention has the beneficial effects that: the invention switches the stream processing mode and the batch processing mode based on the Flink; if the current processing mode is a stream processing mode, acquiring average processing time corresponding to a plurality of processing requests, average waiting time corresponding to a plurality of processing requests, data input quantity in a plurality of sampling times, average CPU utilization rate and average memory occupancy rate; calculating a stream processing benefit score according to the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate; if the flow processing benefit score is smaller than a first threshold value, obtaining a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount and an average data input amount which are input by a user; determining a matching batch control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity; and switching the stream processing mode into a batch processing mode, and carrying out batch processing on the data to be processed based on the batch processing control strategy. According to the scheme, the flow processing benefit score is calculated according to the average processing time, the average waiting time, the data input quantity, the average CPU utilization rate and the average memory occupancy rate. And further determining whether to switch the stream processing mode to the batch processing mode based on the processing benefit score. When the batch processing mode is determined to be switched, matching a batch processing control strategy according to the flow processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor calculation power parameter, the maximum data input amount, the minimum data input amount and the average data input amount so as to realize batch processing switching. By the switching logic, the stream processing is switched to batch processing, so that overload condition of the stream processing is avoided when burst data stream is encountered. The batch processing control strategy defines the length of the data blocks and the distributed processing quantity, and the batch processing efficiency is improved through the proper length of the data blocks and the distributed processing quantity.
Drawings
In order to more clearly illustrate the technical solutions of the embodiments of the present invention, the drawings that are required to be used in the embodiments or the related technical descriptions will be briefly described, and it is apparent that the drawings in the following description are only some embodiments of the present invention, and other drawings may be obtained according to the drawings without inventive effort for those skilled in the art.
FIG. 1 is a schematic flow chart of a method for switching between stream processing and batch processing provided by the present invention;
FIG. 2 shows a flow processing mode switching flow chart of the Flink technology provided by the invention;
FIG. 3 is a schematic diagram of a switching device for stream processing and batch processing according to an embodiment of the present invention;
fig. 4 shows a schematic diagram of a terminal device according to an embodiment of the present invention.
Detailed Description
In the following description, for purposes of explanation and not limitation, specific details are set forth such as the particular system architecture, techniques, etc., in order to provide a thorough understanding of the embodiments of the present invention. It will be apparent, however, to one skilled in the art that the present invention may be practiced in other embodiments that depart from these specific details. In other instances, detailed descriptions of well-known systems, devices, circuits, and methods are omitted so as not to obscure the description of the present invention with unnecessary detail.
The embodiment of the application provides a switching method, a switching device, terminal equipment and a computer readable storage medium for stream processing and batch processing, which are used for solving the technical problem that when burst data streams are encountered, overload condition often occurs in stream processing, and then server downtime is caused.
First, the application provides a switching method of stream processing and batch processing. Referring to fig. 1, fig. 1 is a schematic flow chart of a method for switching stream processing and batch processing according to the present application. As shown in fig. 1, the method for switching between stream processing and batch processing may include the steps of:
step 101: switching a stream processing mode and a batch processing mode based on the Flink;
the application is based on the existing Flink technology to automatically switch so as to overcome the overload condition of stream processing when encountering burst data stream.
Referring to fig. 2, fig. 2 shows a flow processing mode switching flowchart of the link technology provided by the present application. As shown in FIG. 2, the Flink technique sends the change of the data stream in the monitoring source table to the processor in the form of Kafka through the CDC plug-in. The processor distinguishes the current data as real-time data or offline data based on the change in the data stream. And if the data flow is changed from the offline data to the real-time data, switching the batch processing mode to the flow processing mode.
Step 102: if the current processing mode is a stream processing mode, acquiring average processing time corresponding to a plurality of processing requests, average waiting time corresponding to a plurality of processing requests, data input quantity in a plurality of sampling times, average CPU utilization rate and average memory occupancy rate;
the stream processing mode performs real-time processing and analysis based on continuously flowing data. The data arrives in a stream one by one at the system and is processed in a short time by a real-time computing engine (e.g., apache Kafka, apache Flink). The batch mode processes in units of data sets (batches) that accumulate over a time window, such as daily or hourly. The data is collected and stored at regular time intervals and then the job or task is performed at a predetermined time. From the above, the overload condition of the stream processing facing the burst data stream is easy to occur, and the server is down. The batch mode is non-real time data processing, so that overload condition can not occur in the face of burst data flow. Therefore, the application only aims at the automatic switching of the stream processing mode, so as to avoid the technical problem that the server is down due to overload condition when the burst data stream is encountered.
In order to sufficiently evaluate whether the processing capability of the stream processing mode can process the current input data, it is necessary to obtain an average processing time corresponding to a plurality of processing requests, an average waiting time corresponding to a plurality of processing requests, an amount of data input in a plurality of sampling times, an average CPU usage rate, and an average memory occupancy rate to calculate a stream processing benefit score (the stream processing benefit score is used to characterize the current processing capability of the stream processing mode).
Step 103: calculating a stream processing benefit score according to the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate;
specifically, substituting the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate into the following formula I to obtain the stream processing benefit score;
the first formula is:
wherein,representing stream processing benefit score,/->Representing the data input in the ith said sampling time,/th >Representing said maximum data input amount, +.>Representing the prime number average data input, k representing the number of samples of said sampling time, +.>Representing said average processing time corresponding to the jth processing request,/for>And representing the average waiting time corresponding to the jth processing request, n represents the number of the processing requests, L represents the average CPU utilization rate, and R represents the average memory occupancy rate.
The application comprehensively considers the influence of various factors, and calculates the stream processing benefit score based on the average processing time, the average waiting time corresponding to the processing requests, the data input quantity in the sampling times, the CPU average utilization rate and the memory average occupancy rate because the average processing time, the average waiting time corresponding to the processing requests, the data input quantity in the sampling times, the CPU average utilization rate and the memory average occupancy rate have certain influence on the calculation of the stream processing benefit score so as to realize the evaluation of the processing capability of the current stream processing mode. The first formula is based on a large amount of experimental data and verification, but is not limited to the mathematical expression.
Step 104: if the flow processing benefit score is smaller than a first threshold value, obtaining a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount and an average data input amount which are input by a user;
When the stream processing benefit score is less than the first threshold, the processing power representing the current stream processing mode is insufficient to process the current data stream. It is necessary to switch the stream processing mode to the batch processing mode. In order to further improve the processing efficiency of the batch processing mode, the present application needs to calculate a batch processing control policy according to a plurality of dimension data (such as a user input surge index, a minimum delay parameter, a current memory space parameter, a current processor power parameter, a maximum data input amount, a minimum data input amount, and an average data input amount). The batch processing control strategy comprises control parameters such as data block length, distributed processing quantity and the like.
The surge index is a control parameter input by a user and is determined by the user according to human experience, so that the batch control strategy meets the actual environment requirements better. The minimum delay parameter is a preset parameter value, and is a processing delay for limiting the batch mode. The current memory space parameter refers to a memory space resource that can be currently invoked. The current processor computing power parameter refers to the processor computing power resource that can be currently invoked. The maximum data input amount represents the maximum amount of data in the plurality of processing requests. The smallest data amount of the smallest data input amounts. The average data input amount refers to an average of the data amounts in the plurality of processing requests.
Step 105: determining a matching batch control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
specifically, step 105 specifically includes steps 1051 to 1055:
step 1051: constructing the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount into a current state vector;
step 1052: acquiring a plurality of pre-stored clustering center vectors and weight coefficients corresponding to the clustering center vectors respectively; the plurality of clustering center vectors are calculated in a plurality of historical state vectors based on a clustering algorithm;
in order to improve the calculation efficiency, the method pre-calculates a plurality of clustering center vectors and weight coefficients corresponding to the clustering center vectors before executing the scheme of the application, and the specific calculation logic is as follows:
Optionally, steps a to F are further included before step 1052:
step A: and acquiring the historical state vectors corresponding to the plurality of historical sample data, calculating the similarity among the plurality of historical state vectors, and constructing a similarity matrix composed of the plurality of similarities.
In order to minimize the influence of noise and local outliers on data in the present application, the similarity between a plurality of historical state vectors is calculated, and a similarity matrix consisting of a plurality of similarities is constructed. And the similarity matrix describes the similarity and the difference between the data points and reflects the internal structure of the data set so as to better realize clustering. And by calculating the similarity matrix, the direct calculation of the original data can be avoided, so that the calculation complexity is reduced. Meanwhile, the calculation of the similarity matrix can be conveniently performed in parallel, so that the calculation speed is increased.
And (B) step (B): and calculating a transformation matrix according to the similarity matrix, and calculating a normalized target matrix according to the transformation matrix and the similarity matrix.
In order to further improve the calculation accuracy, the method calculates a transformation matrix based on the similarity matrix, wherein the values in the transformation matrix can be used for representing the importance and the similarity of each value in the similarity matrix. In order to facilitate subsequent vector dimension reduction and clustering, a normalized target matrix corresponding to the transformation matrix needs to be calculated, and the specific calculation process is as follows:
Specifically, step B specifically includes steps B1 to B3:
step B1: substituting the similarity matrix into the following formula III to obtain the transformation matrix;
the formula III is:
wherein,representing the transformation matrix->A numerical value representing an ith row and a jth column in the similarity matrix;
step B2: subtracting the transformation matrix from the similarity matrix to obtain an unnormalized target matrix;
step B3: dividing the non-normalized target matrix by the transformation matrix to obtain the normalized target matrix.
In this embodiment, the transformation matrix is calculated through the similarity matrix, and the transformation matrix is used for representing the importance and similarity of each numerical value in the similarity matrix, so that the calculation accuracy is improved, and the normalized target matrix is calculated based on the transformation matrix, so that the subsequent vector dimension reduction and clustering are facilitated.
Step C: and carrying out feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of initial center vectors.
Specifically, step C specifically includes step C1 and step C2:
step C1: performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of feature values and feature vectors corresponding to the feature values;
Step C2: and taking the eigenvectors corresponding to the first k eigenvalues as the initial center vector according to the numerical value.
Since the clustering algorithm needs to select K initial cluster centers for subsequent cluster computation. The normalized target matrix is subjected to feature decomposition through principal component analysis to obtain a plurality of feature values and feature vectors corresponding to the feature values, and the feature vectors corresponding to the first k feature values are used as initial center vectors (namely initial clustering centers). Wherein the value of k is a super parameter, namely the preset number of clustering centers. It is worth noting that, because the principal component analysis can extract important data in the data, the K initial clustering centers obtained based on the principal component analysis have strong representativeness, and the accuracy of the clustering algorithm is further improved.
Step D: and carrying out clustering calculation based on the initial center vector and the historical state vectors to obtain a clustering center vector.
Optionally, step E to step F are further included before step D:
step E: and acquiring prior probabilities corresponding to the clustering center vectors respectively.
In the clustering algorithm, the prior probability (prior probability) refers to the prior probability distribution for each cluster at the time of modeling. It means that we have some prior assumption or knowledge of the probability of occurrence of each cluster before there is no data observation. In general, the prior probability may be determined by experience of a domain expert, analysis of historical data, and the like.
Step F: substituting the clustering center vector and the prior probability into a formula II to obtain weight coefficients corresponding to the clustering center vectors;
the formula II is as follows:
wherein,weight coefficient representing the j-th cluster center,/->Gaussian distribution representing the ith historical state vector in the jth cluster center, +.>Mean vector representing the j-th cluster center, < ->Covariance matrix representing jth cluster center, k representing number of cluster centers, +.>Representing the prior probability of the jth cluster center,representing the prior probability of the kth cluster center, a represents the number of historical state vectors.
The method comprehensively considers the influence of various factors, and calculates the weight coefficient of each cluster center based on the weight coefficient, the number of the cluster centers, the Gaussian distribution of the cluster centers, the mean vector of the cluster centers, the covariance matrix of the cluster centers and the number of the historical state vectors, so as to realize high-precision clustering. The second formula is based on a large amount of experimental data and verification, but is not limited to the mathematical expression.
Step 1053: respectively calculating respective corresponding vector distances between the current state vector and a plurality of clustering center vectors;
step 1054: the vector distances and the weight coefficients corresponding to the clustering center vectors are weighted and summed to obtain the target parameters;
step 1055: and matching the batch processing control strategy corresponding to the numerical range according to the numerical range of the target parameter.
Different batch control strategies have preset mapping relations with different numerical ranges, so that the corresponding batch control strategies can be matched based on the numerical ranges. The batch control policy includes a data block length and a distributed processing number.
Wherein, the data processed in the stream processing mode is unbounded data, and the data processed in the batch processing mode is bounded data (i.e. the data is processed in the form of data blocks). The present application controls the length of bounded data by defining the data block length in a batch control strategy. It can be appreciated that the data block length is too short to fully utilize the current computing power resources, resulting in low computing efficiency. And when the length of the data block is too long, a longer processing time is required, and the processing delay is increased. Therefore, reasonable data block length needs to be set to improve the calculation efficiency and reduce the delay.
Whereas distributed processing, by performing parallel processing on the data to be processed, in order to allocate reasonable distributed resources, limits the number of parallel processing by the number of distributed processing (i.e., the number of distributed processing refers to the number of parallel processing).
The distributed batch processing is a technology for cooperatively completing data processing by utilizing a plurality of computers, and generally relates to the steps of data slicing, distributing, parallel execution, result summarizing and the like. Parallel processing may be based on three distributed schemes:
first distributed scheme: scheme based on MapReduce model: mapReduce is a well-known distributed programming model proposed by Google for processing large-scale data sets. The MapReduce model divides data processing into two phases: map and Reduce. The Map stage is responsible for dividing input data into a plurality of key value pairs, and mapping each key value pair to generate an intermediate result. The Reduce stage is responsible for reducing intermediate results with the same keys to produce the final result. The MapReduce model may utilize a distributed file system (e.g., HDFS) and a distributed scheduling system (e.g., YARN) to enable storage of data and scheduling of tasks. The MapReduce model has the advantages of simplicity and easiness in use, and can automatically process the problems of data segmentation, task allocation, fault tolerance and the like. The MapReduce model has the disadvantage of being inflexible, being only suitable for some simple batch processing scenes, and having lower efficiency for complex data processing flows or algorithms requiring multiple iterations. Typical frameworks based on the MapReduce model are Hadoop MapReduce1 and Apache Spark2.
A second distributed scheme: scheme based on DAG (directed acyclic graph) model: a DAG is a graph structure that represents the dependencies between tasks in a data processing flow. The DAG model abstracts the data processing flow into a directed acyclic graph consisting of multiple vertices and edges, where the vertices represent tasks and the edges represent data or control flows between tasks. The DAG model can analyze and execute the DAG graph by utilizing a DAG engine, determine the execution sequence and parallelism of the tasks according to the dependency relationship between the tasks, and dynamically adjust the allocation and scheduling of the tasks according to the resource condition. The DAG model has the advantages of flexibility, high efficiency, support of complex data processing flow and algorithms for multiple iterations, and realization of a fine resource management and fault tolerance mechanism. The disadvantage of the DAG model is the high programming difficulty, the need for the user to define the DAG graph himself, and the attention to the internal implementation details of the DAG engine. Typical frameworks based on the DAG model are Apache Flink3 and Apache Storm.
Third distributed scheme: scheme based on task scheduling framework: the task scheduling framework is a scheme for constructing distributed batch processing logic based on the existing business application service cluster, and a special data platform or engine is not required to be additionally deployed. Task scheduling frameworks typically provide some general functions and interfaces such as timing triggers, task definitions, task slicing, task execution, task monitoring, etc., allowing users to only focus on the implementation of business logic and not on the details of distributed coordination. The task scheduling framework has the advantages of light weight, simplicity and convenience, can be rapidly integrated into the existing service system, and can complete batch processing tasks by utilizing the resources and the capabilities of the service clusters. A disadvantage of the task scheduling framework is that it is relatively limited in functionality and is not well suited for handling very complex or very large-scale data processing tasks. Typical frameworks based on task scheduling frameworks are ElasticJob and SchedulerX.
Step 106: and switching the stream processing mode into a batch processing mode, and carrying out batch processing on the data to be processed based on the batch processing control strategy.
Switching the stream processing mode to the batch processing mode involves the steps of: (1) determining a switching time: a point in time is determined at which to switch stream processing to batch processing. (2) Stopping stream data input: new real-time data input to the stream processing system is stopped. This may be achieved by closing the data source connection, suspending the data sink, or changing the direction of the data stream, etc. (3) Buffering the raw data: after stopping the data input, it is necessary to ensure that all stream data that has not been processed is buffered correctly. This may be accomplished by storing the data in memory or persistent storage, or writing it to a temporary file. (4) Switching the processing mode: depending on the system design, the configuration or code of the stream processing system is modified accordingly to accommodate batch processing.
As an alternative embodiment of the present invention, if it is detected that the input data stream is below the threshold value after switching to the batch mode, the batch mode may be switched to the stream processing mode.
In the present embodiment, by switching the stream processing mode and the batch processing mode based on the Flink; if the current processing mode is a stream processing mode, acquiring average processing time corresponding to a plurality of processing requests, average waiting time corresponding to a plurality of processing requests, data input quantity in a plurality of sampling times, average CPU utilization rate and average memory occupancy rate; calculating a stream processing benefit score according to the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate; if the flow processing benefit score is smaller than a first threshold value, obtaining a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount and an average data input amount which are input by a user; determining a matching batch control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity; and switching the stream processing mode into a batch processing mode, and carrying out batch processing on the data to be processed based on the batch processing control strategy. According to the scheme, the flow processing benefit score is calculated according to the average processing time, the average waiting time, the data input quantity, the average CPU utilization rate and the average memory occupancy rate. And further determining whether to switch the stream processing mode to the batch processing mode based on the processing benefit score. When the batch processing mode is determined to be switched, matching a batch processing control strategy according to the flow processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor calculation power parameter, the maximum data input amount, the minimum data input amount and the average data input amount so as to realize batch processing switching. By the switching logic, the stream processing is switched to batch processing, so that overload condition of the stream processing is avoided when burst data stream is encountered. The batch processing control strategy defines the length of the data blocks and the distributed processing quantity, and the batch processing efficiency is improved through the proper length of the data blocks and the distributed processing quantity.
Referring to fig. 3, fig. 3 is a schematic diagram of a switching device for stream processing and batch processing, and fig. 3 is a schematic diagram of a switching device for stream processing and batch processing, where the switching device for stream processing and batch processing shown in fig. 3 includes:
a first switching unit 31 for switching the stream processing mode and the batch processing mode based on the link;
a first obtaining unit 32, configured to obtain an average processing time corresponding to the plurality of processing requests, an average waiting time corresponding to the plurality of processing requests, a data input amount in the plurality of sampling times, an average CPU utilization rate, and an average memory occupancy rate if the current processing mode is a stream processing mode;
a first calculating unit 33, configured to calculate a stream processing benefit score according to the average processing time corresponding to the plurality of processing requests, the average waiting time corresponding to the plurality of processing requests, the data input amount in the plurality of sampling times, the average CPU utilization rate, and the average memory occupancy rate;
a second obtaining unit 34, configured to obtain, if the flow processing benefit score is smaller than the first threshold, a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount, and an average data input amount, which are input by a user;
A second calculation unit 35, configured to determine a matching batch control policy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor power parameter, the maximum data input amount, the minimum data input amount, and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
a second switching unit 36, configured to switch the stream processing mode to a batch processing mode, and batch process the data to be processed based on the batch processing control policy.
The invention provides a switching device for stream processing and batch processing, which is used for identifying the request type of a service request by responding to the received service request and determining the processing difficulty index of the service request according to the request type; acquiring selection probability and resource data corresponding to each of a plurality of servers to be selected, and calculating current processing capacity indexes corresponding to each of the plurality of servers to be selected according to the selection probability and the resource data; the selection probability refers to the probability that the server to be selected is selected as a target server within preset times; calculating the processing efficiency corresponding to each of the plurality of servers to be selected according to the processing difficulty index and the current processing capacity index; and screening target servers from a plurality of servers to be selected according to the processing efficiency, and processing the service request through the target servers. According to the scheme, the processing difficulty index of the service request is determined according to the service request type, the current processing capacity index is determined according to the selection probability of the server to be selected and the resource data, and the processing efficiency of each server to be selected is determined according to the processing difficulty index and the current processing capacity index, so that the target server is selected according to the processing efficiency. The invention comprehensively judges the processing efficiency of the server to be selected based on a plurality of dimensions (request type, selection probability and resource data), so that the balanced load is more fit with the actual situation, and the situations of overload and the like of the server can be well avoided.
Fig. 4 is a schematic diagram of a terminal device according to an embodiment of the present invention. As shown in fig. 4, a terminal device 4 of this embodiment includes: a processor 40, a memory 41 and a computer program 42 stored in said memory 41 and executable on said processor 40, for example a program for switching between stream processing and batch processing. The processor 40, when executing the computer program 42, implements the steps of the above-described respective one of the stream processing and batch processing switching method embodiments, such as steps 101 through 1046 shown in fig. 1. Alternatively, the processor 40, when executing the computer program 42, performs the functions of the units in the above-described device embodiments, such as the functions of the units 31 to 36 shown in fig. 3.
Illustratively, the computer program 42 may be partitioned into one or more units that are stored in the memory 41 and executed by the processor 40 to complete the present invention. The one or more units may be a series of computer program instruction segments capable of performing a specific function describing the execution of the computer program 42 in the one terminal device 4. For example, the specific functions of the computer program 42 that may be partitioned into units are as follows:
The first switching unit is used for switching the stream processing mode and the batch processing mode based on the Flink;
the first acquisition unit is used for acquiring average processing time corresponding to the plurality of processing requests, average waiting time corresponding to the plurality of processing requests, data input quantity in the plurality of sampling times, average CPU utilization rate and average memory occupancy rate if the current processing mode is a stream processing mode;
the first calculation unit is used for calculating a stream processing benefit score according to the average processing time corresponding to the plurality of processing requests, the average waiting time corresponding to the plurality of processing requests, the data input quantity in the plurality of sampling times, the CPU average utilization rate and the memory average occupancy rate;
the second obtaining unit is used for obtaining the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount which are input by a user if the flow processing benefit score is smaller than the first threshold value;
the second calculation unit is used for determining a matched batch processing control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
And the second switching unit is used for switching the stream processing mode into a batch processing mode and carrying out batch processing on the data to be processed based on the batch processing control strategy.
Including but not limited to a processor 40 and a memory 41. It will be appreciated by those skilled in the art that fig. 4 is merely an example of one type of terminal device 4 and is not meant to be limiting as to one type of terminal device 4, and may include more or fewer components than shown, or may combine certain components, or different components, e.g., the one type of terminal device may also include input and output devices, network access devices, buses, etc.
The processor 40 may be a central processing unit (Central Processing Unit, CPU), but may also be other general purpose processors, digital signal processors (Digital Signal Processor, DSPs), application specific integrated circuits (Application Specific Integrated Circuit, ASICs), off-the-shelf programmable gate arrays (Field-Programmable Gate Array, FPGAs) or other programmable logic devices, discrete gate or transistor logic devices, discrete hardware components, or the like. A general purpose processor may be a microprocessor or the processor may be any conventional processor or the like.
The memory 41 may be an internal storage unit of the terminal device 4, for example a hard disk or a memory of the terminal device 4. The memory 41 may also be an external storage device of the terminal device 4, such as a plug-in hard disk, a Smart Media Card (SMC), a Secure Digital (SD) Card, a Flash memory Card (Flash Card) or the like, which are provided on the terminal device 4. Further, the memory 41 may also include both an internal storage unit and an external storage device of the one terminal device 4. The memory 41 is used for storing the computer program and other programs and data required for the one roaming control device. The memory 41 may also be used for temporarily storing data that has been output or is to be output.
It should be understood that the sequence number of each step in the foregoing embodiment does not mean that the execution sequence of each process should be determined by the function and the internal logic, and should not limit the implementation process of the embodiment of the present invention.
It should be noted that, because the content of information interaction and execution process between the above devices/units is based on the same concept as the method embodiment of the present invention, specific functions and technical effects thereof may be referred to in the method embodiment section, and will not be described herein.
It will be apparent to those skilled in the art that, for convenience and brevity of description, only the above-described division of the functional units and modules is illustrated, and in practical application, the above-described functional distribution may be performed by different functional units and modules according to needs, that is, the internal structure of the apparatus is divided into different functional units or modules, so as to perform all or part of the functions described above. The functional units and modules in the embodiment may be integrated in one processing unit, or each unit may exist alone physically, or two or more units may be integrated in one unit, where the integrated units may be implemented in a form of hardware or a form of a software functional unit. In addition, the specific names of the functional units and modules are only for distinguishing from each other, and are not used for limiting the protection scope of the present invention. The specific working process of the units and modules in the above system may refer to the corresponding process in the foregoing method embodiment, which is not described herein again.
Embodiments of the present invention also provide a computer readable storage medium storing a computer program which, when executed by a processor, implements steps for implementing the various method embodiments described above.
Embodiments of the present invention provide a computer program product which, when run on a mobile terminal, causes the mobile terminal to perform steps that enable the implementation of the method embodiments described above.
The integrated units, if implemented in the form of software functional units and sold or used as stand-alone products, may be stored in a computer readable storage medium. Based on such understanding, the present invention may implement all or part of the flow of the method of the above embodiments, and may be implemented by a computer program to instruct related hardware, where the computer program may be stored in a computer readable storage medium, and when the computer program is executed by a processor, the computer program may implement the steps of each of the method embodiments described above. Wherein the computer program comprises computer program code which may be in source code form, object code form, executable file or some intermediate form etc. The computer readable medium may include at least: any entity or device capable of carrying computer program code to a photographing device/terminal apparatus, recording medium, computer Memory, read-Only Memory (ROM), random access Memory (Random Access Memory, RAM), electrical carrier signals, telecommunications signals, and software distribution media. Such as a U-disk, removable hard disk, magnetic or optical disk, etc. In some jurisdictions, computer readable media may not be electrical carrier signals and telecommunications signals in accordance with legislation and patent practice.
In the foregoing embodiments, the descriptions of the embodiments are emphasized, and in part, not described or illustrated in any particular embodiment, reference is made to the related descriptions of other embodiments.
Those of ordinary skill in the art will appreciate that the various illustrative elements and algorithm steps described in connection with the embodiments disclosed herein may be implemented as electronic hardware, or combinations of computer software and electronic hardware. Whether such functionality is implemented as hardware or software depends upon the particular application and design constraints imposed on the solution. Skilled artisans may implement the described functionality in varying ways for each particular application, but such implementation decisions should not be interpreted as causing a departure from the scope of the present invention.
In the embodiments provided in the present invention, it should be understood that the disclosed apparatus/network device and method may be implemented in other manners. For example, the apparatus/network device embodiments described above are merely illustrative, e.g., the division of the modules or units is merely a logical functional division, and there may be additional divisions in actual implementation, e.g., multiple units or components may be combined or integrated into another system, or some features may be omitted, or not performed. Alternatively, the coupling or direct coupling or communication connection shown or discussed may be an indirect coupling or communication connection via interfaces, devices or units, which may be in electrical, mechanical or other forms.
The units described as separate units may or may not be physically separate, and units shown as units may or may not be physical units, may be located in one place, or may be distributed over a plurality of network units.
It should be understood that the terms "comprises" and/or "comprising," when used in this specification and the appended claims, specify the presence of stated features, integers, steps, operations, elements, and/or components, but do not preclude the presence or addition of one or more other features, integers, steps, operations, elements, components, and/or groups thereof.
It should also be understood that the term "and/or" as used in the present specification and the appended claims refers to any and all possible combinations of one or more of the associated listed items, and includes such combinations.
As used in the present description and the appended claims, the term "if" may be interpreted as "when..once" or "in response to a determination" or "in response to a detection" depending on the context. Similarly, the phrase "if a determination" or "if a [ described condition or event ] is monitored" may be interpreted in the context of meaning "upon determination" or "in response to determination" or "upon monitoring a [ described condition or event ]" or "in response to monitoring a [ described condition or event ]".
Furthermore, the terms "first," "second," "third," and the like in the description of the present specification and in the appended claims, are used for distinguishing between descriptions and not necessarily for indicating or implying a relative importance.
Reference in the specification to "one embodiment" or "some embodiments" or the like means that a particular feature, structure, or characteristic described in connection with the embodiment is included in one or more embodiments of the invention. Thus, appearances of the phrases "in one embodiment," "in some embodiments," "in other embodiments," and the like in the specification are not necessarily all referring to the same embodiment, but mean "one or more but not all embodiments" unless expressly specified otherwise. The terms "comprising," "including," "having," and variations thereof mean "including but not limited to," unless expressly specified otherwise.
The above embodiments are only for illustrating the technical solution of the present invention, and not for limiting the same; although the invention has been described in detail with reference to the foregoing embodiments, it will be understood by those of ordinary skill in the art that: the technical scheme described in the foregoing embodiments can be modified or some technical features thereof can be replaced by equivalents; such modifications and substitutions do not depart from the spirit and scope of the technical solutions of the embodiments of the present invention, and are intended to be included in the scope of the present invention.
Claims (10)
1. A method for switching between stream processing and batch processing, the method comprising:
switching a stream processing mode and a batch processing mode based on the Flink;
if the current processing mode is a stream processing mode, acquiring average processing time corresponding to a plurality of processing requests, average waiting time corresponding to a plurality of processing requests, data input quantity in a plurality of sampling times, average CPU utilization rate and average memory occupancy rate;
calculating a stream processing benefit score according to the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate;
if the flow processing benefit score is smaller than a first threshold value, obtaining a surge index, a minimum delay parameter, a current memory space parameter, a current processor computing power parameter, a maximum data input amount, a minimum data input amount and an average data input amount which are input by a user;
determining a matching batch control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
And switching the stream processing mode into a batch processing mode, and carrying out batch processing on the data to be processed based on the batch processing control strategy.
2. The method for switching between stream processing and batch processing according to claim 1, wherein the step of calculating a stream processing benefit score based on the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input amount in a plurality of sampling times, the average CPU utilization, and the average memory occupancy comprises:
obtaining the maximum data input quantity of the data input quantity in a plurality of sampling time and the average data input quantity of the data input quantity in a plurality of sampling time;
substituting the average processing time corresponding to a plurality of processing requests, the average waiting time corresponding to a plurality of processing requests, the data input quantity in a plurality of sampling times, the average CPU utilization rate and the average memory occupancy rate into the following formula I to obtain the flow processing benefit score;
the first formula is:
wherein,representing stream processing benefit score,/->Representing the data input in the ith said sampling time,/th>Representing said maximum data input amount, +. >Representing the prime number average data input, k representing the number of samples of said sampling time, +.>Representing said average processing time corresponding to the jth processing request,/for>And representing the average waiting time corresponding to the jth processing request, n represents the number of the processing requests, L represents the average CPU utilization rate, and R represents the average memory occupancy rate.
3. The method of switching between stream processing and batch processing as set forth in claim 1, wherein said step of determining a matching batch control strategy based on said stream processing benefit score, said surge index, said minimum delay parameter, said current memory space parameter, said current processor power parameter, said maximum data input, said minimum data input, and said average data input comprises:
constructing the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount into a current state vector;
acquiring a plurality of pre-stored clustering center vectors and weight coefficients corresponding to the clustering center vectors respectively; the plurality of clustering center vectors are calculated in a plurality of historical state vectors based on a clustering algorithm;
Respectively calculating respective corresponding vector distances between the current state vector and a plurality of clustering center vectors;
the vector distances and the weight coefficients corresponding to the clustering center vectors are weighted and summed to obtain target parameters;
and matching the batch processing control strategy corresponding to the numerical range according to the numerical range of the target parameter.
4. The method for switching between stream processing and batch processing according to claim 3, further comprising, before said step of obtaining a plurality of pre-stored cluster center vectors and weight coefficients corresponding to each of the plurality of cluster center vectors:
acquiring the historical state vectors corresponding to a plurality of historical sample data, calculating the similarity among the plurality of historical state vectors, and constructing a similarity matrix composed of a plurality of similarities;
calculating a transformation matrix according to the similarity matrix, and calculating a normalized target matrix according to the transformation matrix and the similarity matrix;
performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of initial center vectors;
and carrying out clustering calculation based on the initial center vector and the historical state vectors to obtain a plurality of clustering center vectors.
5. The method for switching between stream processing and batch processing as recited in claim 4, further comprising, after said step of performing a cluster calculation based on said initial center vector and a plurality of said history state vectors, a step of:
acquiring prior probabilities corresponding to the clustering center vectors respectively;
substituting the clustering center vector and the prior probability into a formula II to obtain weight coefficients corresponding to the clustering center vectors;
the formula II is as follows:
wherein,weight coefficient representing the j-th cluster center,/->Gaussian distribution representing the ith historical state vector in the jth cluster center, +.>Mean vector representing the j-th cluster center, < ->Covariance matrix representing jth cluster center, k representing number of cluster centers, +.>Representing the prior probability of the jth cluster center,/->Representing the prior probability of the kth cluster center, a represents the number of historical state vectors.
6. The method for switching between stream processing and batch processing as recited in claim 4, wherein said step of performing feature decomposition on said normalized target matrix based on principal component analysis to obtain a plurality of initial center vectors comprises:
Performing feature decomposition on the normalized target matrix based on principal component analysis to obtain a plurality of feature values and feature vectors corresponding to the feature values;
and taking the eigenvectors corresponding to the first k eigenvalues as the initial center vector according to the numerical value.
7. The method of switching between stream processing and batch processing as set forth in claim 4, wherein said step of calculating a transformation matrix from said similarity matrix and calculating a normalized target matrix from said transformation matrix and said similarity matrix comprises:
substituting the similarity matrix into the following formula III to obtain the transformation matrix;
the formula III is:
wherein,representing the transformation matrix->A numerical value representing an ith row and a jth column in the similarity matrix;
subtracting the transformation matrix from the similarity matrix to obtain an unnormalized target matrix;
dividing the non-normalized target matrix by the transformation matrix to obtain the normalized target matrix.
8. A switching device for stream processing and batch processing, the switching device comprising:
the first switching unit is used for switching the stream processing mode and the batch processing mode based on the Flink;
The first acquisition unit is used for acquiring average processing time corresponding to the plurality of processing requests, average waiting time corresponding to the plurality of processing requests, data input quantity in the plurality of sampling times, average CPU utilization rate and average memory occupancy rate if the current processing mode is a stream processing mode;
the first calculation unit is used for calculating a stream processing benefit score according to the average processing time corresponding to the plurality of processing requests, the average waiting time corresponding to the plurality of processing requests, the data input quantity in the plurality of sampling times, the CPU average utilization rate and the memory average occupancy rate;
the second obtaining unit is used for obtaining the surge index, the minimum delay parameter, the current memory space parameter, the current processor computing power parameter, the maximum data input amount, the minimum data input amount and the average data input amount which are input by a user if the flow processing benefit score is smaller than the first threshold value;
the second calculation unit is used for determining a matched batch processing control strategy according to the stream processing benefit score, the surge index, the minimum delay parameter, the current memory space parameter, the current processor power parameter, the maximum data input amount, the minimum data input amount and the average data input amount; the batch processing control strategy comprises a data block length and a distributed processing quantity;
And the second switching unit is used for switching the stream processing mode into a batch processing mode and carrying out batch processing on the data to be processed based on the batch processing control strategy.
9. A terminal device comprising a memory, a processor and a computer program stored in the memory and executable on the processor, characterized in that the processor implements the steps of the method according to any of claims 1 to 7 when the computer program is executed.
10. A computer readable storage medium storing a computer program, characterized in that the computer program when executed by a processor implements the steps of the method according to any one of claims 1 to 7.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202311109939.7A CN116841753B (en) | 2023-08-31 | 2023-08-31 | Stream processing and batch processing switching method and switching device |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202311109939.7A CN116841753B (en) | 2023-08-31 | 2023-08-31 | Stream processing and batch processing switching method and switching device |
Publications (2)
Publication Number | Publication Date |
---|---|
CN116841753A CN116841753A (en) | 2023-10-03 |
CN116841753B true CN116841753B (en) | 2023-11-17 |
Family
ID=88174668
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202311109939.7A Active CN116841753B (en) | 2023-08-31 | 2023-08-31 | Stream processing and batch processing switching method and switching device |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN116841753B (en) |
Families Citing this family (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN117435596B (en) * | 2023-12-20 | 2024-04-02 | 杭州网易云音乐科技有限公司 | Streaming batch task integration method and device, storage medium and electronic equipment |
Citations (9)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
JPH0368821A (en) * | 1989-08-09 | 1991-03-25 | Niigata Eng Co Ltd | Load cell apparatus |
WO2015008379A1 (en) * | 2013-07-19 | 2015-01-22 | 株式会社日立製作所 | Data processing device and data processing method |
CN106357503A (en) * | 2016-08-19 | 2017-01-25 | 百度在线网络技术(北京)有限公司 | Message processing method and instant communication server |
CN106528717A (en) * | 2016-10-26 | 2017-03-22 | 中国电子产品可靠性与环境试验研究所 | Data processing method and system |
CN106873945A (en) * | 2016-12-29 | 2017-06-20 | 中山大学 | Data processing architecture and data processing method based on batch processing and Stream Processing |
CN110362600A (en) * | 2019-07-22 | 2019-10-22 | 广西大学 | A kind of random ordering data flow distribution aggregate query method, system and medium |
CN113435989A (en) * | 2021-06-25 | 2021-09-24 | 中国工商银行股份有限公司 | Financial data processing method and device |
CN114281508A (en) * | 2021-12-29 | 2022-04-05 | 江苏达科信息科技有限公司 | Data batch-flow fusion offline calculation method |
CN116382892A (en) * | 2023-02-08 | 2023-07-04 | 深圳市融聚汇信息科技有限公司 | Load balancing method and device based on multi-cloud fusion and cloud service |
-
2023
- 2023-08-31 CN CN202311109939.7A patent/CN116841753B/en active Active
Patent Citations (9)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
JPH0368821A (en) * | 1989-08-09 | 1991-03-25 | Niigata Eng Co Ltd | Load cell apparatus |
WO2015008379A1 (en) * | 2013-07-19 | 2015-01-22 | 株式会社日立製作所 | Data processing device and data processing method |
CN106357503A (en) * | 2016-08-19 | 2017-01-25 | 百度在线网络技术(北京)有限公司 | Message processing method and instant communication server |
CN106528717A (en) * | 2016-10-26 | 2017-03-22 | 中国电子产品可靠性与环境试验研究所 | Data processing method and system |
CN106873945A (en) * | 2016-12-29 | 2017-06-20 | 中山大学 | Data processing architecture and data processing method based on batch processing and Stream Processing |
CN110362600A (en) * | 2019-07-22 | 2019-10-22 | 广西大学 | A kind of random ordering data flow distribution aggregate query method, system and medium |
CN113435989A (en) * | 2021-06-25 | 2021-09-24 | 中国工商银行股份有限公司 | Financial data processing method and device |
CN114281508A (en) * | 2021-12-29 | 2022-04-05 | 江苏达科信息科技有限公司 | Data batch-flow fusion offline calculation method |
CN116382892A (en) * | 2023-02-08 | 2023-07-04 | 深圳市融聚汇信息科技有限公司 | Load balancing method and device based on multi-cloud fusion and cloud service |
Non-Patent Citations (3)
Title |
---|
Liqing Zhao ; Bo Cheng ; Junliang Chen.LambDP: Data Processing Framework for Terminal Applications in IoTs Services.IEEE Xplore.2020,全文. * |
分布式流处理技术综述;崔星灿;禹晓辉;刘洋;吕朝阳;;计算机研究与发展(02);全文 * |
大数据流计算特点及"单一窗口"适用场景探讨;孙学忠;胡伟;;中国口岸科学技术(08);全文 * |
Also Published As
Publication number | Publication date |
---|---|
CN116841753A (en) | 2023-10-03 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
AU2021232839A1 (en) | Updating Attribute Data Structures to Indicate Trends in Attribute Data Provided to Automated Modelling Systems | |
WO2018099084A1 (en) | Method, device, chip and system for training neural network model | |
CN104850727B (en) | Distributed big data system risk appraisal procedure based on Cloud focus theory | |
CN114143326B (en) | Load adjustment method, management node, and storage medium | |
CN109445936B (en) | Cloud computing load clustering method and system and electronic equipment | |
CN112398700B (en) | Service degradation method and device, storage medium and computer equipment | |
CN113228574A (en) | Computing resource scheduling method, scheduler, internet of things system and computer readable medium | |
CN116841753B (en) | Stream processing and batch processing switching method and switching device | |
CN113516275A (en) | Power distribution network ultra-short term load prediction method and device and terminal equipment | |
CN112365070A (en) | Power load prediction method, device, equipment and readable storage medium | |
Chen et al. | Graph deep factors for forecasting with applications to cloud resource allocation | |
Xu et al. | Dynamic backup workers for parallel machine learning | |
CN109213965B (en) | System capacity prediction method, computer readable storage medium and terminal device | |
US20080195447A1 (en) | System and method for capacity sizing for computer systems | |
CN116382892B (en) | Load balancing method and device based on multi-cloud fusion and cloud service | |
CN114035906B (en) | Virtual machine migration method and device, electronic equipment and storage medium | |
CN116360921A (en) | Cloud platform resource optimal scheduling method and system for electric power Internet of things | |
Singh et al. | A feature extraction and time warping based neural expansion architecture for cloud resource usage forecasting | |
CN110580192B (en) | Container I/O isolation optimization method in mixed scene based on service characteristics | |
CN115994029A (en) | Container resource scheduling method and device | |
CN113467892A (en) | Distributed cluster resource configuration method and corresponding device, equipment and medium | |
CN118394534B (en) | Method, system, product, device and medium for expanding and shrinking volume of cluster application service | |
CN117726304B (en) | Project progress prediction and project resource allocation recommendation method | |
CN118331750B (en) | Dynamic resource allocation system, electronic equipment and storage medium for processing network threat | |
CN118215080B (en) | Edge computing task distribution method, controller and system based on soft definition network |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |