CN118113766A - 批量数据处理方法、装置、设备及介质 - Google Patents
批量数据处理方法、装置、设备及介质 Download PDFInfo
- Publication number
- CN118113766A CN118113766A CN202410322009.8A CN202410322009A CN118113766A CN 118113766 A CN118113766 A CN 118113766A CN 202410322009 A CN202410322009 A CN 202410322009A CN 118113766 A CN118113766 A CN 118113766A
- Authority
- CN
- China
- Prior art keywords
- data
- subscription information
- partition
- topic
- batch
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
- 238000003672 processing method Methods 0.000 title claims abstract description 25
- 238000005192 partition Methods 0.000 claims abstract description 208
- 238000000034 method Methods 0.000 claims abstract description 56
- 230000008569 process Effects 0.000 claims abstract description 28
- 238000012545 processing Methods 0.000 claims description 68
- 238000004590 computer program Methods 0.000 claims description 16
- 238000010586 diagram Methods 0.000 description 9
- 238000004891 communication Methods 0.000 description 8
- 238000006243 chemical reaction Methods 0.000 description 6
- 238000011068 loading method Methods 0.000 description 6
- 230000008676 import Effects 0.000 description 5
- 230000007246 mechanism Effects 0.000 description 5
- 238000007781 pre-processing Methods 0.000 description 5
- 230000008901 benefit Effects 0.000 description 4
- 230000005540 biological transmission Effects 0.000 description 4
- 238000007405 data analysis Methods 0.000 description 4
- 238000012423 maintenance Methods 0.000 description 4
- 238000011084 recovery Methods 0.000 description 4
- 238000004458 analytical method Methods 0.000 description 3
- 230000002547 anomalous effect Effects 0.000 description 3
- 230000003139 buffering effect Effects 0.000 description 3
- 230000006835 compression Effects 0.000 description 3
- 238000007906 compression Methods 0.000 description 3
- 238000013500 data storage Methods 0.000 description 3
- 238000013461 design Methods 0.000 description 3
- 230000006870 function Effects 0.000 description 3
- 230000003287 optical effect Effects 0.000 description 3
- 230000003993 interaction Effects 0.000 description 2
- 238000012986 modification Methods 0.000 description 2
- 230000004048 modification Effects 0.000 description 2
- 230000009471 action Effects 0.000 description 1
- 238000013459 approach Methods 0.000 description 1
- 238000003491 array Methods 0.000 description 1
- 238000013473 artificial intelligence Methods 0.000 description 1
- 230000009286 beneficial effect Effects 0.000 description 1
- 238000003339 best practice Methods 0.000 description 1
- 230000007547 defect Effects 0.000 description 1
- 238000001514 detection method Methods 0.000 description 1
- 238000011161 development Methods 0.000 description 1
- 238000005516 engineering process Methods 0.000 description 1
- 239000000284 extract Substances 0.000 description 1
- 238000000605 extraction Methods 0.000 description 1
- 230000004927 fusion Effects 0.000 description 1
- 230000006872 improvement Effects 0.000 description 1
- 239000004973 liquid crystal related substance Substances 0.000 description 1
- 238000010801 machine learning Methods 0.000 description 1
- 238000007726 management method Methods 0.000 description 1
- 238000013507 mapping Methods 0.000 description 1
- 238000012544 monitoring process Methods 0.000 description 1
- 239000013307 optical fiber Substances 0.000 description 1
- 238000005457 optimization Methods 0.000 description 1
- 230000008520 organization Effects 0.000 description 1
- 239000004065 semiconductor Substances 0.000 description 1
- 230000001953 sensory effect Effects 0.000 description 1
- 238000006467 substitution reaction Methods 0.000 description 1
- 230000007704 transition Effects 0.000 description 1
- 238000012795 verification Methods 0.000 description 1
- 230000000007 visual effect Effects 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/25—Integrating or interfacing systems involving database management systems
- G06F16/254—Extract, transform and load [ETL] procedures, e.g. ETL data flows in data warehouses
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/23—Updating
- G06F16/2365—Ensuring data consistency and integrity
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Data Mining & Analysis (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computer Security & Cryptography (AREA)
- Computing Systems (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明公开了一种批量数据处理方法、装置、设备及介质。该方法包括:获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;获取所述主题分区对应的目标线程;通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。本发明实施例可以提高数据入库存储的实时性、一致性和完整性。
Description
技术领域
本发明涉及计算机技术领域,尤其涉及一种批量数据处理方法、装置、设备及介质。
背景技术
数据批量写入为不同行业提供了强大的数据存储和分析能力,涉及复杂的技术原理和最佳实践。不仅可以改善业务运营,还可以加强数据驱动的决策,为各行业的持续发展提供有力支持。
数据批量处理的方式有多种:ETL(Extraction-Transformation-Loading,提取-转换-加载)工具和流数据处理工具Spark。
ETL流程通常复杂,需要花费大量时间在数据的抽取、转换和加载过程上。对于大规模数据集,这可能导致性能瓶颈,特别是在处理实时数据时,需要定义和维护多个转换步骤,ETL工具的配置和维护需要时间和资源,并且这种流程常常不具备通用性。而Spark是一个强大而复杂的分布式计算框架,设置和维护Spark集群需要专业知识。这可能增加了部署和维护的复杂性,尤其对于小型项目或团队来说可能有些过于繁重,并且Spark集群通常需要大量的计算和内存资源,因此需要投资于硬件和云计算资源,这可能对成本构成挑战,Spark的容错性是通过RDD(Resilient Distributed Datasets,弹性分布式数据集)来实现的,但有时候在处理大规模数据时,任务失败或数据丢失可能会成为问题。需要实现恢复机制来处理这些问题。
发明内容
本发明提供了一种批量数据处理方法、装置、设备及介质,可以提高数据入库存储的实时性、一致性和完整性。
第一方面,本发明实施例提供了一种批量数据处理方法,该方法包括:
获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;
获取所述主题分区对应的目标线程;
通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
第二方面,本发明实施例还提供了一种批量数据处理装置,该装置包括:
订阅信息获取模块,用于获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;
分区线程确定模块,用于获取所述主题分区对应的目标线程;
批量存储模块,用于通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
第三方面,本发明实施例提供了一种电子设备,该电子设备包括:
至少一个处理器;以及
与至少一个处理器通信连接的存储器;其中,
存储器存储有可被至少一个处理器执行的计算机程序,计算机程序被至少一个处理器执行,以使至少一个处理器能够执行本发明任一实施例的批量数据处理方法。
第四方面,本发明实施例提供了一种计算机可读存储介质,计算机可读存储介质存储有计算机指令,计算机指令用于使处理器执行时实现本发明任一实施例的批量数据处理方法。
本发明实施例的技术方案,通过获取订阅信息和以及对应的主题分区,通过目标线程按照订阅信息的所属批次将订阅信息存储到主题分区对应的存储空间中,实现不同线程并行处理不同主题分区的批量数据,实现快速处理大量数据,提高数据处理的速度,同时可以按照主题分区分开处理数据,以及将数据存储如主题分区对应的存储空间,可以确保数据写入前后的一致性和完整性,解决了现有技术中批量数据处理的实时性差和数据丢失等问题,可以并行处理批量数据,提高数据实时性,提高数据批量处理的效率,同时,将数据按照主题分区采用对应的线程存入对应的存储空间中,减少不同主题分区之间的数据混乱,提高写入数据的一致性和完整性。
应当理解,本部分所描述的内容并非旨在标识本发明的实施例的关键或重要特征,也不用于限制本发明的范围。本发明的其它特征将通过以下的说明书而变得容易理解。
附图说明
为了更清楚地说明本发明实施例中的技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
图1是根据本发明实施例一提供的一种批量数据处理方法的流程图;
图2是根据本发明实施例二提供的一种批量数据处理方法的流程图;
图3是根据本发明实施例二提供的一种线程的处理流程的示意图;
图4是根据本发明实施例三提供的一种批量数据处理方法的流程图;
图5是根据本发明实施例三提供的一种缓存队列的示意图;
图6是根据本发明实施例三提供的一种存储结构的示意图;
图7是根据本发明实施例三提供的一种数据缓冲方法的示意图;
图8是根据本发明实施例四提供的一种批量数据处理装置的结构示意图;
图9是实现本发明实施例的批量数据处理方法的电子设备的结构示意图。
具体实施方式
为了使本技术领域的人员更好地理解本发明方案,下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本发明一部分的实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都应当属于本发明保护的范围。
需要说明的是,本发明的说明书和权利要求书及上述附图中的术语“第一”、“第二”等是用于区别类似的对象,而不必用于描述特定的顺序或先后次序。应该理解这样使用的数据在适当情况下可以互换,以便这里描述的本发明的实施例能够以除了在这里图示或描述的那些以外的顺序实施。此外,术语“包括”和“具有”以及他们的任何变形,意图在于覆盖不排他的包含,例如,包含了一系列步骤或单元的过程、方法、系统、产品或设备不必限于清楚地列出的那些步骤或单元,而是可包括没有清楚地列出的或对于这些过程、方法、产品或设备固有的其它步骤或单元。
本发明实施例的技术方案中,所涉及的转向请求报文等的获取、存储和应用等,均符合相关法律法规的规定,且不违背公序良俗。
实施例一
图1为本发明实施例一提供的一种批量数据处理方法的流程图。本发明实施例可适用于从消息订阅系统中获取批量数据并写入数据库的情况,该方法可以由批量数据处理装置来执行,该批量数据处理装置可以采用硬件和/或软件的形式实现,该批量数据处理装置可配置于电子设备中。电子设备可以是客户端设备或服务器设备等,客户端设备可以包括:个人计算机、笔记本电脑、智能手机、平板电脑、物联网设备或便携式可穿戴设备等。
参见图1所示的批量数据处理方法,包括:
S101、获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据。
订阅信息可以是从消息系统中获取订阅的主题的消息。在消息系统中可以按照消息来源或消息类型生成至少一个主题,主题对应的存储空间中存储该主题对应的消息。针对一个主题的存储空间可以设置至少一个分区,各分区存储该主题的不同消息,并且各分区负载均衡写入数据。主题分区可以是订阅信息所属的主题中的一个分区。订阅信息存储在所属的主题的一个分区中,该分区即为该订阅信息对应的主题分区。订阅信息可以包括至少一条订阅数据。可以集合一段时间内或一定数据量的同一主题分区的至少一条订阅数据,形成一个订阅信息,消息系统的各消费者获取订阅信息,相当于获取至少一条订阅数据,实现消息系统批量传输数据。可以将一个订阅信息包括的订阅数据作为一个批次的数据。不同订阅信息中订阅数据为不同批次的数据。
示例性的,消息系统可以是分布式发布订阅消息系统Kafka。实现本发明实施例提供的批量数据处理方法的系统中部署有消费者模块,例如,可以是Funnel线程,该消费者模块与消息系统交互。该消费者模块是从Kafka订阅的全部主题topic中摄取数据。
S102、获取所述主题分区对应的目标线程。
目标线程用于处理对应的主题分区的订阅信息,即处理对应的主题分区的至少一条订阅数据。可以为每个主题分区创建一个线程,不同主题分区对应不同线程。同一个主题可以具有多个主题分区,不同主题对应不同线程,不同线程可以对应不同主题,也可以对应相同主题。
S103、通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
同一主题分区的不同批次的订阅信息异步处理,并按照批次的前后顺序,顺序存储订阅信息,以及按照订阅信息中订阅数据的排列顺序,顺序存储该订阅信息中的订阅数据。在写入的数据库中,按照主题分区设置对应的存储空间,在存储空间中顺序存储各批次的订阅信息。按照主题分区写入到对应的存储空间,在相同映射关系中写入数据,可以确保数据一致性和完整性。存储空间可以包括下述至少一项:数据库、数据表和数据表页。不同线程并行处理对应的主题分区的数据,实现不同主题分区的数据并行写入,增加写入数据的数据量,提高写入速度。
本发明实施例的技术方案,通过获取订阅信息和以及对应的主题分区,通过目标线程按照订阅信息的所属批次将订阅信息存储到主题分区对应的存储空间中,实现不同线程并行处理不同主题分区的批量数据,实现快速处理大量数据,提高数据处理的速度,同时可以按照主题分区分开处理数据,以及将数据存储如主题分区对应的存储空间,可以确保数据写入前后的一致性和完整性,解决了现有技术中批量数据处理的实时性差和数据丢失等问题,可以并行处理批量数据,提高数据实时性,提高数据批量处理的效率,同时,将数据按照主题分区采用对应的线程存入对应的存储空间中,减少不同主题分区之间的数据混乱,提高写入数据的一致性和完整性。
实施例二
图2为本发明实施例二提供的一种批量数据处理方法的流程图。本发明实施例在上述实施例的基础上,对下述内容进行了优化改进。
进一步地,将“获取所述主题分区对应的目标线程”细化为“查询所述主题分区对应的目标线程是否存在;在所述主题分区对应的目标线程不存在时,创建所述主题分区对应的目标线程;在所述主题分区对应的目标线程存在时,获取所述主题分区对应的目标线程”。
需要说明的是,在本发明实施例中未详述的部分,可参见其他实施例的表述。
参见图2所示的批量数据处理方法,包括:
S201、获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据。
S202、查询所述主题分区对应的目标线程是否存在。
可以根据主题分区创建对应的线程。在新增主题分区时,可以根据新增的主题分区创建对应的线程。查询目标线程是否存在,在不存在时,创建目标线程,在存在时,将订阅信息发送至目标线程进行处理。
S203、在所述主题分区对应的目标线程不存在时,创建所述主题分区对应的目标线程。
S204、在所述主题分区对应的目标线程存在时,获取所述主题分区对应的目标线程。
S205、通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
可选的,批量数据处理方法还可以包括:创建新数据表,并获取所述新数据表的信息;将所述新数据表的信息发送至订阅系统,以使所述订阅系统根据所述新数据表的标识创建所述新数据表对应的主题,并根据所述新数据表对应的数据库节点的数量,创建所述对应的主题的分区;订阅所述新数据表对应的主题;所述获取订阅信息,以及所述订阅信息对应的主题分区,包括:获取订阅的主题产生的订阅信息以及所述订阅的主题的分区中存储所述订阅信息的主题分区。
可以在数据库中创建新的数据表。一个数据库对应至少一个数据库节点。一个数据表对应至少一个数据库节点,实际上,一个数据表可以划分为不重叠的多个子表,各子表存储的数据不同且相互独立,一个子表的数据可以由至少一个数据库节点负责处理。示例性的,新的数据表对应4个数据库节点,其中,两个数据库节点用于处理该新的数据表中不同子表的数据,即用于结合处理该新的数据表中数据。剩余两个数据库节点分别为这两个数据库节点的副本节点。
订阅系统根据创建的新数据表,对应创建主题以及主题分区,由此实现主题与数据表对应,以及主题分区与数据表对应。根据新数据表对应的数据库节点的数量,在相应主题下创建对应数量的分区。示例性的,分区的数量为数据库节点的数量的倍数。例如,数据库节点的数量为4,分区数量为4;又如,数据库节点的数量为4,分区数量为8。
订阅系统可以在发布订阅信息时,添加订阅信息所在的主题以及主题的分区。从订阅系统中消费该订阅信息,可以同时获取该订阅信息所在的主题以及所在主题的分区。
通过创建新的数据表时,订阅系统根据新的数据表的需求动态的创建新的主题以及主题分区,进而,可以根据新建的主题和主题分区,确保数据的处理不会因为新的数据表以及新的主题的引入而受到影响,可以适配更多的应用场景,适应不断变化的数据流,保持高性能和可伸缩性。
可选的,所述订阅信息包括多条订单数据或多条库存数据,所述主题分区包括订单分区或库存分区,所述目标线程包括订单分区对应的第一线程或所述库存分区对应的第二线程。
可以设置应用场景为订单数据和库存数据的入库场景。在订阅系统中,主题包括订单主题和库存主题。订单主题包括至少一个订单分区,以及库存主题包括至少一个库存分区。可以从一个订单分区中提取多条订单数据,形成订阅信息,将该订阅信息发送至订单分区对应的第一线程,将多条订单数据存储到订单分区对应的数据表中。又如,从一个库存分区中提取多条库存数据,形成订阅信息,将该订阅信息发送至库存分区对应的第二线程,将多条库存数据存储到库存分区对应的数据表中。
在一个具体的例子中,将来自订阅系统Kafka的数据批量导入到ClickHouse数据库的应用场景中。首先使用Kafka消费者库创建一个消费者线程Funnel,Funnel线程的主要任务是从Kafka订阅的全部topic中摄取数据。这些主题可能包括各种类型的数据,例如日志、事件和订单等等。Funnel负责从Kafka中拉取数据并将其传递给核心处理器ConsumerDispatch。Consumer Dispatch充当中央分发器的角色。当数据到达Consumer Dispatch时,需要对数据进行进一步的处理和路由。具体来说,Consumer Dispatch根据数据的主题topic和分区partition值将数据发送到相应的处理数据的线程pipeline。一个关键的原则是根据数据的topic和partition值将数据路由到对应的pipeline。这种分区级的处理方式有助于提高并发性和性能,因为每个pipeline线程只负责处理特定分区的数据,从而避免了锁竞争和资源争夺的情况。当数据到达Funnel时,Funnel可以使用分区信息将数据传递给正确的pipeline,从而确保高效的并行处理。此外,如果有新的主题被创建,ConsumerDispatch线程可以根据新主题的需求动态地创建新的pipeline,确保数据的处理不会因新主题的引入而受阻。这种设计方式使得系统能够适应不断变化的数据流,保持高性能和可伸缩性。
例如:如图3所示,在一个电子商务平台的应用场景中,该平台使用Kafka来处理订单数据和库存数据。订单数据发布到主题2,库存数据发布到主题1。这两个主题的每个分区都有对应的线程(pipeline)来处理数据。例如,主题1的分区0将有一个名为主题1-分区0的pipeline来处理所有来自该分区的库存数据。
当订单数据到达Kafka集群并被消费者线程(Funnel)分配到相应的分区时,消费者分发器(Consumer Dispatch)会将其发送到相应的订单处理的pipeline,例如,名为主题2-分区0的pipeline。同样,在库存数据到达时,将库存数据正确路由到相应的库存处理的pipeline,例如,名为主题1-分区0的pipeline。
本发明的主要目的是将大量数据高效地导入ClickHouse数据库中。这个工具的主要功能包括解决数据导入的性能和处理问题,高度利用Clickhouse高压缩率和并行加载等优势。用户可以将来自Kafka的数据批量导入到ClickHouse,从日志文件到其他数据库和数据仓库的数据都可以轻松应对。这个工具的主要优势之一是高性能数据加载,它经过优化,能够以最大速度将数据写入数据库,从而支持快速的数据分析和查询。
本发明实施例通过查询主题分区对应的目标线程是否存在,并在不存在时,创建目标线程,可以实时根据新增的主题分区对应新建线程,实现低成本部署线程,且易于维护,可以新主题的需求动态地创建新的线程,确保数据的处理不会因新主题的引入而受阻,能够适应不断变化的数据流,保持高性能和可伸缩性。
实施例三
图4为本发明实施例三提供的一种批量数据处理方法的流程图。本发明实施例在上述实施例的基础上,对下述内容进行了优化改进。
进一步地,将“按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中”细化为“将所述订阅信息添加到所述目标线程对应的缓存队列中;从所述目标线程对应的缓存队列中获取待处理数据;在所述待处理数据中包括所述订阅信息时,获取所述订阅信息对应的数据表,所述数据表与所述主题分区对应;按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中”。
需要说明的是,在本发明实施例中未详述的部分,可参见其他实施例的表述。
参见图4所示的批量数据处理方法,包括:
S401、获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据。
S402、获取所述主题分区对应的目标线程。
S403、通过所述主题分区对应的目标线程,将所述订阅信息添加到所述目标线程对应的缓存队列中。
每个线程都对应设置有缓存队列。缓存队列中按照添加顺序排列接收的数据。线程按照缓存队列,依次取数据进行处理。
在一个具体的例子中,在订阅系统Kafka的数据批量导入到ClickHouse数据库的应用场景中,存在数据来源与传输过程,具体为:Pipeline的数据处理开始于Funnel,其任务是从Kafka中获取数据。这些数据以protobuf格式的二进制数据存在,通常包括来自不同源头的信息。一批次数据,或者说一个offset,是从Kafka主题中提取得到。这些数据通过Funnel发送到Pipeline中,进入一个二进制缓存队列queue。此时,Pipeline会记录该数据的二进制大小,并跟踪queue的大小(queueSize)。
例如:Pipeline正在处理来自电子商务平台的订单数据。一条数据可能包括订单详细信息和客户信息等。这些数据以二进制格式发送到Pipeline的queue。如图5所示,缓存队列包括n批数据,即n个offset。
S404、通过所述主题分区对应的目标线程,从所述目标线程对应的缓存队列中获取待处理数据。
目标线程从对应的缓存队列中依照添加顺序取出待处理数据。待处理数据可以包括至少一个订阅的信息。
此外,目标线程从缓存队列中提取数据之后,先进行预处理操作,将预处理之后的结果进行入库。预处理操作可以包括如下内容:
A、数据从队列中提取出来,通常需要进行反序列化操作。这是因为原始数据可能以一种不可直接处理的格式存在,比如二进制数据、JSON和XML等。反序列化的过程将数据转换成能够在系统中进行处理的结构化格式。这个格式可以是对象、数据帧、或其他数据结构,具体取决于应用的需求和数据的特性。
B、之后通常需要对数据进行表存在性校验。检查数据是否包含所需的字段或属性,以及这些字段是否符合预定义的规范和数据模式。表存在性校验:可以通过ClickHouse的Manager模块,检测这个表是否存在,以及该Manager模块还可以用于新建数据表。
C、除了基本的反序列化和校验,数据预处理还可以包括数据清洗和转换步骤。这可能包括去除重复记录、填充缺失值、单位转换和数据格式转换等操作,以确保数据在后续分析中一致和可用。
D、如果以上任何一步的数据不符合要求,该数据可能被标记为异常数据或者被拒绝,Pipeline通常会将这种异常数据发送到专门处理异常数据的处理单元或Kafka主题。这个机制有助于维护数据质量和完整性,同时确保正常数据的流畅处理。
综合来看,数据预处理是数据管道中的关键环节,确保从原始数据到最终可用于分析的数据的平稳转换。这一过程有助于确保数据质量、一致性和可靠性,从而为数据分析和应用提供可信赖的基础。
S405、通过所述主题分区对应的目标线程,在所述待处理数据中包括所述订阅信息时,获取所述订阅信息对应的数据表,所述数据表与所述主题分区对应。
在待处理数据包括订阅信息,目标线程对订阅信息进行处理时,目标线程根据订阅信息对应的主题分区,确定订阅信息对应的数据表。主题分区与数据表存在对应关系。订阅信息对应的数据表用于存储订阅信息。
S406、通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
在主题分区对应的数据表中,按照批量数据的所属批次的接收顺序,依次存储各批次的数据。列式存储结构可以是,将列的元数据存储为键(key),将列的数据存储为值(value),即value为订阅信息,而订阅信息本身就是批量数据,具有一定结构的多条数据。通过这样的列式存储结构不区分具体结构和内容,可以使得系统可以处理不同批次不同结构的数据,并且可以高效的维护数据的一致性。
在一个具体的例子中,处理后的数据在Pipeline中被存储到一个结构中,通常是一个Map<String,Map<String,Table>>。外层Map的键通常代表库名,内层Map的键代表表名,而Table则包含了存储表结构和实际数据的对象。这个步骤有助于数据的组织和分配,使其准备好写入。而Table的数据存储采用列式结构,使用Map<Column,Column>表示。在这个结构中,键(key)的Column是列的元数据信息,包括列名和类型,而值(value)的Column则表示该列对应的数据,是一个List<Sections>。每个Section代表一批数据,包含数据记录以及该批数据的起始位置。存储结构可以如图6所示,数据缓存中data cache包括数据库1(database1)和数据库2(database2)。数据库1包括表1(table1)和表2(tabel2),表1包括3个Section,分别是批次1数据、批次2数据和批次3数据。同样,表2包括3个Section,分别是批次1数据、批次2数据和批次3数据。
每个Section不仅仅包含了数据记录,还记录了本批数据的起始位置。例如,如果第一批数据有6条记录,那么第一批数据的起始位置为0。当第二批数据到来时,第二批数据的起始位置为6。这种设置的目的在于确保数据的连续性和一致性。
可选的,所述按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中,包括:若在所述主题分区对应的数据表中数据列数小于所述订阅信息中数据列数,在所述主题分区对应的数据表中,为预先存储的历史批次的数据添加数值为空的目标列;在所述历史批次的数据之后,顺序存储所述订阅信息。
数据表中数据列数可以是指该数据表存储的数据的列数。订阅信息中数据列数,可以是指订阅信息中包括的多条订阅数据的列数。一条订阅数据可以包括多个字段的属性值,一个字段的属性值为一列,相应的,订阅信息可以包括至少一列的数据。订阅信息中的数据是同一批数据,同一批数据的列数相同。而不同批次的数据列数可以不同。
历史批次的数据可以是指历史时间接收的至少一批数据,在订阅信息之前存储的数据。目标列可以是指在历史批次的数据包括的列之后的列。目标列的数量为至少一个。在历史批次的数据之后,可以是在历史批次的数据所在行或列之后,写入订阅信息中各条订阅数据。
若在先接收的批次的数据的列数小于在后批次的数据的列数,在存储时在后批次的数据多出的列数的数据会偏移到在先批次的数据的相应列处,从而引起数据错位和混乱。为了解决不同批次的数据的列数不同而引起的数据写入错位的问题,可以在发现在先批次的数据的列数小于在后批次的数据的列数时,对在先批次的数据增加数值为空的列,避免在后批次的数据中多出的列的数据向前偏移。
示例性的,例如第一批数据有两列,而第二批数据有三列,需要进行相应的处理。具体地,如果第一批数据有两列,而第二批数据有三列,就需要在第一批数据的第三列中插入null值,确保不同批次的数据在对应列上具有相同的长度,以免导致数据错位或混乱。
通过在订阅信息写入之前,将数据表中历史存储的数据的数据列数与订阅信息的数据列数进行比较,并在历史存储的数据的数据列数小于订阅信息的数据列数时,在历史存储的数据的列之后写入至少一个空白数值的列,以使修改后的历史存储的数据的列数与订阅信息的数据列数相同,避免订阅信息中多余的列的数据写入导致数据错位,从而提高数据写入准确性和完整性。
可选的,所述从所述目标线程对应的缓存队列中获取待处理数据,包括:从所述目标线程对应的缓存队列中获取至少一个订阅的信息;根据入库间隔时长和/或入库数据量,将获取的各订阅的信息,确定为待处理数据。
在从缓存队列中获取数据并入库时,可以收集多个数据,进行批量写入。缓存队列中数据为订阅的信息。入库间隔时长可以是指将缓存队列中数据写入数据库的周期。入库数据量可以是指将缓存队列中数据写入数据库的数据量。例如,按照缓存队列中各订阅的信息的顺序,依次取数据,直到取数据的开始时间与当前时间之间的差值等于入库间隔时长时,确定取出的订阅的信息为待处理数据。又如,按照缓存队列中各订阅的信息的顺序,依次取数据,直到取出的订阅的信息的数据量等于入库数据量时,确定取出的订阅的信息为待处理数据。又如,差值等于入库间隔时长和数据量等于入库数据量,任意一个条件满足时,确定取出的订阅的信息为待处理数据。需要说明的是,取出的每个订阅的信息均包括至少一条订阅数据,也即缓存队列中取出的一个信息本身即为批量数据,并且将缓存队列中数据写入数据库时,取出批量数据写入,进一步增加写入的数据量。
在一个具体的例子中,Pipeline需要判断数据的是否满足入库条件。入库条件可以是一定的时间间隔(例如,每分钟入库一次)或数据大小(例如,每收集100MB的数据入库一次)。如果不符合条件,Pipeline会继续拉取并处理数据。一旦满足条件,Pipeline触发写入操作,停止拉取queue的数据。
通过入库间隔时长和/或入库数据量,从缓存队列中确定取出的批量的数据,并一次性写入数据库,实现批量写入数据库,有助于维护数据一致性,并且一次性写入数据库,可以避免不断增加的数据影响性能。
此外,从Kafka读取数据到缓存队列的过程中,还设置有数据缓冲机制,目的是防止数据丢失或队列溢出。可选的,通过消费者线程将订阅信息添加到缓存队列中,并检测该缓存队列的数据量是否大于缓冲阈值,如果是,停止接收该目标线程对应的主题分区中发布的数据;如果否,则继续在接收到该目标线程对应的主题分区中发布的数据。目标线程在从缓存队列中提取数据之前,判断该缓存队列中的数据量是否小于或等于恢复阈值,如果是,恢复接收该目标线程对应的主题分区中发布的数据;如果否,则继续停止接收该目标线程对应的主题分区中发布的数据。通常缓冲阈值大于或等于该恢复阈值。
为了避免队列溢出,可以设置阈值,当队列包括的数据达到一定数据量时,暂停此分区的Kafka数据拉取,以等待数据处理和写入操作追赶上。Pipeline维护一个数据缓冲机制。这包括限制queue的字节大小,例如设置为20MB。如图7所示,Funnel线程在发送数据到Pipeline后,同时会检查此Pipeline的queueSize是否达到了数据缓冲的限制。如果达到了限制,Funnel会调用Kafka接口,pause(Collection<TopicPartition>partitions),停止拉取与该Pipeline对应的topic-partition的数据。同时,在Pipeline线程处理数据之前,会判断是否可以解除这个限制,比如queue的字节大小减少到了10MB。resume(Collection<TopicPartition>partitions);这种控制机制有助于确保系统的稳定性,防止不断增加的数据压倒处理和写入速度。
例如:如果订单数据的处理速度变慢,可能导致队列积压,这种积压不做限制的话,会导致应用内存无限增长,为了防止这种情况,Pipeline会暂停从Kafka获取新的订单数据,直到缓存队列包括的数据量减少到预设数据量时,重新从Kafka获取新的数据。
本发明实施例通过缓存队列存储订阅信息,并从缓存队列中顺序读取待处理数据进行处理,可以减轻数据库的负载压力,并且通过列式存储结构存储订阅信息,可以使得系统可以处理不同批次不同结构的数据,并且可以高效的维护数据的一致性,为后续的数据处理和分析提供了可靠的基础。
针对现有技术,一方面,pipeline整体核心处理流程,Kafka partition与pipeline对应关系,此方式可控制的粒度更细,操作所有的partition数据全部分层隔离开,既保证数据的一致性同时可确保数据不丢失,所有的partition同时处理数据,吞吐量极大。数据从队列中提取后,进行反序列化操作,将数据转换成系统可以处理的结构化格式,如对象和数据帧等。进行表存在性校验,确保数据包含所需字段且符合预定义规范和数据模式。包括数据清洗和转换步骤,如去重、填充缺失值和单位转换等,以确保数据一致性和可用性。异常数据会被标记并发送到专门处理异常数据的处理单元或Kafka主题,维护数据质量和完整性。另一方面,搭载pipeline的数据存储设计数据在Pipeline中以Map<String,Map<String,Table>>的结构进行存储,外层Map代表库名,内层Map代表表名,Table包含了表结构和实际数据的对象。
本发明实施例具有以下特点:独特的存储结构、低资源消耗、低部署成本、容易维护、简单且高吞吐量、高通用性以及使用仅为2核心和4GB内存的资源,可以处理每分钟达到约500万条APM(Application Performance Monitoring)数据。可应用于多种数据入库场景中。
本发明实施例将大量数据高效地导入ClickHouse数据库中,可以解决数据导入的性能和处理问题,高度利用Clickhouse高压缩率和并行加载等优势。用户可以将来自Kafka的数据批量导入到ClickHouse,从日志文件到其他数据库和数据仓库的数据都可以轻松应对。可以实现高性能数据加载,经过优化,能够以最大速度将数据写入数据库,从而支持快速的数据分析和查询。此外,还可以执行数据格式转换和预处理,确保导入的数据与ClickHouse数据库的表结构和数据类型匹配,同时利用ClickHouse的列式存储和压缩功能,高度压缩数据以及节省存储空间等。在导入过程中出现中断或错误,也可以进行适当的处理和数据恢复,确保数据的完整性和一致性,从而实现整个过程保证了数据的流畅传输、处理、存储和最终写入,同时确保数据质量和系统稳定性。
实施例四
图8为本发明实施例四提供的一种批量数据处理装置的结构示意图。本发明实施例可适用于对监控的实例的采集数据进行融合存储的情况,该装置可以执行批量数据处理方法,该装置可以采用硬件和/或软件的形式实现,该装置可配置于电子设备中。
参见图8所示的批量数据处理装置800,包括:如下模块:
订阅信息获取模块801,用于获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;
分区线程确定模块802,用于获取所述主题分区对应的目标线程;
批量存储模块803,用于通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
本发明实施例的技术方案,通过获取订阅信息和以及对应的主题分区,通过目标线程按照订阅信息的所属批次将订阅信息存储到主题分区对应的存储空间中,实现不同线程并行处理不同主题分区的批量数据,实现快速处理大量数据,提高数据处理的速度,同时可以按照主题分区分开处理数据,以及将数据存储如主题分区对应的存储空间,可以确保数据写入前后的一致性和完整性,解决了现有技术中批量数据处理的实时性差和数据丢失等问题,可以并行处理批量数据,提高数据实时性,提高数据批量处理的效率,同时,将数据按照主题分区采用对应的线程存入对应的存储空间中,减少不同主题分区之间的数据混乱,提高写入数据的一致性和完整性。
进一步的,所述注册检测模块,包括:查询所述主题分区对应的目标线程是否存在;在所述主题分区对应的目标线程不存在时,创建所述主题分区对应的目标线程;在所述主题分区对应的目标线程存在时,获取所述主题分区对应的目标线程。
进一步的,批量数据处理装置还包括:创建新数据表;将所述新数据表发送至订阅系统,以使所述订阅系统根据所述新数据表创建所述新数据表对应的主题,并根据所述新数据表对应的数据库节点的数量,创建所述对应的主题的分区;订阅所述新数据表对应的主题;所述获取订阅信息,以及所述订阅信息对应的主题分区,包括:获取订阅的主题产生的订阅信息以及所述订阅的主题的分区中存储所述订阅信息的主题分区。
进一步的,所述按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,包括:将所述订阅信息添加到所述目标线程对应的缓存队列中;从所述目标线程对应的缓存队列中获取待处理数据;在所述待处理数据中包括所述订阅信息时,获取所述订阅信息对应的数据表,所述数据表与所述主题分区对应;按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中。
进一步的,所述按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中,包括:若在所述主题分区对应的数据表中数据列数小于所述订阅信息中数据列数,在所述主题分区对应的数据表中,为预先存储的历史批次的数据添加数值为空的目标列;在所述历史批次的数据之后,顺序存储所述订阅信息。
进一步的,所述从所述目标线程对应的缓存队列中获取待处理数据,包括:从所述目标线程对应的缓存队列中获取至少一个订阅的信息;根据入库间隔时长和/或入库数据量,将获取的各订阅的信息,确定为待处理数据。
进一步的,所述订阅信息包括多条订单数据或多条库存数据,所述主题分区包括订单分区或库存分区,所述目标线程包括订单分区对应的第一线程或所述库存分区对应的第二线程。
本发明实施例所提供的批量数据处理装置可执行本发明任意实施例所提供的批量数据处理方法,具备执行批量数据处理方法相应的功能模块和有益效果。
实施例五
图9示出了可以用来实施本发明的实施例的电子设备900的结构示意图。
如图9所示,电子设备900包括至少一个处理器901,以及与至少一个处理器901通信连接的存储器,如只读存储器(ROM)902、随机访问存储器(RAM)903等,其中,存储器存储有可被至少一个处理器执行的计算机程序,处理器901可以根据存储在只读存储器(ROM)902中的计算机程序或者从存储单元908加载到随机访问存储器(RAM)903中的计算机程序,来执行各种适当的动作和处理。在RAM 903中,还可存储电子设备900操作所需的各种程序和数据。处理器901、ROM 902以及RAM 903通过总线904彼此相连。输入/输出(I/O)接口905也连接至总线904。
电子设备900中的多个部件连接至I/O接口905,包括:输入单元906,例如键盘、鼠标等;输出单元907,例如各种类型的显示器、扬声器等;存储单元908,例如磁盘、光盘等;以及通信单元909,例如网卡、调制解调器、无线通信收发机等。通信单元909允许电子设备900通过诸如因特网的计算机网络和/或各种电信网络与其他设备交换信息/数据。
处理器901可以是各种具有处理和计算能力的通用和/或专用处理组件。处理器901的一些示例包括但不限于中央处理单元(CPU)、图形处理单元(GPU)、各种专用的人工智能(AI)计算芯片、各种运行机器学习模型算法的处理器、数字信号处理器(DSP)、以及任何适当的处理器、控制器、微控制器等。处理器901执行上文所描述的各个方法和处理,例如批量数据处理方法。
在一些实施例中,批量数据处理方法可被实现为计算机程序,其被有形地包含于计算机可读存储介质,例如存储单元908。在一些实施例中,计算机程序的部分或者全部可以经由ROM 902和/或通信单元909而被载入和/或安装到电子设备900上。当计算机程序加载到RAM 903并由处理器901执行时,可以执行上文描述的批量数据处理方法的一个或多个步骤。备选地,在其他实施例中,处理器901可以通过其他任何适当的方式(例如,借助于固件)而被配置为执行批量数据处理方法。
本文中以上描述的系统和技术的各种实施方式可以在数字电子电路系统、集成电路系统、现场可编程门阵列(FPGA)、专用集成电路(ASIC)、专用标准产品(ASSP)、芯片上系统的系统(SOC)、复杂可编程逻辑设备(CPLD)、计算机硬件、固件、软件、和/或它们的组合中实现。这些各种实施方式可以包括:实施在一个或者多个计算机程序中,该一个或者多个计算机程序可在包括至少一个可编程处理器的可编程系统上执行和/或解释,该可编程处理器可以是专用或者通用可编程处理器,可以从存储系统、至少一个输入装置、和至少一个输出装置接收数据和指令,并且将数据和指令传输至该存储系统、该至少一个输入装置、和该至少一个输出装置。
用于实施本发明的方法的计算机程序可以采用一个或多个编程语言的任何组合来编写。这些计算机程序可以提供给通用计算机、专用计算机或其他可编程数据处理装置的处理器,使得计算机程序当由处理器执行时使流程图和/或框图中所规定的功能/操作被实施。计算机程序可以完全在机器上执行、部分地在机器上执行,作为独立软件包部分地在机器上执行且部分地在远程机器上执行或完全在远程机器或服务器上执行。
在本发明的上下文中,计算机可读存储介质可以是有形的介质,其可以包含或存储以供指令执行系统、装置或设备使用或与指令执行系统、装置或设备结合地使用的计算机程序。计算机可读存储介质可以包括但不限于电子的、磁性的、光学的、电磁的、红外的、或半导体系统、装置或设备,或者上述内容的任何合适组合。备选地,计算机可读存储介质可以是机器可读信号介质。机器可读存储介质的更具体示例会包括基于一个或多个线的电气连接、便携式计算机盘、硬盘、随机存取存储器(RAM)、只读存储器(ROM)、可擦除可编程只读存储器(EPROM或快闪存储器)、光纤、便捷式紧凑盘只读存储器(CD-ROM)、光学储存设备、磁储存设备、或上述内容的任何合适组合。
为了提供与用户的交互,可以在电子设备上实施此处描述的系统和技术,该电子设备具有:用于向用户显示信息的显示装置(例如,CRT(阴极射线管)或者LCD(液晶显示器)监视器);以及键盘和指向装置(例如,鼠标或者轨迹球),用户可以通过该键盘和该指向装置来将输入提供给电子设备。其它种类的装置还可以用于提供与用户的交互;例如,提供给用户的反馈可以是任何形式的传感反馈(例如,视觉反馈、听觉反馈、或者触觉反馈);并且可以用任何形式(包括声输入、语音输入或者、触觉输入)来接收来自用户的输入。
可以将此处描述的系统和技术实施在包括后台部件的计算系统(例如,作为数据服务器)、或者包括中间件部件的计算系统(例如,应用服务器)、或者包括前端部件的计算系统(例如,具有图形用户界面或者网络浏览器的用户计算机,用户可以通过该图形用户界面或者该网络浏览器来与此处描述的系统和技术的实施方式交互)、或者包括这种后台部件、中间件部件、或者前端部件的任何组合的计算系统中。可以通过任何形式或者介质的数字数据通信(例如,通信网络)来将系统的部件相互连接。通信网络的示例包括:局域网(LAN)、广域网(WAN)、区块链网络和互联网。
计算系统可以包括客户端和服务器。客户端和服务器一般远离彼此并且通常通过通信网络进行交互。通过在相应的计算机上运行并且彼此具有客户端-服务器关系的计算机程序来产生客户端和服务器的关系。服务器可以是云服务器,又称为云计算服务器或云主机,是云计算服务体系中的一项主机产品,以解决了传统物理主机与VPS(VirtualPrivate Server,虚拟专用服务器)服务中,存在的管理难度大,故障扩展性弱的缺陷。
应该理解,可以使用上面所示的各种形式的流程,重新排序、增加或删除步骤。例如,本发明中记载的各步骤可以并行地执行也可以顺序地执行也可以不同的次序执行,只要能够实现本发明的技术方案所期望的结果,本文在此不进行限制。
上述具体实施方式,并不构成对本发明保护范围的限制。本领域技术人员应该明白的是,根据设计要求和其他因素,可以进行各种修改、组合、子组合和替代。任何在本发明的精神和原则之内所作的修改、等同替换和改进等,均应包含在本发明保护范围之内。
Claims (10)
1.一种批量数据处理方法,其特征在于,包括:
获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;
获取所述主题分区对应的目标线程;
通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
2.根据权利要求1所述的方法,其特征在于,所述获取所述主题分区对应的目标线程,包括:
查询所述主题分区对应的目标线程是否存在;
在所述主题分区对应的目标线程不存在时,创建所述主题分区对应的目标线程;
在所述主题分区对应的目标线程存在时,获取所述主题分区对应的目标线程。
3.根据权利要求1或2所述的方法,其特征在于,还包括:
创建新数据表,并获取所述新数据表的信息;
将所述新数据表的信息发送至订阅系统,以使所述订阅系统根据所述新数据表的标识创建所述新数据表对应的主题,并根据所述新数据表对应的数据库节点的数量,创建所述对应的主题的分区;
订阅所述新数据表对应的主题;
所述获取订阅信息,以及所述订阅信息对应的主题分区,包括:
获取订阅的主题产生的订阅信息以及所述订阅的主题的分区中存储所述订阅信息的主题分区。
4.根据权利要求1所述的方法,其特征在于,所述按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,包括:
将所述订阅信息添加到所述目标线程对应的缓存队列中;
从所述目标线程对应的缓存队列中获取待处理数据;
在所述待处理数据中包括所述订阅信息时,获取所述订阅信息对应的数据表,所述数据表与所述主题分区对应;
按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中。
5.根据权利要求4所述的方法,其特征在于,所述按照所述订阅信息所属批次,将所述订阅信息以列式存储结构存储到所述主题分区对应的数据表中,包括:
若在所述主题分区对应的数据表中数据列数小于所述订阅信息中数据列数,在所述主题分区对应的数据表中,为预先存储的历史批次的数据添加数值为空的目标列;
在所述历史批次的数据之后,顺序存储所述订阅信息。
6.根据权利要求4所述的方法,其特征在于,所述从所述目标线程对应的缓存队列中获取待处理数据,包括:
从所述目标线程对应的缓存队列中获取至少一个订阅的信息;
根据入库间隔时长和/或入库数据量,将获取的各订阅的信息,确定为待处理数据。
7.根据权利要求1所述的方法,其特征在于,所述订阅信息包括多条订单数据或多条库存数据,所述主题分区包括订单分区或库存分区,所述目标线程包括订单分区对应的第一线程或所述库存分区对应的第二线程。
8.一种批量数据处理装置,其特征在于,包括:
订阅信息获取模块,用于获取订阅信息,以及所述订阅信息对应的主题分区;所述订阅信息包括同一批次的至少一条订阅数据;
分区线程确定模块,用于获取所述主题分区对应的目标线程;
批量存储模块,用于通过所述主题分区对应的目标线程,按照所述订阅信息所属批次,将所述订阅信息存储到所述主题分区对应的存储空间中,其中,不同主题分区对应的线程并行处理不同主题分区对应的订阅信息。
9.一种电子设备,其特征在于,所述电子设备包括:
至少一个处理器;以及
与所述至少一个处理器通信连接的存储器;其中,
所述存储器存储有可被所述至少一个处理器执行的计算机程序,所述计算机程序被所述至少一个处理器执行,以使所述至少一个处理器能够执行权利要求1-7中任一项所述的批量数据处理方法。
10.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质存储有计算机指令,所述计算机指令用于使处理器执行时实现权利要求1-7中任一项所述的批量数据处理方法。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410322009.8A CN118113766A (zh) | 2024-03-20 | 2024-03-20 | 批量数据处理方法、装置、设备及介质 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410322009.8A CN118113766A (zh) | 2024-03-20 | 2024-03-20 | 批量数据处理方法、装置、设备及介质 |
Publications (1)
Publication Number | Publication Date |
---|---|
CN118113766A true CN118113766A (zh) | 2024-05-31 |
Family
ID=91216904
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202410322009.8A Pending CN118113766A (zh) | 2024-03-20 | 2024-03-20 | 批量数据处理方法、装置、设备及介质 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN118113766A (zh) |
Cited By (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN118411101A (zh) * | 2024-07-03 | 2024-07-30 | 澳优乳业(中国)有限公司 | 交易单证的处理方法、装置、电子设备、介质和程序产品 |
-
2024
- 2024-03-20 CN CN202410322009.8A patent/CN118113766A/zh active Pending
Cited By (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN118411101A (zh) * | 2024-07-03 | 2024-07-30 | 澳优乳业(中国)有限公司 | 交易单证的处理方法、装置、电子设备、介质和程序产品 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN109189835B (zh) | 实时生成数据宽表的方法和装置 | |
US10262032B2 (en) | Cache based efficient access scheduling for super scaled stream processing systems | |
US10409650B2 (en) | Efficient access scheduling for super scaled stream processing systems | |
US20130227194A1 (en) | Active non-volatile memory post-processing | |
CN107016480B (zh) | 任务调度方法、装置及系统 | |
WO2023082681A1 (zh) | 基于批流一体的数据处理方法、装置、计算机设备和介质 | |
CN112527899A (zh) | 数据同步的方法、装置、设备以及存储介质 | |
CN111562885A (zh) | 数据处理方法、装置、计算机设备及存储介质 | |
CN112148578A (zh) | 基于机器学习的it故障缺陷预测方法 | |
CN114968953A (zh) | 日志的存储检索方法、系统、终端设备及介质 | |
CN113962597A (zh) | 一种数据分析方法、装置、电子设备及存储介质 | |
CN118113766A (zh) | 批量数据处理方法、装置、设备及介质 | |
CN117633116A (zh) | 数据同步方法、装置、电子设备及存储介质 | |
CN115408391A (zh) | 一种数据库表变更方法、装置、设备和存储介质 | |
CN116383207A (zh) | 一种数据标签管理方法、装置、电子设备和存储介质 | |
CN115221116A (zh) | 一种数据写入方法、装置、设备及可读存储介质 | |
CN112732165B (zh) | 偏移量管理方法、装置及存储介质 | |
CN111459931A (zh) | 数据查重方法和数据查重装置 | |
CN111652616B (zh) | 交易数据实时监控方法及装置 | |
CN115150466B (zh) | 一种数据分发的实现方法、装置、电子设备及存储介质 | |
US20240078141A1 (en) | Method and system for event topic checkpointing | |
CN114969139A (zh) | 大数据运维管理方法、系统、装置及存储介质 | |
CN111654410B (zh) | 网关请求监控方法、装置、设备及介质 | |
CN117931805A (zh) | 一种数据处理方法、装置、电子设备和存储介质 | |
CN117579641A (zh) | 一种服务间数据通信系统、方法、设备及存储介质 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination |