CN112307057A - Data processing method and device, electronic equipment and computer storage medium - Google Patents
- Publication number: CN112307057A (application number CN202011162997.2A)
- Authority: CN (China)
- Legal status: Pending (as listed by Google Patents, which notes that the status is an assumption and not a legal conclusion)
Classifications
- G06F16/2453: Query optimisation
- G06F16/2474: Sequence data queries, e.g. querying versioned data
- G06F16/9537: Spatial or temporal dependent retrieval, e.g. spatiotemporal queries
- H04L67/51: Discovery or management of network services, e.g. service location protocol [SLP] or web services
Abstract
The invention discloses a data processing method and apparatus, an electronic device and a computer storage medium, relating to the field of computers. The method comprises: acquiring metric data of a time-series database instance through a distributed publish-subscribe messaging system; generating an aggregation identifier based on a metric contained in the metric data and the tag set corresponding to that metric, where the tag set indicates the type of aggregation function used to process the metric; and aggregating the metric according to the aggregation identifier and a preset aggregation time window. The invention addresses the degraded performance and low efficiency of time-series database instances that, in the related art, must collect a large number of monitoring metrics.
Description
Technical Field
The present invention relates to the field of computers, and in particular, to a data processing method and apparatus, an electronic device, and a computer storage medium.
Background
At present, big data monitoring systems use the InfluxDB time-series database, with a collection period of 10 seconds for both basic monitoring and application monitoring. As services grow, more and more monitoring metrics are collected, which puts heavy storage pressure on InfluxDB. The excessive data volume frequently causes write backlogs, which severely degrades InfluxDB query performance and makes the monitoring dashboards slow to query. Because the alarm system also depends on the time-series data in InfluxDB, false alarms and missed alarms occur easily.
One related approach uses InfluxDB's built-in aggregation mechanism, the Continuous Query (CQ): InfluxDB automatically runs a query on a configured schedule to aggregate data and writes the aggregated results back into InfluxDB. However, CQs significantly affect InfluxDB's query and write performance, and the scheme extends poorly: a new CQ must be created for every new metric.
Another approach aggregates historical data with Kapacitor, an open-source framework for processing, monitoring and alerting on time-series data: batch and stream processing scripts written for Kapacitor aggregate the data and thereby reduce the storage volume of historical data. However, Kapacitor task scripts have poor fault tolerance, so tasks easily fail to run; the aggregation tasks consume a large amount of Kapacitor's memory; Kapacitor is a single-point system with weak stability; and it extends poorly, since new aggregation scripts must be written for new metrics.
In view of the above problems in the related art, no effective solution has been found at present.
Disclosure of Invention
The technical problem to be solved by the embodiments of the present invention is the degraded performance and low efficiency of time-series database instances caused, in the related art, by the need to collect large amounts of metric data. To this end, the embodiments provide a data processing method and apparatus, an electronic device, and a computer storage medium.
According to an embodiment of the present invention, a data processing method is provided, comprising: acquiring metric data of a time-series database instance through a distributed publish-subscribe messaging system; generating an aggregation identifier based on a metric contained in the metric data and the tag set corresponding to the metric, where the tag set indicates the type of aggregation function used to process the metric; and aggregating the metric according to the aggregation identifier and a preset aggregation time window.
Optionally, generating the aggregation identifier based on the metric contained in the metric data and its corresponding tag set comprises: determining the plurality of databases contained in the time-series database instance and the plurality of metrics contained in each database; parsing a target metric using the time-series database line protocol to obtain a first identifier of the target metric and the target aggregation function type corresponding to it, where the tag set contains a plurality of aggregation function types; and generating the aggregation identifier of the target metric from the combination of a second identifier of a target database, the first identifier, and the target aggregation function type, where the target database is any one of the plurality of databases and the target metric is any metric stored in the target database.
Optionally, the method further comprises: filtering the tag set based on the target aggregation function type so as to retain the tag corresponding to that type, where the target aggregation function type includes at least one of: a sum function, a maximum function, and an average function.
Optionally, aggregating the metric according to the aggregation identifier and the preset aggregation time window comprises: determining, from the aggregation identifier, the target aggregation function type for processing the metric; setting a timed rolling (tumbling) window for processing the metric based on the preset aggregation time window; and aggregating the metric based on the timed rolling window and the target aggregation function type.
Optionally, aggregating the metric based on the timed rolling window and the target aggregation function type comprises: parsing the metric according to the time-series database line protocol to obtain the field value corresponding to the metric; and computing an aggregate value from the field value using the target aggregation function type.
Optionally, the method further comprises: outputting the aggregate value to a proxy server of the time-series database instance in field (line protocol) format.
Optionally, the method further comprises: comparing the aggregate value with a preset value, and triggering an alarm message if the aggregate value is greater than or equal to the preset value.
According to an embodiment of the present invention, a data processing apparatus is provided, comprising: an acquisition module, configured to acquire metric data of a time-series database instance through a distributed publish-subscribe messaging system; a generating module, configured to generate an aggregation identifier based on a metric contained in the metric data and the tag set corresponding to the metric, where the tag set indicates the type of aggregation function used to process the metric; and a processing module, configured to aggregate the metric according to the aggregation identifier and a preset aggregation time window.
Optionally, the generating module comprises: a first determining unit, configured to determine the plurality of databases contained in the time-series database instance and the plurality of metrics contained in each database; a parsing unit, configured to parse a target metric using the time-series database line protocol to obtain a first identifier of the target metric and the target aggregation function type corresponding to it, where the tag set contains a plurality of aggregation function types; and a generating unit, configured to generate the aggregation identifier of the target metric from the combination of a second identifier of a target database, the first identifier, and the target aggregation function type, where the target database is any one of the plurality of databases and the target metric is any metric stored in the target database.
Optionally, the apparatus further comprises: a filtering module, configured to filter the tag set based on the target aggregation function type so as to retain the tag corresponding to that type, where the target aggregation function type includes at least one of: a sum function, a maximum function, and an average function.
Optionally, the processing module comprises: a second determining unit, configured to determine, from the aggregation identifier, the target aggregation function type for processing the metric; a setting unit, configured to set a timed rolling window for processing the metric based on the preset aggregation time window; and a processing unit, configured to aggregate the metric based on the timed rolling window and the target aggregation function type.
Optionally, the processing module comprises: an acquisition unit, configured to parse the metric according to the time-series database line protocol to obtain the field value corresponding to the metric; and a calculation unit, configured to compute an aggregate value from the field value using the target aggregation function type.
Optionally, the apparatus further comprises: an output module, configured to output the aggregate value to the proxy server of the time-series database instance in field (line protocol) format.
Optionally, the apparatus further comprises: a comparison module, configured to compare the aggregate value with a preset value; and an alarm module, configured to trigger an alarm message when the aggregate value is greater than or equal to the preset value.
According to yet another embodiment of the present invention, there is also provided an electronic device, including a memory in which a computer program is stored and a processor configured to execute the computer program to perform the steps in any of the above method embodiments.
According to a further embodiment of the present invention, there is also provided a computer storage medium having a computer program stored thereon, wherein the computer program is arranged to perform the steps in any of the above method embodiments when executed.
In the embodiments of the invention, all metric data of the time-series database instance is subscribed to through the distributed publish-subscribe messaging system; an aggregation identifier is generated from each metric contained in the metric data and the aggregation function type in its tag set, so that different metrics can follow different aggregation strategies; and each metric is aggregated according to its aggregation strategy and the preset aggregation time window. This aggregates the metric data along the time dimension, greatly reducing the storage volume and computation required for the metric data and improving its query efficiency, which in turn improves the query performance of the time-series database instance. It thereby solves the technical problems of degraded performance and low efficiency of time-series database instances caused, in the related art, by the need to collect large amounts of metric data.
The technical solution of the present invention is further described in detail by the accompanying drawings and embodiments.
Drawings
The accompanying drawings, which are incorporated in and constitute a part of this specification, illustrate embodiments of the invention and together with the description, serve to explain the principles of the invention.
The invention will be more clearly understood from the following detailed description, taken with reference to the accompanying drawings, in which:
FIG. 1 is a flow chart of a method of processing data according to an embodiment of the invention;
FIG. 2 is a flow chart of a data processing method provided by an embodiment of the present invention;
FIG. 3 is a structural block diagram of a data processing apparatus according to an embodiment of the present invention;
FIG. 4 is a block diagram of an electronic device according to an embodiment of the present invention.
Detailed Description
Various exemplary embodiments of the present invention will now be described in detail with reference to the accompanying drawings. It should be noted that: the relative arrangement of the components and steps, the numerical expressions and numerical values set forth in these embodiments do not limit the scope of the present invention unless specifically stated otherwise.
Meanwhile, it should be understood that the sizes of the respective portions shown in the drawings are not drawn in an actual proportional relationship for the convenience of description.
The following description of at least one exemplary embodiment is merely illustrative in nature and is in no way intended to limit the invention, its application, or uses.
Techniques, methods, and apparatus known to those of ordinary skill in the relevant art may not be discussed in detail but are intended to be part of the specification where appropriate.
It should be noted that: like reference numbers and letters refer to like items in the following figures, and thus, once an item is defined in one figure, further discussion thereof is not required in subsequent figures.
Embodiments of the invention are operational with numerous other general purpose or special purpose computing system environments or configurations. Examples of well known computing systems, environments, and/or configurations that may be suitable for use with the computer system/server include, but are not limited to: personal computer systems, server computer systems, thin clients, thick clients, hand-held or laptop devices, microprocessor-based systems, set top boxes, programmable consumer electronics, network PCs, minicomputer systems, mainframe computer systems, distributed cloud computing environments that include any of the above systems, and the like.
The computer system/server may be described in the general context of computer system-executable instructions, such as program modules, being executed by a computer system. Generally, program modules may include routines, programs, objects, components, logic, data structures, etc. that perform particular tasks or implement particular abstract data types. The computer system/server may be practiced in distributed cloud computing environments where tasks are performed by remote processing devices that are linked through a communications network. In a distributed cloud computing environment, program modules may be located in both local and remote computer system storage media including memory storage devices.
In the present embodiment, a data processing method is provided. FIG. 1 is a flowchart of the data processing method according to an embodiment of the present invention; as shown in FIG. 1, the flow comprises the following steps:
Step S102: acquire metric data of a time-series database instance through a distributed publish-subscribe messaging system;
Step S104: generate an aggregation identifier based on the metrics contained in the metric data and the tag set corresponding to each metric, where the tag set indicates the aggregation function type of the metric;
Step S106: aggregate the metrics according to the aggregation identifiers and the preset aggregation time window.
With this method, all metric data of the time-series database instance is subscribed to through the distributed publish-subscribe messaging system, an aggregation identifier carrying each metric's aggregation strategy is generated, and each metric is aggregated along the time dimension within the preset aggregation time window. This greatly reduces the storage volume and computation for the metric data, speeds up its queries, and so improves the performance and efficiency of the time-series database instance.
The data processing method of this embodiment runs on Flink. Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It can be used for batch processing, i.e., processing static and historical data sets, and for stream processing, i.e., processing real-time data streams and producing results in real time. In this embodiment, Flink first reads, in real time, all metric data of the time-series database instance InfluxDB from Kafka. Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity stream data of a consumer-scale website. InfluxDB is a time-series database built to handle high write volumes and query loads; it is intended as backing storage for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, Internet of Things sensor data, and real-time analytics.
Next, the metric data is parsed using the InfluxDB line protocol and the measurement (i.e., the metric described above) is extracted. The InfluxDB line protocol is a text format for writing data points to InfluxDB; points in this format can be parsed and written by InfluxDB. The format of the InfluxDB line protocol is as follows:
+-----------+--------+-+---------+-+---------+
|measurement|,tag_set| |field_set| |timestamp|
+-----------+--------+-+---------+-+---------+
Here, measurement is a container for the fields, tags and time column, and its name describes the field data stored in it. field_set holds the fields of the metric data; every InfluxDB point must have at least one field, and because fields are not indexed, a query that filters on a field value must scan all field values that could satisfy the condition. tag_set (i.e., the tag set described above) carries the identifiers indicating whether the metric data should be counted, averaged, summed, and so on (i.e., the aggregation function type), which constitutes the aggregation policy. timestamp is the timestamp: InfluxDB is a time-series database, so its data has a column named time storing a UTC (Coordinated Universal Time) timestamp, here the time at which the metric data was read from Kafka.
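To make the parsing step concrete, here is a minimal Python sketch of a line-protocol parser. It is an illustration, not the patent's implementation: it assumes simplified input with no escaped characters and no spaces or commas inside quoted string fields, and the tag key `agg` used in the demo to carry the aggregation function type is a hypothetical naming choice.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union

FieldValue = Union[float, int, bool, str]


@dataclass
class Point:
    measurement: str
    tags: Dict[str, str]
    fields: Dict[str, FieldValue]
    timestamp: Optional[int]  # nanoseconds; None means "let the server assign it"


def _parse_field_value(raw: str) -> FieldValue:
    """Decode one field value using the line-protocol type markers."""
    if len(raw) >= 2 and raw.startswith('"') and raw.endswith('"'):
        return raw[1:-1]  # string field
    if raw.endswith("i"):
        return int(raw[:-1])  # integer field, e.g. 3i
    if raw in ("t", "T", "true", "True", "TRUE"):
        return True
    if raw in ("f", "F", "false", "False", "FALSE"):
        return False
    return float(raw)  # floats are the default numeric type


def parse_line(line: str) -> Point:
    """Parse 'measurement[,k=v...] field=v[,...] [timestamp]'.

    Simplified: escape sequences and spaces or commas inside quoted
    string fields are not supported.
    """
    parts = line.strip().split(" ")
    if len(parts) not in (2, 3):
        raise ValueError(f"unsupported line: {line!r}")
    measurement, *tag_pairs = parts[0].split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)
    fields: Dict[str, FieldValue] = {}
    for pair in parts[1].split(","):
        name, raw = pair.split("=", 1)
        fields[name] = _parse_field_value(raw)
    timestamp = int(parts[2]) if len(parts) == 3 else None
    return Point(measurement, tags, fields, timestamp)


if __name__ == "__main__":
    # "agg" is a hypothetical tag key carrying the aggregation function type.
    print(parse_line("cpu,host=a,agg=sum usage=12.5,count=3i 1600000000000000000"))
```

A production pipeline would use a complete parser that honours the protocol's escaping rules; the sketch only shows how the measurement, tag set, field set and timestamp separate out.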
An aggregation identifier is then generated from the name of the measurement and its tag_set, and the metric data is aggregated according to the configured aggregation time window and the aggregation policy carried in the aggregation identifier.
In one possible implementation of this embodiment, all InfluxDB metric data is subscribed to through Kafka and processed with Flink stream computing. This provides high-performance, high-throughput ingestion of metric data, supports distributed consumption as well as both offline and real-time processing, and scales out well.
In one possible implementation provided by the embodiment of the present invention, generating the aggregation identifier based on the metric contained in the metric data and its corresponding tag set comprises: determining the plurality of databases contained in the time-series database instance and the plurality of metrics contained in each database; parsing a target metric using the time-series database line protocol to obtain a first identifier of the target metric and the target aggregation function type corresponding to it, where the tag set contains a plurality of aggregation function types; and generating the aggregation identifier of the target metric from the combination of the second identifier of the target database, the first identifier, and the target aggregation function type, where the target database is any one of the plurality of databases and the target metric is any metric stored in the target database.
In an embodiment of the present disclosure, InfluxDB is the time-series database instance. It may contain multiple databases (db); each db may hold a set of measurements (i.e., the metrics above); and each measurement contains a tag set, a field set (i.e., the metric fields), and timestamps. The time-series database instance and its databases are thus in a one-to-many relationship. In this embodiment, the aggregation identifier is generated from the database ID (i.e., the second identifier), the measurement name (i.e., the first identifier of the metric), and the tag set, and serves as the key of the aggregated data.
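A minimal sketch of building the aggregation key from the database ID, measurement name and tag set follows. The `/` separator, the sorted tag order and the `agg` tag key are illustrative assumptions, not details from the source; sorting the tags makes the key independent of the order in which tags arrive, so points from the same series always map to the same key.

```python
from typing import Mapping, Optional


def make_aggregation_key(db_id: str, measurement: str, tags: Mapping[str, str]) -> str:
    """Combine database ID (second identifier), measurement name (first
    identifier) and tag set into one key for the aggregated data."""
    # Sort tags so that the same series always produces the same key.
    tag_part = ",".join(f"{k}={tags[k]}" for k in sorted(tags))
    return f"{db_id}/{measurement}/{tag_part}"


def aggregation_type(tags: Mapping[str, str], tag_key: str = "agg",
                     default: Optional[str] = None) -> Optional[str]:
    """Read the aggregation function type from a (hypothetical) tag key."""
    return tags.get(tag_key, default)


if __name__ == "__main__":
    key = make_aggregation_key("db01", "cpu", {"host": "a", "agg": "sum"})
    print(key)  # db01/cpu/agg=sum,host=a
    print(aggregation_type({"host": "a", "agg": "sum"}))  # sum
```

Because the key already contains the aggregation function type, a downstream stage can choose the reducer from the key alone, without looking at the original point again.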
In one possible implementation provided by the embodiment of the present invention, the method further comprises: filtering the tag set based on the target aggregation function type so as to retain the tags corresponding to that type, where the target aggregation function type includes at least one of: a sum function, a maximum function, and an average function.
Optionally, the target aggregation function in this embodiment may also be a combination of at least two of the aggregation functions, in which case the field value is computed with the combined function. The embodiment thus supports aggregating metric data under several aggregation policies, which improves computational performance for the metric data.
In one embodiment of the present disclosure, the aggregation functions include at least FILL(), INTEGRAL(), SPREAD(), SUM(), MEAN(), MEDIAN(), and COUNT(), among others. FILL() pads missing values with a fixed value; INTEGRAL() returns the area under the curve for all values of a field; SPREAD() returns the difference between the maximum and minimum values of a field; SUM() returns the sum of all values in a field; MEAN() returns the arithmetic mean (average) of the values in a field; MEDIAN() returns the middle value of the sorted values in a field; and COUNT() returns the number of non-null values in a field.
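The functions listed above can be approximated in plain Python as below. This is a sketch under stated assumptions, not InfluxDB's own code: each window is assumed to hold `(timestamp_seconds, value)` pairs with `None` for nulls, `integral` uses the trapezoidal rule with a one-second unit (the InfluxQL default), `fill` shows fixed-value padding of a missing result, and `aggregate` covers the combined-function case where several functions run over the same window. MAX is included because the claims name a maximum function.

```python
import statistics
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple

# One sample inside a window: (timestamp in seconds, value or None for null).
Sample = Tuple[float, Optional[float]]


def _values(samples: Sequence[Sample]) -> List[float]:
    """Non-null values only; nulls are skipped by every function below."""
    return [v for _, v in samples if v is not None]


def integral(samples: Sequence[Sample]) -> float:
    """Area under the curve by the trapezoidal rule, in value * seconds."""
    pts = sorted((t, v) for t, v in samples if v is not None)
    return sum((t2 - t1) * (v1 + v2) / 2 for (t1, v1), (t2, v2) in zip(pts, pts[1:]))


# mean/median/max/spread raise on a window with no non-null values.
AGGREGATIONS: Dict[str, Callable[[Sequence[Sample]], float]] = {
    "sum": lambda s: sum(_values(s)),
    "mean": lambda s: statistics.fmean(_values(s)),
    "median": lambda s: statistics.median(_values(s)),
    "count": lambda s: len(_values(s)),
    "max": lambda s: max(_values(s)),
    "spread": lambda s: max(_values(s)) - min(_values(s)),
    "integral": integral,
}


def fill(result: Optional[float], value: float = 0.0) -> float:
    """FILL-style padding: report a fixed value when a window has no result."""
    return value if result is None else result


def aggregate(func_names: Iterable[str], samples: Sequence[Sample]) -> Dict[str, float]:
    """Apply one or several aggregation functions to the same window."""
    return {name: AGGREGATIONS[name](samples) for name in func_names}


if __name__ == "__main__":
    window = [(0, 1.0), (10, 3.0), (20, None), (30, 5.0)]
    print(aggregate(["sum", "max", "mean"], window))  # {'sum': 9.0, 'max': 5.0, 'mean': 3.0}
```

Keeping the functions in a name-to-callable table means a new aggregation policy is one dictionary entry rather than a new query or script, which is the extensibility the embodiment aims for.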
In one possible implementation provided by the embodiment of the present invention, aggregating the metric according to the aggregation identifier and the preset aggregation time window comprises: determining, from the aggregation identifier, the target aggregation function type for processing the metric; setting a timed rolling (tumbling) window for processing the metric based on the preset aggregation time window; and aggregating the metric based on the timed rolling window and the target aggregation function type.
Optionally, aggregating the metric based on the timed rolling window and the target aggregation function type comprises: parsing the metric according to the time-series database line protocol to obtain the field value corresponding to the metric; and computing an aggregate value from the field value using the target aggregation function type.
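The timed rolling (tumbling) window can be simulated without Flink as in the sketch below. It buckets `(key, epoch_seconds, value)` records into fixed, non-overlapping windows aligned to the epoch and reduces each bucket with the function selected for its key. This is a batch approximation, not the embodiment's Flink job: a real streaming job would key the stream by the aggregation identifier and emit each window once its end time has passed. The `pick` helper, which reads the function name from a hypothetical `agg=` part of the key, is made up for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

Record = Tuple[str, int, float]  # (aggregation key, epoch seconds, field value)
Reducer = Callable[[List[float]], float]


def tumbling_aggregate(records: Iterable[Record], window_seconds: int,
                       reducer_for_key: Callable[[str], Reducer]) -> Dict[Tuple[str, int], float]:
    """Assign each record to the fixed, non-overlapping window containing it
    and reduce every (key, window_start) bucket with that key's function."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    buckets: Dict[Tuple[str, int], List[float]] = defaultdict(list)
    for key, ts, value in records:
        window_start = ts - ts % window_seconds  # windows aligned to the epoch
        buckets[(key, window_start)].append(value)
    return {bucket: reducer_for_key(bucket[0])(values)
            for bucket, values in sorted(buckets.items())}


def pick(key: str) -> Reducer:
    """Hypothetical: read the function name from an 'agg=' part of the key."""
    reducers: Dict[str, Reducer] = {"sum": sum, "max": max,
                                    "mean": lambda v: sum(v) / len(v)}
    return reducers[key.split("agg=")[1].split(",")[0]]


if __name__ == "__main__":
    records = [
        ("db01/cpu/agg=sum", 0, 1.0), ("db01/cpu/agg=sum", 10, 2.0),
        ("db01/cpu/agg=sum", 300, 4.0),
        ("db01/mem/agg=max", 20, 7.0), ("db01/mem/agg=max", 30, 5.0),
    ]
    print(tumbling_aggregate(records, 300, pick))
```

With a 300-second window, the two cpu samples at 0 s and 10 s are summed into one row, the cpu sample at 300 s starts a new window, and the mem samples reduce to their maximum.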
Take the current InfluxDB application as an example: its monitoring collection period is 10 seconds, so the original time-series data stores the data of each 10-second period as one point (for example, one row of data in storage). One minute therefore produces 6 points, and five minutes of data means 30 points, which makes computation heavy and operations complex. With the embodiment of the present invention, a suitably configured preset aggregation time window (for example, 5 minutes) aggregates those 30 points into a single point stored as one row. Aggregating along the time dimension from 10 seconds to 5 minutes thus shrinks the original data storage to 1/30.
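The storage arithmetic in this paragraph works out as follows, using the 10-second collection period and the 5-minute window given in the text:

```latex
\frac{60\ \text{s}}{10\ \text{s}} = 6\ \text{points per minute},\qquad
\frac{5 \times 60\ \text{s}}{10\ \text{s}} = 30\ \text{points per window},\qquad
\frac{1\ \text{row}}{30\ \text{rows}} = \frac{1}{30}.
```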
Secondly, unnecessary tags are filtered out through configuration: machine information (i.e., tags) in the indexes is removed according to the characteristics of each service. After aggregating over time and filtering out useless tags, the data volume is reduced to 1/100 of the original, which greatly reduces storage, substantially improves the efficiency of historical data queries, and makes it possible to retain historical data for more than one year or even permanently.
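Configuration-driven tag filtering, and why it shrinks the data, can be sketched as below: once a machine-level tag such as a host name is dropped, points that differed only in that tag collapse into one series. The configuration dictionary, the measurement and tag names, and the helper functions are all hypothetical.

```python
from typing import Dict, Iterable, Set, Tuple

# Hypothetical per-measurement configuration of tags that carry machine information.
DROP_TAGS: Dict[str, Set[str]] = {"http_requests": {"host", "ip"}}


def filter_tags(measurement: str, tags: Dict[str, str],
                drop: Dict[str, Set[str]] = DROP_TAGS) -> Dict[str, str]:
    """Return the tag set with the configured unwanted tags removed."""
    unwanted = drop.get(measurement, set())
    return {k: v for k, v in tags.items() if k not in unwanted}


def distinct_series(points: Iterable[Tuple[str, Dict[str, str]]],
                    drop: Dict[str, Set[str]] = DROP_TAGS
                    ) -> Set[Tuple[str, Tuple[Tuple[str, str], ...]]]:
    """Series keys (measurement plus sorted tags) that remain after filtering."""
    return {(m, tuple(sorted(filter_tags(m, t, drop).items()))) for m, t in points}
```

With two points for the same service on hosts "a" and "b", there are two series without filtering and one series after the `host` tag is dropped.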
In one possible implementation provided by the embodiments of the present invention, the method further includes: outputting the aggregation value in field form to a proxy server of the time-series database instance. In this embodiment, the aggregation result (i.e., the aggregation value) obtained above is written back to influxdb-proxy (i.e., the proxy server of the time-series database instance) in field format (i.e., the line protocol format described above), which completes the dimension reduction and aggregation of the index data. Here, the InfluxDB proxy first writes incoming data to Kafka, then consumes the data from Kafka and distributes it to the actual InfluxDB instances by maintaining a routing table and an InfluxDB instance table, which supports horizontal scaling and primary/standby replicas.
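Writing an aggregation value back in line protocol form, and choosing target instances from a routing table and an instance table, might look like the sketch below. The table contents, host names and function names are hypothetical, tags are sorted by key, and no escaping of special characters is performed.

```python
from typing import Dict, List, Union

FieldValue = Union[float, int, bool, str]


def _format_field(value: FieldValue) -> str:
    """Encode a field value in line protocol syntax (bool must be checked before int)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return '"' + value.replace('"', '\\"') + '"'


def to_line_protocol(measurement: str, tags: Dict[str, str],
                     fields: Dict[str, FieldValue], ts_ns: int) -> str:
    """Serialise one aggregated point; tags are sorted by key, no escaping is done."""
    tag_str = "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    field_str = ",".join(f"{k}={_format_field(v)}" for k, v in fields.items())
    return f"{measurement}{tag_str} {field_str} {ts_ns}"


# Hypothetical routing table (database -> cluster) and instance table (cluster -> nodes).
ROUTING_TABLE: Dict[str, str] = {"app_metrics": "cluster-a"}
INSTANCE_TABLE: Dict[str, List[str]] = {
    "cluster-a": ["influx-a-primary:8086", "influx-a-standby:8086"],
}


def route(database: str) -> List[str]:
    """Every instance (primary and standby replica) that should receive a write."""
    return INSTANCE_TABLE[ROUTING_TABLE[database]]
```

Writing each point to every instance returned by `route` is one simple way to keep a standby replica in step with the primary; scaling out then amounts to adding clusters to the two tables.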
In one possible implementation provided by the embodiments of the present invention, the method further includes: comparing the aggregation value with a preset value; and triggering an alarm message if the aggregation value is greater than or equal to the preset value. In this embodiment, alarming is based on Flink stream processing. For example, if the alarm condition is that the number of anomalies within the preset 5 minutes (i.e., the aggregation time window) exceeds 200 (i.e., the preset value), the alarm can be raised either in influxdb-proxy or in Flink once the aggregation value has been obtained.
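The threshold check on each window's aggregation value can be sketched as a small function. It follows the greater-than-or-equal comparison stated for the method; the default threshold of 200 is taken from the example above, and the function name and message format are hypothetical.

```python
from typing import Optional


def check_alarm(agg_id: str, window_start: int, agg_value: float,
                threshold: float = 200.0) -> Optional[str]:
    """Return an alarm message when the window's aggregate reaches the threshold, else None."""
    if agg_value >= threshold:
        return (f"ALARM {agg_id} window_start={window_start}: "
                f"{agg_value} >= {threshold}")
    return None
```

Because the check only needs the aggregation value, the same function could run inside the Flink job right after aggregation or in the proxy after write-back.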
To further explain the embodiments of the present invention with a concrete example, fig. 2 is a flowchart of a data processing method according to an embodiment of the present invention. Referring to fig. 2, Flink reads InfluxDB index information from Kafka in real time; the time-series time (i.e., the index time) is obtained from the index information, and an aggregation time window (for example, 5 minutes) is set according to configuration; the metric (measurement) in the index information is parsed via the InfluxDB line protocol to obtain the measurement name, the timestamp and the aggregation strategy (i.e., the tag set); an aggregation identifier is generated from the tag combination and used as the key of the aggregated data; unwanted tags are then filtered out by configuration and a timed rolling window is set; finally, different aggregators perform the aggregation according to the index type (i.e., the target aggregation function type), such as sum aggregation, average aggregation or composite aggregation (for example, averaging and then summing), and the data is written back through influxdb-proxy to complete the dimension-reduction aggregation.
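The composite "average, then sum" aggregation mentioned above can be sketched for the points of a single window. The grouping key is an assumption: here each sample carries a hypothetical group label (for example a host), values are averaged per group, and the averages are summed.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, List, Tuple


def mean_then_sum(samples: Iterable[Tuple[str, float]]) -> float:
    """Composite aggregation within one window: average per group key, then sum the averages."""
    per_group: Dict[str, List[float]] = defaultdict(list)
    for group, value in samples:
        per_group[group].append(value)
    return sum(mean(values) for values in per_group.values())
```

For a gauge such as a connection count reported several times per window by each host, averaging first avoids counting a host once per report, and summing then gives a cluster-wide figure.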
Through the above steps, this embodiment exploits Flink's real-time computation, low latency, high availability and stability: the Flink real-time data processing engine distributes the aggregation work to different Flink task nodes, each of which executes the aggregation logic, and finally outputs a unified result. In addition, the computation state is saved periodically (recording the position up to which the consumer has consumed data, so that tasks can resume from a breakpoint). When an aggregation task is interrupted by a brief network outage, the aggregation computation recovers automatically; when the code needs to be upgraded, a snapshot can be taken while the task is stopped, and the aggregation task can then resume from that breakpoint, so that no gap arises and aggregation results are obtained more reliably.
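The idea of saving the consumer position together with the partial aggregation state can be illustrated in plain Python. This does not use or reproduce Flink's checkpoint API; it assumes a single hypothetical Kafka partition with consecutive integer offsets, and the class and method names are hypothetical. Because the offset and the partial sums are snapshotted together, records replayed after a restore are skipped rather than counted twice.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict


@dataclass
class AggregatorState:
    offset: int = 0  # next offset to consume (single hypothetical partition)
    partial_sums: Dict[str, float] = field(default_factory=dict)

    def process(self, offset: int, key: str, value: float) -> None:
        """Apply one record unless it is already reflected in the state."""
        if offset < self.offset:
            return  # replayed after a restore: skip to avoid double counting
        self.partial_sums[key] = self.partial_sums.get(key, 0.0) + value
        self.offset = offset + 1

    def snapshot(self) -> str:
        """Serialise offset and partial aggregates together, as one consistent unit."""
        return json.dumps(asdict(self))

    @classmethod
    def restore(cls, blob: str) -> "AggregatorState":
        data = json.loads(blob)
        return cls(offset=data["offset"], partial_sums=data["partial_sums"])
```

In a usage run, processing offsets 0 and 1, taking a snapshot, restoring it into a fresh object and replaying offsets 0 to 3 leaves the sum equal to that of processing each record exactly once.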
In this embodiment, a data processing apparatus is further provided. The apparatus is used to implement the foregoing embodiments and preferred implementations, and what has already been described is not repeated. As used below, the term "module" may be a combination of software and/or hardware that implements a predetermined function. Although the apparatus described in the following embodiments is preferably implemented in software, an implementation in hardware, or in a combination of software and hardware, is also possible and contemplated.
Fig. 3 is a block diagram of a data processing apparatus according to an embodiment of the present invention. As shown in fig. 3, the apparatus includes: an acquisition module 30, configured to acquire index data of a time-series database instance through a distributed publish-subscribe messaging system; a generating module 32, connected to the acquisition module 30 and configured to generate an aggregation identifier based on a metric index contained in the index data and a tag set corresponding to the metric index, where the tag set indicates the type of aggregation function for processing the metric index; and a processing module 34, connected to the generating module 32 and configured to aggregate the metric index according to the aggregation identifier and a preset aggregation time window.
In one possible implementation provided by the embodiments of the present application, the generating module 32 includes: a first determining unit, configured to determine a plurality of databases contained in the time-series database instance and a plurality of metric indexes contained in each database; a parsing unit, configured to parse a target metric index through the time-series database line protocol to obtain a first identification code of the target metric index and a target aggregation function type corresponding to the target metric index, where the tag set includes a plurality of aggregation function types; and a generating unit, configured to generate the aggregation identifier of the target metric index from a combination of a second identification code of a target database, the first identification code and the target aggregation function type, where the target database is any one of the plurality of databases and the target metric index is any one of the metric indexes stored in the target database.
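The combination of a database identification code, a metric identification code and the target aggregation function type can be sketched as follows. How the codes are assigned is not specified in the text, so this assumes a hypothetical registry that hands out sequential integers, a hypothetical tag key "agg" that carries the function type, and a hypothetical "db:metric:type" string format.

```python
from itertools import count
from typing import Dict

SUPPORTED_TYPES = {"sum", "max", "mean"}


class IdRegistry:
    """Assigns a stable numeric identification code to each distinct name."""

    def __init__(self) -> None:
        self._codes: Dict[str, int] = {}
        self._next = count(1)

    def code(self, name: str) -> int:
        if name not in self._codes:
            self._codes[name] = next(self._next)
        return self._codes[name]


def aggregation_id(databases: IdRegistry, metrics: IdRegistry, database: str,
                   metric: str, tags: Dict[str, str], agg_tag: str = "agg") -> str:
    """Combine <database code>:<metric code>:<function type> into one key."""
    func_type = tags.get(agg_tag)
    if func_type not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported or missing aggregation type: {func_type!r}")
    db_code = databases.code(database)
    # Metric codes are scoped by database so 'app.cpu' and 'infra.cpu' differ.
    metric_code = metrics.code(f"{database}.{metric}")
    return f"{db_code}:{metric_code}:{func_type}"
```

Because the same (database, metric, function type) always maps to the same string, the identifier can serve directly as the key by which the stream is partitioned before windowing.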
In one possible implementation provided by the embodiments of the present application, the apparatus further includes: a screening module, configured to screen the tag set based on the target aggregation function type so as to retain the tag corresponding to the target aggregation function type, where the target aggregation function type includes at least one of: a summation function, a maximum function and an averaging function.
In one possible implementation provided by the embodiments of the present application, the processing module 34 includes: a second determining unit, configured to determine, according to the aggregation identifier, the target aggregation function type for processing the metric index; a setting unit, configured to set, based on the preset aggregation time window, a timed rolling window for processing the metric index; and a processing unit, configured to aggregate the metric index based on the timed rolling window and the target aggregation function type.
In one possible implementation provided by the embodiments of the present application, the processing module 34 includes: an acquisition unit, configured to parse the metric index according to the time-series database line protocol to obtain the field values corresponding to the metric index; and a calculating unit, configured to compute an aggregation value from the field values according to the target aggregation function type.
In one possible implementation provided by the embodiments of the present application, the apparatus further includes: an output module, configured to output the aggregation value in field form to the proxy server of the time-series database instance.
In one possible implementation provided by the embodiments of the present application, the apparatus further includes: a comparison module, configured to compare the aggregation value with a preset value; and an alarm module, configured to trigger an alarm message when the aggregation value is greater than or equal to the preset value.
It should be noted that the above modules may be implemented in software or hardware. For the latter, implementations include, but are not limited to: all of the modules being located in the same processor; or the modules being distributed across different processors in any combination.
In an exemplary embodiment, an electronic device is provided. As shown in fig. 4, the electronic device 400 includes a processor 401 and a memory 403, where the processor 401 is coupled to the memory 403, for example via a bus 402. Optionally, the electronic device 400 may further include a transceiver 404. It should be noted that in practical applications the number of transceivers 404 is not limited to one, and the structure of the electronic device 400 does not constitute a limitation on the embodiments of the present application.
The processor 401 may be a CPU (Central Processing Unit), a general-purpose processor, a DSP (Digital Signal Processor), an ASIC (Application Specific Integrated Circuit), an FPGA (Field Programmable Gate Array) or other programmable logic device, a transistor logic device, a hardware component, or any combination thereof, and may implement or execute the various illustrative logical blocks, modules and circuits described in connection with this disclosure. The processor 401 may also be a combination that implements computing functions, for example a combination of one or more microprocessors, or a combination of a DSP and a microprocessor.
The memory 403 may be a ROM (Read Only Memory) or other type of static storage device that can store static information and instructions, a RAM (Random Access Memory) or other type of dynamic storage device that can store information and instructions, an EEPROM (Electrically Erasable Programmable Read Only Memory), a CD-ROM (Compact Disc Read Only Memory) or other optical disc storage (including compact discs, laser discs, optical discs, digital versatile discs, Blu-ray discs, etc.), a magnetic disk storage medium or other magnetic storage device, or any other medium that can be used to carry or store desired program code in the form of instructions or data structures and that can be accessed by a computer, without being limited thereto.
The memory 403 is used for storing application program codes for executing the scheme of the application, and the execution is controlled by the processor 401. Processor 401 is configured to execute application program code stored in memory 403 to implement the aspects illustrated in the foregoing method embodiments.
The electronic device includes, but is not limited to: mobile terminals such as mobile phones, notebook computers, digital broadcast receivers, PDAs (personal digital assistants), PADs (tablet computers), PMPs (portable multimedia players) and in-vehicle terminals (e.g., in-vehicle navigation terminals), and fixed terminals such as digital TVs and desktop computers. The electronic device shown in fig. 4 is only an example and should not impose any limitation on the functions or scope of use of the embodiments of the present application.
Based on the same inventive concept, the present application further provides a computer-readable storage medium, in which a computer program is stored, where the computer program is configured to execute the data processing method of any one of the above embodiments when running.
It can be clearly understood by those skilled in the art that the specific working processes of the system, the apparatus, and the module described above may refer to the corresponding processes in the foregoing method embodiments, and for the sake of brevity, the detailed description is omitted here.
In the present specification, the embodiments are described in a progressive manner, each embodiment focuses on differences from other embodiments, and the same or similar parts in the embodiments are referred to each other. For the system embodiment, since it basically corresponds to the method embodiment, the description is relatively simple, and for the relevant points, reference may be made to the partial description of the method embodiment.
The method and system of the present invention may be implemented in a number of ways. For example, the methods and systems of the present invention may be implemented in software, hardware, firmware, or any combination of software, hardware, and firmware. The above-described order for the steps of the method is for illustrative purposes only, and the steps of the method of the present invention are not limited to the order specifically described above unless specifically indicated otherwise. Furthermore, in some embodiments, the present invention may also be embodied as a program recorded in a recording medium, the program including machine-readable instructions for implementing a method according to the present invention. Thus, the present invention also covers a recording medium storing a program for executing the method according to the present invention.
The description of the present invention has been presented for purposes of illustration and description, and is not intended to be exhaustive or limited to the invention in the form disclosed. Many modifications and variations will be apparent to practitioners skilled in this art. The embodiment was chosen and described in order to best explain the principles of the invention and the practical application, and to enable others of ordinary skill in the art to understand the invention for various embodiments with various modifications as are suited to the particular use contemplated.
Claims (10)
1. A method for processing data, comprising:
acquiring index data of a time-series database instance through a distributed publish-subscribe messaging system;
generating an aggregation identifier based on a metric index contained in the index data and a tag set corresponding to the metric index, wherein the tag set is used for indicating an aggregation function type for processing the metric index;
and aggregating the metric index according to the aggregation identifier and a preset aggregation time window.
2. The method according to claim 1, wherein generating an aggregation identifier based on the metric index contained in the index data and the tag set corresponding to the metric index comprises:
determining a plurality of databases contained in the time-series database instance and a plurality of metric indexes contained in each database;
parsing a target metric index through a time-series database line protocol to obtain a first identification code of the target metric index and a target aggregation function type corresponding to the target metric index, wherein the tag set comprises a plurality of aggregation function types;
and generating an aggregation identifier of the target metric index according to a combination of a second identification code of a target database, the first identification code and the target aggregation function type, wherein the target database is any one of the databases, and the target metric index is any one of the metric indexes stored in the target database.
3. The method of claim 2, further comprising:
screening the tag set based on the target aggregation function type to retain a tag corresponding to the target aggregation function type, wherein the target aggregation function type includes at least one of the following: a summation function, a maximum function, and an averaging function.
4. The method according to claim 1, wherein aggregating the metric index according to the aggregation identifier and a preset aggregation time window comprises:
determining a target aggregation function type for processing the metric index according to the aggregation identifier;
setting a timed rolling window for processing the metric index based on the preset aggregation time window;
and aggregating the metric indexes based on the timed rolling window and the target aggregation function type.
5. The method of claim 4, wherein aggregating the metric index based on the timed rolling window and the target aggregation function type comprises:
parsing the metric index according to the time-series database line protocol to obtain a field value corresponding to the metric index;
and calculating the field value according to the target aggregation function type to obtain an aggregation value.
6. The method of claim 5, further comprising:
and outputting the aggregation value in field form to a proxy server of the time-series database instance.
7. The method of claim 5 or 6, further comprising:
comparing the aggregation value with a preset value;
and if the aggregation value is greater than or equal to the preset value, triggering an alarm message.
8. An apparatus for processing data, comprising:
an acquisition module, configured to acquire index data of a time-series database instance through a distributed publish-subscribe messaging system;
a generating module, configured to generate an aggregation identifier based on a metric index contained in the index data and a tag set corresponding to the metric index, wherein the tag set is used to indicate an aggregation function type for processing the metric index;
and a processing module, configured to aggregate the metric index according to the aggregation identifier and a preset aggregation time window.
9. An electronic device, comprising a processor and a memory, wherein the memory has stored therein a computer program, the processor being configured to execute the computer program to perform the data processing method of any one of claims 1 to 7.
10. A computer storage medium, in which a computer program is stored, wherein the computer program is configured to perform the data processing method of any one of claims 1 to 7 when executed.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202011162997.2A CN112307057A (en) | 2020-10-27 | 2020-10-27 | Data processing method and device, electronic equipment and computer storage medium |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202011162997.2A CN112307057A (en) | 2020-10-27 | 2020-10-27 | Data processing method and device, electronic equipment and computer storage medium |
Publications (1)
Publication Number | Publication Date |
---|---|
CN112307057A (en) | 2021-02-02 |
Family ID: 74330906
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202011162997.2A Pending CN112307057A (en) | 2020-10-27 | 2020-10-27 | Data processing method and device, electronic equipment and computer storage medium |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN112307057A (en) |
Citations (6)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN107015872A (en) * | 2016-12-09 | 2017-08-04 | 上海壹账通金融科技有限公司 | The processing method and processing device of monitoring data |
CN109800129A (en) * | 2019-01-17 | 2019-05-24 | 青岛特锐德电气股份有限公司 | A kind of real-time stream calculation monitoring system and method for processing monitoring big data |
CN110245158A (en) * | 2019-06-10 | 2019-09-17 | 上海理想信息产业(集团)有限公司 | A kind of multi-source heterogeneous generating date system and method based on Flink stream calculation technology |
US20200004868A1 (en) * | 2018-06-27 | 2020-01-02 | International Business Machines Corporation | Dynamic incremental updating of data cubes |
CN111274256A (en) * | 2020-01-20 | 2020-06-12 | 远景智能国际私人投资有限公司 | Resource control method, device, equipment and storage medium based on time sequence database |
CN111526060A (en) * | 2020-06-16 | 2020-08-11 | 网易(杭州)网络有限公司 | Method and system for processing service log |
- 2020-10-27 CN CN202011162997.2A (CN112307057A): active, pending
Non-Patent Citations (1)
Title |
---|
Yao Yi et al.: 《大学计算机基础》 (University Computer Fundamentals), 31 July 2020, Beijing: China Railway Publishing House, pages 350-351 * |
Cited By (14)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113084388A (en) * | 2021-03-29 | 2021-07-09 | 广州明珞装备股份有限公司 | Welding quality detection method, system, device and storage medium |
CN113342817A (en) * | 2021-06-23 | 2021-09-03 | 蘑菇物联技术(深圳)有限公司 | Data down-sampling method, device and system and computer readable storage medium |
WO2023020247A1 (en) * | 2021-08-17 | 2023-02-23 | 杭州涂鸦信息技术有限公司 | Method and apparatus for precision reduction of time series index data, and computer device |
CN113742341A (en) * | 2021-08-25 | 2021-12-03 | 杭州安恒信息技术股份有限公司 | Time series data aggregation method and device, computer equipment and storage medium |
CN113468248A (en) * | 2021-09-06 | 2021-10-01 | 天津中新智冠信息技术有限公司 | Data statistical method, device, equipment and storage medium |
CN113672396A (en) * | 2021-10-25 | 2021-11-19 | 中电云数智科技有限公司 | Streaming computing job processing method and device |
CN113672396B (en) * | 2021-10-25 | 2021-12-28 | 中电云数智科技有限公司 | Streaming computing job processing method and device |
CN114547073A (en) * | 2022-02-10 | 2022-05-27 | 清华大学 | Aggregation query method and device for time series data and storage medium |
CN115037729A (en) * | 2022-04-21 | 2022-09-09 | 中国建设银行股份有限公司 | Data aggregation method and device, electronic equipment and computer readable medium |
CN115037729B (en) * | 2022-04-21 | 2024-05-28 | 中国建设银行股份有限公司 | Data aggregation method, device, electronic equipment and computer readable medium |
CN114979186A (en) * | 2022-05-16 | 2022-08-30 | 浪潮云信息技术股份公司 | Flow link analysis method and system based on Flink component |
CN116150183A (en) * | 2023-04-04 | 2023-05-23 | 深圳依时货拉拉科技有限公司 | Data query expression generation method, device, equipment and storage medium |
CN116911642A (en) * | 2023-09-12 | 2023-10-20 | 中国长江电力股份有限公司 | Multi-dimensional multi-state oriented hydroelectric generating set equipment index calculation system and method |
CN116911642B (en) * | 2023-09-12 | 2023-12-26 | 中国长江电力股份有限公司 | Multi-dimensional multi-state oriented hydroelectric generating set equipment index calculation system and method |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN112307057A (en) | Data processing method and device, electronic equipment and computer storage medium | |
US20180365085A1 (en) | Method and apparatus for monitoring client applications | |
CN111209352B (en) | Data processing method and device, electronic equipment and storage medium | |
CN110362455B (en) | Data processing method and data processing device | |
CN112506743A (en) | Log monitoring method and device and server | |
CN111339175B (en) | Data processing method, device, electronic equipment and readable storage medium | |
CN109062769B (en) | Method, device and equipment for predicting IT system performance risk trend | |
CN112463543A (en) | Business data monitoring method, rule data generating method, device and system | |
CN114785690B (en) | Monitoring method based on service grid and related equipment | |
CN111400294B (en) | Data anomaly monitoring method, device and system | |
CN110765189A (en) | Exception management method and system for Internet products | |
CN113596078B (en) | Service problem positioning method and device | |
CN110941530A (en) | Method and device for acquiring monitoring data, computer equipment and storage medium | |
CN112395315A (en) | Method for counting log files and detecting abnormity and electronic device | |
CN109901991B (en) | Method and device for analyzing abnormal call and electronic equipment | |
CN114153703A (en) | Micro-service exception positioning method and device, electronic equipment and program product | |
CN110011845B (en) | Log collection method and system | |
CN110138720B (en) | Method and device for detecting abnormal classification of network traffic, storage medium and processor | |
CN112306989A (en) | Database instance processing method and device, storage medium and electronic device | |
CN112596974A (en) | Full link monitoring method, device, equipment and storage medium | |
CN115277355A (en) | Method, device, equipment and medium for processing state code data of monitoring system | |
CN114531361A (en) | Service topology analysis method and device of distributed system and storage medium | |
CN115220984A (en) | Business application monitoring method and device, electronic equipment and storage medium | |
CN118331823B (en) | Method and system for managing and monitoring alarm of space engineering business operation log | |
CN111444172A (en) | Data monitoring method, device, medium and equipment |
Legal Events
Code | Title | Description |
---|---|---|
PB01 | Publication | |
SE01 | Entry into force of request for substantive examination | |
CB02 | Change of applicant information | Address after: 100102 201/F, Block C, No. 2 Lizezhong 2nd Road, Chaoyang District, Beijing; Applicant after: Beijing Shuidi Technology Group Co.,Ltd.; Address before: 100102 201, 2/F, Block C, No. 2 Lizezhong 2nd Road, Chaoyang District, Beijing; Applicant before: Beijing Health Home Technology Co.,Ltd. |