CN113282611B - 一种流数据同步的方法、装置、计算机设备及存储介质 - Google Patents
一种流数据同步的方法、装置、计算机设备及存储介质 Download PDFInfo
- Publication number
- CN113282611B CN113282611B CN202110728683.2A CN202110728683A CN113282611B CN 113282611 B CN113282611 B CN 113282611B CN 202110728683 A CN202110728683 A CN 202110728683A CN 113282611 B CN113282611 B CN 113282611B
- Authority
- CN
- China
- Prior art keywords
- data
- log
- service
- stream
- spark
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000000034 method Methods 0.000 title claims abstract description 52
- 238000004364 calculation method Methods 0.000 claims abstract description 67
- 230000010354 integration Effects 0.000 claims abstract description 21
- 238000012545 processing Methods 0.000 claims description 60
- 238000013515 script Methods 0.000 claims description 22
- 238000005192 partition Methods 0.000 claims description 18
- 238000012544 monitoring process Methods 0.000 claims description 14
- 238000006243 chemical reaction Methods 0.000 claims description 7
- 230000001360 synchronised effect Effects 0.000 abstract description 10
- 238000005516 engineering process Methods 0.000 abstract description 4
- 230000008569 process Effects 0.000 description 13
- 230000005540 biological transmission Effects 0.000 description 4
- 238000004891 communication Methods 0.000 description 4
- 238000010586 diagram Methods 0.000 description 4
- 238000007726 management method Methods 0.000 description 4
- 230000007246 mechanism Effects 0.000 description 4
- 230000007115 recruitment Effects 0.000 description 4
- 238000012549 training Methods 0.000 description 4
- 230000009471 action Effects 0.000 description 3
- 238000004458 analytical method Methods 0.000 description 3
- 238000013500 data storage Methods 0.000 description 3
- 230000006870 function Effects 0.000 description 3
- 230000003287 optical effect Effects 0.000 description 3
- 238000004422 calculation algorithm Methods 0.000 description 2
- 230000006835 compression Effects 0.000 description 2
- 238000007906 compression Methods 0.000 description 2
- 230000002776 aggregation Effects 0.000 description 1
- 238000004220 aggregation Methods 0.000 description 1
- 230000009286 beneficial effect Effects 0.000 description 1
- 239000002131 composite material Substances 0.000 description 1
- 238000013480 data collection Methods 0.000 description 1
- 230000000694 effects Effects 0.000 description 1
- 238000011156 evaluation Methods 0.000 description 1
- 239000000835 fiber Substances 0.000 description 1
- 238000001914 filtration Methods 0.000 description 1
- 230000010365 information processing Effects 0.000 description 1
- 230000003993 interaction Effects 0.000 description 1
- 230000032297 kinesis Effects 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 230000006855 networking Effects 0.000 description 1
- ZLIBICFPKPWGIZ-UHFFFAOYSA-N pyrimethanil Chemical compound CC1=CC(C)=NC(NC=2C=CC=CC=2)=N1 ZLIBICFPKPWGIZ-UHFFFAOYSA-N 0.000 description 1
- 230000003068 static effect Effects 0.000 description 1
- 238000012546 transfer Methods 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/242—Query formulation
- G06F16/2433—Query languages
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/23—Updating
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/245—Query processing
- G06F16/2455—Query execution
- G06F16/24568—Data stream processing; Continuous queries
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/245—Query processing
- G06F16/2457—Query processing with adaptation to user needs
- G06F16/24578—Query processing with adaptation to user needs using ranking
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/28—Databases characterised by their database models, e.g. relational or object models
- G06F16/283—Multi-dimensional databases or data warehouses, e.g. MOLAP or ROLAP
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- Data Mining & Analysis (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computational Linguistics (AREA)
- Mathematical Physics (AREA)
- Computing Systems (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本申请公开了一种流数据同步的方法、装置、计算机设备及存储介质,属于大数据技术领域。本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过l ogstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库。此外,本申请还涉及区块链技术,业务数据可存储于区块链中。本申请通过流计算引擎对第一业务系统的业务数据和日志消费结果进行数据整合计算,得到整合数据,通过将整合数据同步至第二业务库,实现流数据的实时同步,提高流数据同步实时性。
Description
技术领域
本申请属于大数据技术领域,具体涉及一种流数据同步的方法、装置、计算机设备及存储介质。
背景技术
随着微服务的兴起,越来越多的系统使用微服务技术,微服务系统一般包括多个服务和多个数据库,因此在实际应用中,存在两个或者两个以上系统的数据进行关联和同步的情况,针对此场景,现在业内有两个常用方案:
一、使用同步或者异步的方式将初始业务系统的数据同步到目标业务系统,在目标业务系统中进行表关联,实现条件过滤。例如,一种人员选拔系统需要在核心人事系统、招聘系统、绩效系统、培训系统、假勤系统中通过同步或者异步的方式发送数据,在选拔系统中实现数据接收。在上述同步方式中,系统间数据同步存在数据丢失风险,系统间数据大量冗余,给目标系统带来数据存储压力,系统间高度耦合,相互影响,随着微服务越来越多,系统复杂度越来越高。
二、使用数据仓库进行离线计算,例如,在一种人员选拔系统中,需要事先将核心人事系统、招聘系统、绩效系统、培训系统、假勤系统中的数据采集到数据仓库,然后进行加工,加工过程在数据仓库中进行,不会给业务库带来压力,然后将加工后的数据写入选拔系统。但数据仓库的数据采集和数据写入一般是一天一次,在凌晨进行,数据传输的时效为T+1,数据时效性低。
发明内容
本申请实施例的目的在于提出一种流数据同步的方法、装置、计算机设备及存储介质,以解决现有微服务系统的数据同步方案存在的时效性较低,无法实现实时同步的技术问题。
为了解决上述技术问题,本申请实施例提供一种流数据同步的方法,采用了如下所述的技术方案:
一种流数据同步的方法,包括:
接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库;
通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志;
将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果;
将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据;
执行预设的SQL更新语句,以将整合数据同步至所述第二业务库。
进一步地,在所述接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库之后,还包括:
从所述数据同步指令中获取与所述业务数据对应的数据加工标识;
在预设的脚本库中查找与所述数据加工标识对应的数据加工脚本;
基于所述数据加工脚本对所述业务数据进行处理。
进一步地,所述通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志的步骤,具体包括:
对所述第一业务系统的数据库进行监测,当监测到所述数据库的事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志;
在所述数据库中建立临时存储区,并将所述执行日志写入所述临时存储区中。
进一步地,所述对所述第一业务系统的数据库进行监测,当监测到所述数据库的数据库事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志的步骤,具体包括:
确定所述业务数据对应的日志数据,并获取所述日志数据的主关键字;
基于所述日志数据的主关键字对第一业务系统数据库中的事务进行监测;
当监测到所述第一业务系统的数据库中与所述业务数据具有相同主关键字的事务发生改变时,生成发生改变的所述事务的最新时间戳;
获取所述最新时间戳的日志数据,得到执行日志。
进一步地,所述将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果的步骤,具体包括:
解析所述执行日志,并基于所述执行日志的内容生成类别创建指令;
基于所述类别创建指令在所述kafka消息队列中创建对应的主题类别;
将所述执行日志从所述临时存储区发送至所述主题类别中进行存储;
执行预设的日志消费指令以消费所述主题类别中的所述执行日志,得到日志消费结果。
进一步地,所述将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据的步骤,具体包括:
按照预设的第一调度频率采集所述数据仓库中的业务数据,生成第一Spark流对象;
按照预设的第二调度频率采集所述日志消费结果,生成第二Spark流对象;
分别对所述第一Spark流对象和所述第二Spark流对象进行对象转化,将所述第一Spark流对象和所述第二Spark流对象转化为RDD对象;
通过所述流计算引擎合并转化为RDD对象后的所述第一Spark流对象和所述第二Spark流对象,得到整合数据。
进一步地,所述执行预设的SQL更新语句,以将整合数据同步至所述第二业务库的步骤,具体包括:
调用RDDforeach工具对所述整合数据进行数据划分,得到若干个RDD分区;
在每一个所述RDD分区与所述第二业务库之间创建一个连接对象;
通过所述连接对象将每一个所述RDD分区内的数据同步至所述第二业务库内。
为了解决上述技术问题,本申请实施例还提供一种流数据同步的装置,采用了如下所述的技术方案:
一种流数据同步的装置,包括:
指令接收模块,用于接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库;
日志采集模块,用于通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志;
日志消费模块,用于将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果;
数据整合模块,用于将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据;
数据同步模块,用于执行预设的SQL更新语句,以将整合数据同步至所述第二业务库。
为了解决上述技术问题,本申请实施例还提供一种计算机设备,采用了如下所述的技术方案:
一种计算机设备,包括存储器和处理器,所述存储器中存储有计算机可读指令,所述处理器执行所述计算机可读指令时实现如上述所述的流数据同步的方法的步骤。
为了解决上述技术问题,本申请实施例还提供一种计算机可读存储介质,采用了如下所述的技术方案:
一种计算机可读存储介质,所述计算机可读存储介质上存储有计算机可读指令,所述计算机可读指令被处理器执行时实现如上述所述的流数据同步的方法的步骤。
与现有技术相比,本申请实施例主要有以下有益效果:
本申请公开了一种流数据同步的方法、装置、计算机设备及存储介质,属于大数据技术领域。本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性,本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过logstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库,以实现流数据的实时同步,提高流数据同步实时性。
附图说明
为了更清楚地说明本申请中的方案,下面将对本申请实施例描述中所需要使用的附图作一个简单介绍,显而易见地,下面描述中的附图是本申请的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
图1示出了本申请可以应用于其中的示例性系统架构图;
图2示出了根据本申请的流数据同步的方法的一个实施例的流程图;
图3示出了根据本申请的流数据同步的装置的一个实施例的结构示意图;
图4示出了根据本申请的计算机设备的一个实施例的结构示意图。
具体实施方式
除非另有定义,本文所使用的所有的技术和科学术语与属于本申请的技术领域的技术人员通常理解的含义相同;本文中在申请的说明书中所使用的术语只是为了描述具体的实施例的目的,不是旨在于限制本申请;本申请的说明书和权利要求书及上述附图说明中的术语“包括”和“具有”以及它们的任何变形,意图在于覆盖不排他的包含。本申请的说明书和权利要求书或上述附图中的术语“第一”、“第二”等是用于区别不同对象,而不是用于描述特定顺序。
在本文中提及“实施例”意味着,结合实施例描述的特定特征、结构或特性可以包含在本申请的至少一个实施例中。在说明书中的各个位置出现该短语并不一定均是指相同的实施例,也不是与其它实施例互斥的独立的或备选的实施例。本领域技术人员显式地和隐式地理解的是,本文所描述的实施例可以与其它实施例相结合。
为了使本技术领域的人员更好地理解本申请方案,下面将结合附图,对本申请实施例中的技术方案进行清楚、完整地描述。
如图1所示,系统架构100可以包括终端设备101、102、103,网络104和服务器105。网络104用以在终端设备101、102、103和服务器105之间提供通信链路的介质。网络104可以包括各种连接类型,例如有线、无线通信链路或者光纤电缆等等。
用户可以使用终端设备101、102、103通过网络104与服务器105交互,以接收或发送消息等。终端设备101、102、103上可以安装有各种通讯客户端应用,例如网页浏览器应用、购物类应用、搜索类应用、即时通信工具、邮箱客户端、社交平台软件等。
终端设备101、102、103可以是具有显示屏并且支持网页浏览的各种电子设备,包括但不限于智能手机、平板电脑、电子书阅读器、MP3播放器(Moving PictureExpertsGroup Audio Layer III,动态影像专家压缩标准音频层面3)、MP4(MovingPictureExperts Group Audio Layer IV,动态影像专家压缩标准音频层面4)播放器、膝上型便携计算机和台式计算机等等。
服务器105可以是提供各种服务的服务器,例如对终端设备101、102、103上显示的页面提供支持的后台服务器。
需要说明的是,本申请实施例所提供的流数据同步的方法一般由服务器执行,相应地,流数据同步的装置一般设置于服务器中。
应该理解,图1中的终端设备、网络和服务器的数目仅仅是示意性的。根据实现需要,可以具有任意数目的终端设备、网络和服务器。
在本申请中,数据仓库使用Hadoop架构搭建的hive数据库,Hadoop是一个由Apache基金会所开发的分布式系统基础架构,用户可以在不了解分布式底层细节的情况下,开发分布式程序,充分利用集群的威力进行高速运算和存储。Hadoop实现了一个分布式文件系统(Distributed File System),其中一个组件是HDFS(Hadoop Distributed FileSystem)。使用Sqoop工具在数据仓库与传统数据库(如mysql、postgresql等)间进行数据的传递,Sqoop是一款开源的工具,可以将一个关系型数据库(例如MySQL,Oracle,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
在一种具体的实施例中,目前针对于人员选拔系统,通常通过配置数据仓库实现离线处理,即在系统处于繁忙阶段时,将核心人事、招聘、绩效、培训、假勤等系统的数据抽取到数据仓库,然后由调度平台定时执行脚本对数据进行清洗、加工,但为了节约服务器资源,调度平台调度频率为一天一次,即一般需要等到凌晨系统进入空闲阶段时,调度平台才会组织进行数据同步,将加工后的数据同步至人员选拔系统,以备人员选拔系统使用,所以数据同步的时效性为T+1,无法实现实时同步。
本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性。本申请的流数据同步方法可以应用于人员选拔系统,以提高人员选拔系统的时效性,上述人员选拔系统包括第一业务库、第二业务库、数据仓库、Logstash工具、kafka消息队列和SparkStreaming流计算引擎,第一业务库包括核心人事业务库、招聘业务库、绩效业务库、培训业务库、假勤业务库等用于提供人员选拔评估数据的数据库,第一业务库分别与Logstash工具、数据仓库连接,Logstash工具与kafka消息队列连接,数据仓库、kafka消息队列分别与流计算引擎Spark Streaming连接,Spark Streaming与第二业务库连接,第二业务库为选拔业务库。
其中,流计算引擎为Spark Streaming引擎,Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。Spark Streaming支持从多种数据源获取数据,包括Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP Sockets。从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理,最后还可以将处理结果存储到文件系统、数据库和现场仪表盘中。与Spark的其他子框架一样,Spark Streaming也是基于核心Spark的。Spark Streaming在内部的处理机制是,接收实时的输入数据流,并根据一定的时间间隔(如1秒)拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。
继续参考图2,示出了根据本申请的流数据同步的的方法的一个实施例的流程图。所述的流数据同步的方法,包括以下步骤:
S201,接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库。
具体的,服务器在接收到接收数据同步指令,获取与数据同步指令对应的业务数据,并将业务数据发送至所述数据仓库。其中,数据同步指令中携带有对应的数据加工标识,数据加工标识根据加工需求生成,用于指示数据仓库按照加工需求预先处理需要同步的业务数据,通过设置数据仓库,实现同步数据的离线处理,以降低业务库的压力。需要说明的是,上述业务数据为流数据。
其中,流数据是指由数千个数据源持续生成的数据,通常也同时以数据记录的形式发送,规模较小(约几千字节)。流数据包括多种数据,例如客户使用您的移动或Web应用程序生成的日志文件、网购数据、游戏内玩家活动、社交网站信息、金融交易大厅或地理空间服务,以及来自数据中心内所连接设备或仪器的遥测数据。
在本实施例中,流数据同步的方法运行于其上的电子设备(例如图1所示的服务器/终端设备)可以通过有线连接方式或者无线连接方式接收数据同步指令。需要指出的是,上述无线连接方式可以包括但不限于3G/4G连接、WiFi连接、蓝牙连接、WiMAX连接、Zigbee连接、UWB(ultra wideband)连接、以及其他现在已知或将来开发的无线连接方式。
S202,通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志。
其中,Logstash是一个应用程序日志、事件的传输、处理、管理和搜索的平台,可以用它来统一对应用程序日志进行收集管理,Logstash提供Web接口用于查询和统计。简单来说,logstash作为数据源与数据存储分析工具之间的桥梁,结合ElasticSearch以及Kibana,能够极大方便数据的处理与分析。Logstash配置有200多个插件,可以接受几乎各种各样的数据,包括日志、网络请求、关系型数据库、传感器或物联网等等。Logstash的数据处理过程主要包括:Inputs、Filters、Outputs三部分,另外在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input、filter、output、codec插件,以实现特定的数据采集、数据处理、数据输出等功能。
具体的,服务器通过logstash工具从第一业务系统中采集与业务数据对应的执行日志。需要说明的是,当业务数据发生更新时,更新信息会被记录在执行日志中,通过logstash工具采集和读取与业务数据对应的执行日志,可以确定业务数据的当前状态,通过执行日志确定业务数据的当前状态,以判断数据同步过程中业务数据是否发生变化,如业务数据是否发生变化,则更新存储在数据仓库中的业务数据。
S203,将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果。
其中,Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
具体的,服务器通过预设的kafka消息队列对logstash工具采集到的执行日志进行消费,得到得到日志消费结果,其中,根据日志消费结果可以获得业务数据在发送至数据仓库之后是否发生变化和业务数据的当前状态信息等等。
S204,将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据。
具体的,本申请的流计算引擎为Spark Streaming,服务器将存储在数据仓库中的业务数据和kafka消息队列输出的日志消费结果传输至流计算引擎Spark Streaming,并通过流计算引擎Spark Streaming对业务数据和日志消费结果进行整合处理,得到整合数据。
需要说明的是,服务器按照预设的第一调度频率对业务数据进行数据采集,得到第一Spark流对象,服务器按照预设的第二调度频率对日志消费结果进行数据采集,得到第二Spark流对象,然后服务器分别将第一Spark流对象和第二Spark流对象均转化为RDD对象,然后通过流计算引擎Spark Streaming对转化为RDD对象后的第一Spark流对象和第二Spark流对象,得到整合数据。
本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性。
S205,执行预设的SQL更新语句,以将整合数据同步至所述第二业务库。
具体的,在服务器在获得数据整合后,自动执行预设的SQL更新语句,以将整合数据同步至第二业务库,完成数据实时同步,其中,SQL更新语句用于指示数据同步和更新,SQL更新语句由开发人员预选配置。
本申请通过流计算引擎对第一业务系统的业务数据和日志消费结果进行数据整合计算,得到整合数据,通过将整合数据同步至第二业务库,实现流数据的实时同步,提高流数据同步实时性。
进一步地,在所述接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库之后,还包括:
从所述数据同步指令中获取与所述业务数据对应的数据加工标识;
在预设的脚本库中查找与所述数据加工标识对应的数据加工脚本;
基于所述数据加工脚本对所述业务数据进行处理。
具体的,服务器从数据同步指令中获取业务数据对应的数据加工标识,并在预设的脚本库中查找与数据加工标识对应的数据加工脚本,基于数据加工脚本对业务数据进行处理。其中,数据同步指令中携带有对应的数据加工标识,数据加工标识根据加工需求生成,用于指示数据仓库按照加工需求预先处理需要同步的业务数据,通过设置数据仓库,实现同步数据的离线处理,以降低业务库的压力。
在本申请具体的实施例中,上述数据加工脚本为开发人员预先配置的hive语句,一种数据加工脚本具体为“insert into table1 select t2.column1,t3.column2 fromtable2 t2 join table3 t3 on t2.column1=t3.column2 group by t2.column1,t3.column2”。服务器在执行上述加工脚本时,会将表格table1中的数据添加至表格table2,将表格table2数据添加至表格table3。
在上述实施例中,通过获取数据加工标识以查找数据加工脚本,基于数据加工脚本预先对业务数据进行处理,实现同步数据的离线处理,以降低业务库的压力。
进一步地,所述通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志的步骤,具体包括:
对所述第一业务系统的数据库进行监测,当监测到所述数据库的事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志;
在所述数据库中建立临时存储区,并将所述执行日志写入所述临时存储区中。
其中,Logstash是一个应用程序日志、事件的传输、处理、管理和搜索的平台,可以用它来统一对应用程序日志进行收集管理,Logstash提供Web接口用于查询和统计。
具体的,服务器通过对第一业务系统的数据库进行实时监测,当服务器监测到数据库的事务发生改变时,控制logstash工具获取发生改变的事务对应的执行日志。其中,服务器会在数据库中预先建立一个临时存储区,并将logstash工具获取的执行日志写入临时存储区中进行暂存。
进一步地,所述对所述第一业务系统的数据库进行监测,当监测到所述数据库的数据库事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志的步骤,具体包括:
确定所述业务数据对应的日志数据,并获取所述日志数据的主关键字;
基于所述日志数据的主关键字对第一业务系统数据库中的事务进行监测;
当监测到所述第一业务系统的数据库中与所述业务数据具有相同主关键字的事务发生改变时,生成发生改变的所述事务的最新时间戳;
获取所述最新时间戳的日志数据,得到执行日志。
其中,日志数据的主关键字(primary key)是表中的一个或多个字段,它的值用于唯一的标识表中的某一条记录,所以将主关键字作为第一业务系统的数据库的数据库事务进行监测的基准。时间戳是使用数字签名技术产生的数据,签名的对象包括了原始文件信息、签名参数、签名时间等信息。时间戳系统用来产生和管理时间戳,对签名对象进行数字签名产生时间戳,以证明原始文件在签名时间之前已经存在。
具体的,服务器对业务数据进行解析,以确定业务数据对应的日志数据,并获取日志数据中的主关键字,然后基于日志数据的主关键字对第一业务系统的数据库进行检索,确定第一业务系统数据库中与业务数据具有相同主关键字的的事务,并对检索到的与业务数据具有相同主关键字的的事务进行实时监测,当监测到第一业务系统的数据库中与业务数据具有相同主关键字的事务发生改变时,生成发生改变的事务的最新时间戳,并获取最新时间戳的日志数据,得到执行日志。
在上述实施例中,通过对第一业务系统数据库中与业务数据具有相同主关键字的的事务进行实时监测,当监测到第一业务系统的数据库中与业务数据具有相同主关键字的事务发生改变时,logstash工具获取发生改变的事务的执行日志,可以确定业务数据的当前状态,通过执行日志确定业务数据的当前状态,以判断数据同步过程中业务数据是否发生变化,如业务数据是否发生变化,则更新存储在数据仓库中的业务数据。
进一步地,所述将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果的步骤,具体包括:
解析所述执行日志,并基于所述执行日志的内容生成类别创建指令;
基于所述类别创建指令在所述kafka消息队列中创建对应的主题类别;
将所述执行日志从所述临时存储区发送至所述主题类别中进行存储;
执行预设的日志消费指令以消费所述主题类别中的所述执行日志,得到日志消费结果。
具体的,服务器通过对获取的执行日志进行解析,获取执行日志的内容,并基于执行日志的内容生成kafka消息队的类别创建指令,基于类别创建指令在kafka消息队列中创建对应的主题类别,并将执行日志从临时存储区发送至主题类别中进行存储,以及通过执行预设的日志消费指令消费主题类别中的执行日志,得到日志消费结果。
在上述实施例中,基于执行日志的内容生成类别创建指令,基于类别创建指令在kafka消息队列中创建对应的主题类别,并通过数据同步工件将执行日志从临时存储区发送至主题类别中进行存储,然后执行预设的日志消费指令以消费主题类别中的执行日志,得到日志消费结果,其中,根据日志消费结果可以获得业务数据在发送至数据仓库之后是否发生变化和业务数据的当前状态信息等等。
进一步地,所述将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据的步骤,具体包括:
按照预设的第一调度频率采集所述数据仓库中的业务数据,生成第一Spark流对象;
按照预设的第二调度频率采集所述日志消费结果,生成第二Spark流对象;
分别对所述第一Spark流对象和所述第二Spark流对象进行对象转化,将所述第一Spark流对象和所述第二Spark流对象转化为RDD对象;
通过所述流计算引擎合并转化为RDD对象后的所述第一Spark流对象和所述第二Spark流对象,得到整合数据。
其中,第一调度频率和第二调度频率可以提前根据需求进行配置,数据同步指令携带有第一调度频率和第二调度频率。
具体的,服务器按照预设的第一调度频率采集数据仓库中的业务数据,生成第一Spark流对象。在一种具体的实施例中,加载数据仓库的业务数据,设置StreamingContext第二个参数为批次时间,即第一调度频率,因为数据仓库的存量数据不会频繁更新,批次时间可设置为8小时,得到第一Spark流对象如下:
“valconf:SparkConf=new SparkConf().setAppName
("hdfsWD").setMaster("local[2]");
valsc=new StreamingContext(conf,Seconds(28800));
valtextFS:DStream[String]=sc.textFileStream
("hdfs://node01:8020/data");”
服务器按照预设的第二度频率控制logstash工具从第一业务系统中采集与业务数据对应的执行日志,并由kafka消息队列消费执行日志,得到日志消费结果,服务器调用上述日志消费结果,生成第二Spark流对象。在一种具体的实施例中,加载kafka的日志消费结果,第一业务系统中的增量数据会不断更新,批次时间设置为1分钟,即第二调度频率,得到第二Spark流对象如下:
“val sparkConf=new SparkConf().setMaster
("local[*]").setAppName("StreamingWithKafka")
val ssc=new StreamingContext(sparkConf,Seconds(60));”
将上述述第一Spark流对象和第二Spark流对象转化为RDD对象,并通过流计算引擎合并转化为RDD对象后的第一Spark流对象和所述第二Spark流对象,得到整合数据。
其中,RDD对象为弹性分布式数据集(Resilient Distributed Dataset,RDD)是Spark中的核心概念,RDD本质是一个泛型的数据对象,可以理解为数据容器,RDD本身是一个复合型的数据结构,可以实现数据结构合并。
在上述实施例中,通过设定不同的调度频率分别对数据仓库的业务数据和日志消费结果进行采样,得到对应的Spark流对象,将流数据划分为批量数据,然后依次处理。将得到的Spark流对象转化为为RDD对象,并通过流计算引擎Spark Streaming合并转化为RDD对象后的Spark流对象,得到整合数据,通过流计算引擎Spark Streaming对数据的整合实现了流数据实时同步。
进一步地,所述执行预设的SQL更新语句,以将整合数据同步至所述第二业务库的步骤,具体包括:
调用RDDforeach工具对所述整合数据进行数据划分,得到若干个RDD分区;
在每一个所述RDD分区与所述第二业务库之间创建一个连接对象;
通过所述连接对象将每一个所述RDD分区内的数据同步至所述第二业务库内。
具体的,服务器调用RDDforeach工具遍历整合数据,以对整合数据划分,得到若干个RDD分区,然后调用foreachPartition工具在每个RDD分区和第二业务库之间均创建一个连接对象,通过连接对象将每一个RDD分区内的数据都写入第二业务库中。
在上述实施例中,通过对整合数据划分得到多个RDD分区和通过在每个RDD分区和第二业务库之间均创建一个连接对象,使得数据同步时,可以通过连接对象将每一个RDD分区内的数据都写入第二业务库中,这样可以大大减少创建的连接对象的数量,降低系统压力。
本申请公开了一种流数据同步的方法,属于大数据技术领域。本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性,本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过logstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库,以实现流数据的实时同步,提高流数据同步实时性。
需要强调的是,为进一步保证上述业务数据的私密和安全性,上述业务数据还可以存储于一区块链的节点中。
本申请所指区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。区块链(Blockchain),本质上是一个去中心化的数据库,是一串使用密码学方法相关联产生的数据块,每一个数据块中包含了一批次网络交易的信息,用于验证其信息的有效性(防伪)和生成下一个区块。区块链可以包括区块链底层平台、平台产品服务层以及应用服务层等。
本领域普通技术人员可以理解实现上述实施例方法中的全部或部分流程,是可以通过计算机可读指令来指令相关的硬件来完成,该计算机可读指令可存储于一计算机可读取存储介质中,该计算机可读指令在执行时,可包括如上述各方法的实施例的流程。其中,前述的存储介质可为磁碟、光盘、只读存储记忆体(Read-Only Memory,ROM)等非易失性存储介质,或随机存储记忆体(Random Access Memory,RAM)等。
应该理解的是,虽然附图的流程图中的各个步骤按照箭头的指示依次显示,但是这些步骤并不是必然按照箭头指示的顺序依次执行。除非本文中有明确的说明,这些步骤的执行并没有严格的顺序限制,其可以以其他的顺序执行。而且,附图的流程图中的至少一部分步骤可以包括多个子步骤或者多个阶段,这些子步骤或者阶段并不必然是在同一时刻执行完成,而是可以在不同的时刻执行,其执行顺序也不必然是依次进行,而是可以与其他步骤或者其他步骤的子步骤或者阶段的至少一部分轮流或者交替地执行。
进一步参考图3,作为对上述图2所示方法的实现,本申请提供了一种流数据同步的装置的一个实施例,该装置实施例与图2所示的方法实施例相对应,该装置具体可以应用于各种电子设备中。
如图3所示,本实施例所述的流数据同步的装置包括:
指令接收模块301,用于接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至所述数据仓库;
日志采集模块302,用于通过logstash工具从所述第一业务系统中采集与所述业务数据对应的执行日志;
日志消费模块303,用于将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果;
数据整合模块304,用于将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据;
数据同步模块305,用于执行预设的SQL更新语句,以将整合数据同步至所述第二业务库。
进一步地,所述流数据同步的装置还包括:
标识获取模块,用于从所述数据同步指令中获取与所述业务数据对应的数据加工标识;
脚本获取模块,用于在预设的脚本库中查找与所述数据加工标识对应的数据加工脚本;
数据加工模块,用于基于所述数据加工脚本对所述业务数据进行处理。
进一步地,所述日志采集模块302具体包括:
日志采集单元,用于对所述第一业务系统的数据库进行监测,当监测到所述数据库的事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志;
日志存储单元,用于在所述数据库中建立临时存储区,并将所述执行日志写入所述临时存储区中。
进一步地,所述日志采集单元具体包括:
关键字获取子单元,用于确定所述业务数据对应的日志数据,并获取所述日志数据的主关键字;
事务监测子单元,用于基于所述日志数据的主关键字对第一业务系统数据库中的事务进行监测;
时间戳生成子单元,用于当监测到所述第一业务系统的数据库中与所述业务数据具有相同主关键字的事务发生改变时,生成发生改变的所述事务的最新时间戳;
日志采集子单元,用于获取所述最新时间戳的日志数据,得到执行日志。
进一步地,所述日志消费模块303具体包括:
创建指令生成单元,用于解析所述执行日志,并基于所述执行日志的内容生成类别创建指令;
主题类别创建单元,用于基于所述类别创建指令在所述kafka消息队列中创建对应的主题类别;
转移存储单元,用于将所述执行日志从所述临时存储区发送至所述主题类别中进行存储;
日志消费单元,用于执行预设的日志消费指令以消费所述主题类别中的所述执行日志,得到日志消费结果。
进一步地,所述数据整合模块304具体包括:
第一对象采集单元,用于按照预设的第一调度频率采集所述数据仓库中的业务数据,生成第一Spark流对象;
第二对象采集单元,用于按照预设的第二调度频率采集所述日志消费结果,生成第二Spark流对象;
对象转化单元,用于分别对所述第一Spark流对象和所述第二Spark流对象进行对象转化,将所述第一Spark流对象和所述第二Spark流对象转化为RDD对象;
数据整合单元,用于通过所述流计算引擎合并转化为RDD对象后的所述第一Spark流对象和所述第二Spark流对象,得到整合数据。
进一步地,所述数据同步模块305具体包括:
数据划分单元,用于调用RDDforeach工具对所述整合数据进行数据划分,得到若干个RDD分区;
连接对象创建单元,用于在每一个所述RDD分区与所述第二业务库之间创建一个连接对象;
数据同步单元,用于通过所述连接对象将每一个所述RDD分区内的数据同步至所述第二业务库内。
本申请公开了一种流数据同步的装置,属于大数据技术领域。本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性,本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过logstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库,以实现流数据的实时同步,提高流数据同步实时性。
为解决上述技术问题,本申请实施例还提供计算机设备。具体请参阅图4,图4为本实施例计算机设备基本结构框图。
所述计算机设备4包括通过系统总线相互通信连接存储器41、处理器42、网络接口43。需要指出的是,图中仅示出了具有组件41-43的计算机设备4,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。其中,本技术领域技术人员可以理解,这里的计算机设备是一种能够按照事先设定或存储的指令,自动进行数值计算和/或信息处理的设备,其硬件包括但不限于微处理器、专用集成电路(ApplicationSpecific Integrated Circuit,ASIC)、可编程门阵列(Field-Programmable GateArray,FPGA)、数字处理器(Digital Signal Processor,DSP)、嵌入式设备等。
所述计算机设备可以是桌上型计算机、笔记本、掌上电脑及云端服务器等计算设备。所述计算机设备可以与用户通过键盘、鼠标、遥控器、触摸板或声控设备等方式进行人机交互。
所述存储器41至少包括一种类型的可读存储介质,所述可读存储介质包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,所述存储器41可以是所述计算机设备4的内部存储单元,例如该计算机设备4的硬盘或内存。在另一些实施例中,所述存储器41也可以是所述计算机设备4的外部存储设备,例如该计算机设备4上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(FlashCard)等。当然,所述存储器41还可以既包括所述计算机设备4的内部存储单元也包括其外部存储设备。本实施例中,所述存储器41通常用于存储安装于所述计算机设备4的操作系统和各类应用软件,例如流数据同步的方法的计算机可读指令等。此外,所述存储器41还可以用于暂时地存储已经输出或者将要输出的各类数据。
所述处理器42在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器42通常用于控制所述计算机设备4的总体操作。本实施例中,所述处理器42用于运行所述存储器41中存储的计算机可读指令或者处理数据,例如运行所述流数据同步的方法的计算机可读指令。
所述网络接口43可包括无线网络接口或有线网络接口,该网络接口43通常用于在所述计算机设备4与其他电子设备之间建立通信连接。
本申请公开了一种流数据同步的方法、装置、计算机设备及存储介质,属于大数据技术领域。本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性,本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过logstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库,以实现流数据的实时同步,提高流数据同步实时性。
本申请还提供了另一种实施方式,即提供一种计算机可读存储介质,所述计算机可读存储介质存储有计算机可读指令,所述计算机可读指令可被至少一个处理器执行,以使所述至少一个处理器执行如上述的流数据同步的方法的步骤。
本申请公开了一种流数据同步的方法、装置、计算机设备及存储介质,属于大数据技术领域。本申请的数据同步平台在数据仓库的基础上,引入流计算引擎,通过创建离线计算与流计算结合的方式来实现流数据的实时同步,极大提高数据同步实时性,本申请通过获取业务数据,并将业务数据发送至数据仓库,其中,业务数据为流数据,通过logstash工具从第一业务系统中采集与业务数据对应的执行日志,通过kafka消息队列消费执行日志,得到日志消费结果,通过流计算引擎对业务数据和日志消费结果进行数据整合计算,得到整合数据,执行SQL更新语句,以将整合数据同步至第二业务库,以实现流数据的实时同步,提高流数据同步实时性。
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质(如ROM/RAM、磁碟、光盘)中,包括若干指令用以使得一台终端设备(可以是手机,计算机,服务器,空调器,或者网络设备等)执行本申请各个实施例所述的方法。
显然,以上所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例,附图中给出了本申请的较佳实施例,但并不限制本申请的专利范围。本申请可以以许多不同的形式来实现,相反地,提供这些实施例的目的是使对本申请的公开内容的理解更加透彻全面。尽管参照前述实施例对本申请进行了详细的说明,对于本领域的技术人员来而言,其依然可以对前述各具体实施方式所记载的技术方案进行修改,或者对其中部分技术特征进行等效替换。凡是利用本申请说明书及附图内容所做的等效结构,直接或间接运用在其他相关的技术领域,均同理在本申请专利保护范围之内。
Claims (9)
1.一种流数据同步的方法,其特征在于,包括:
接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至数据仓库;
通过logstash工具从第一业务系统中采集与所述业务数据对应的执行日志;
将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果;
将所述数据仓库中的业务数据和所述日志消费结果发送至流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据;
所述将所述数据仓库中的业务数据和所述日志消费结果发送至所述流计算引擎,并通过流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据的步骤,具体包括:
按照预设的第一调度频率采集所述数据仓库中的业务数据,生成第一Spark流对象;
按照预设的第二调度频率采集所述日志消费结果,生成第二Spark流对象;
分别对所述第一Spark流对象和所述第二Spark流对象进行对象转化,将所述第一Spark流对象和所述第二Spark流对象转化为RDD对象;
通过所述流计算引擎合并转化为RDD对象后的所述第一Spark流对象和所述第二Spark流对象,得到整合数据;
执行预设的SQL更新语句,以将整合数据同步至第二业务库。
2.如权利要求1所述的流数据同步的方法,其特征在于,在所述接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至数据仓库之后,还包括:
从所述数据同步指令中获取与所述业务数据对应的数据加工标识;
在预设的脚本库中查找与所述数据加工标识对应的数据加工脚本;
基于所述数据加工脚本对所述业务数据进行处理。
3.如权利要求1所述的流数据同步的方法,其特征在于,所述通过logstash工具从第一业务系统中采集与所述业务数据对应的执行日志的步骤,具体包括:
对所述第一业务系统的数据库进行监测,当监测到所述数据库的事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志;
在所述数据库中建立临时存储区,并将所述执行日志写入所述临时存储区中。
4.如权利要求3所述的流数据同步的方法,其特征在于,所述对所述第一业务系统的数据库进行监测,当监测到所述数据库的数据库事务发生改变时,通过所述logstash工具获取发生改变的所述事务的执行日志的步骤,具体包括:
确定所述业务数据对应的日志数据,并获取所述日志数据的主关键字;
基于所述日志数据的主关键字对第一业务系统数据库中的事务进行监测;
当监测到所述第一业务系统的数据库中与所述业务数据具有相同主关键字的事务发生改变时,生成发生改变的所述事务的最新时间戳;
获取所述最新时间戳的日志数据,得到执行日志。
5.如权利要求3所述的流数据同步的方法,其特征在于,所述将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果的步骤,具体包括:
解析所述执行日志,并基于所述执行日志的内容生成类别创建指令;
基于所述类别创建指令在所述kafka消息队列中创建对应的主题类别;
将所述执行日志从所述临时存储区发送至所述主题类别中进行存储;
执行预设的日志消费指令以消费所述主题类别中的所述执行日志,得到日志消费结果。
6.如权利要求1所述的流数据同步的方法,其特征在于,所述执行预设的SQL更新语句,以将整合数据同步至第二业务库的步骤,具体包括:
调用RDDforeach工具对所述整合数据进行数据划分,得到若干个RDD分区;
在每一个所述RDD分区与所述第二业务库之间创建一个连接对象;
通过所述连接对象将每一个所述RDD分区内的数据同步至所述第二业务库内。
7.一种流数据同步的装置,其特征在于,包括:
指令接收模块,用于接收数据同步指令,获取与所述数据同步指令对应的业务数据,并将所述业务数据发送至数据仓库;
日志采集模块,用于通过logstash工具从第一业务系统中采集与所述业务数据对应的执行日志;
日志消费模块,用于将所述执行日志写入到kafka消息队列中,并通过所述kafka消息队列消费所述执行日志,得到日志消费结果;
数据整合模块,用于将所述数据仓库中的业务数据和所述日志消费结果发送至流计算引擎,并通过所述流计算引擎对所述业务数据和所述日志消费结果进行数据整合计算,得到整合数据;
所述数据整合模块具体包括:
第一对象采集单元,用于按照预设的第一调度频率采集所述数据仓库中的业务数据,生成第一Spark流对象;
第二对象采集单元,用于按照预设的第二调度频率采集所述日志消费结果,生成第二Spark流对象;
对象转化单元,用于分别对所述第一Spark流对象和所述第二Spark流对象进行对象转化,将所述第一Spark流对象和所述第二Spark流对象转化为RDD对象;
数据整合单元,用于通过所述流计算引擎合并转化为RDD对象后的所述第一Spark流对象和所述第二Spark流对象,得到整合数据;
数据同步模块,用于执行预设的SQL更新语句,以将整合数据同步至第二业务库。
8.一种计算机设备,其特征在于,包括存储器和处理器,所述存储器中存储有计算机可读指令,所述处理器执行所述计算机可读指令时实现如权利要求1至6中任一项所述的流数据同步的方法的步骤。
9.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质上存储有计算机可读指令,所述计算机可读指令被处理器执行时实现如权利要求1至6中任一项所述的流数据同步的方法的步骤。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110728683.2A CN113282611B (zh) | 2021-06-29 | 2021-06-29 | 一种流数据同步的方法、装置、计算机设备及存储介质 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110728683.2A CN113282611B (zh) | 2021-06-29 | 2021-06-29 | 一种流数据同步的方法、装置、计算机设备及存储介质 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN113282611A CN113282611A (zh) | 2021-08-20 |
CN113282611B true CN113282611B (zh) | 2024-04-23 |
Family
ID=77286104
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202110728683.2A Active CN113282611B (zh) | 2021-06-29 | 2021-06-29 | 一种流数据同步的方法、装置、计算机设备及存储介质 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN113282611B (zh) |
Families Citing this family (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113836235B (zh) * | 2021-09-29 | 2024-04-09 | 平安医疗健康管理股份有限公司 | 基于数据中台的数据处理方法及其相关设备 |
CN114328620A (zh) * | 2021-12-21 | 2022-04-12 | 京东科技控股股份有限公司 | 数据处理的方法及系统、设备及存储介质 |
CN114356995A (zh) * | 2022-01-04 | 2022-04-15 | 京东科技信息技术有限公司 | 区块链数据分析方法、装置及相关设备 |
CN115017223B (zh) * | 2022-08-04 | 2022-10-25 | 成都运荔枝科技有限公司 | 一种支持大数据量导入导出的系统 |
CN116089545B (zh) * | 2023-04-07 | 2023-08-22 | 云筑信息科技(成都)有限公司 | 一种采集存储介质变更数据入数据仓库的方法 |
Citations (8)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN104252466A (zh) * | 2013-06-26 | 2014-12-31 | 阿里巴巴集团控股有限公司 | 流计算处理方法、设备和系统 |
CN109862094A (zh) * | 2019-01-31 | 2019-06-07 | 福建智恒软件科技有限公司 | 一种基于流计算的水务设备数据共享方法及装置 |
CN109951463A (zh) * | 2019-03-07 | 2019-06-28 | 成都古河云科技有限公司 | 一种基于流计算和新型列式存储的物联网大数据分析方法 |
CN111127077A (zh) * | 2019-11-29 | 2020-05-08 | 中国建设银行股份有限公司 | 一种基于流计算的推荐方法和装置 |
CN111209258A (zh) * | 2019-12-31 | 2020-05-29 | 航天信息股份有限公司 | 税务端系统日志实时分析方法、设备、介质及系统 |
CN111414416A (zh) * | 2020-02-28 | 2020-07-14 | 平安科技(深圳)有限公司 | 数据处理方法、装置、设备和存储介质 |
CN112559638A (zh) * | 2021-02-20 | 2021-03-26 | 恒生电子股份有限公司 | 数据同步的方法、装置、设备和存储介质 |
CN112597205A (zh) * | 2020-12-30 | 2021-04-02 | 哈尔滨航天恒星数据系统科技有限公司 | 一种基于流及消息调度的实时数据计算及存储方法 |
Family Cites Families (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US11061926B2 (en) * | 2018-10-02 | 2021-07-13 | Target Brands, Inc. | Data warehouse management and synchronization systems and methods |
-
2021
- 2021-06-29 CN CN202110728683.2A patent/CN113282611B/zh active Active
Patent Citations (8)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN104252466A (zh) * | 2013-06-26 | 2014-12-31 | 阿里巴巴集团控股有限公司 | 流计算处理方法、设备和系统 |
CN109862094A (zh) * | 2019-01-31 | 2019-06-07 | 福建智恒软件科技有限公司 | 一种基于流计算的水务设备数据共享方法及装置 |
CN109951463A (zh) * | 2019-03-07 | 2019-06-28 | 成都古河云科技有限公司 | 一种基于流计算和新型列式存储的物联网大数据分析方法 |
CN111127077A (zh) * | 2019-11-29 | 2020-05-08 | 中国建设银行股份有限公司 | 一种基于流计算的推荐方法和装置 |
CN111209258A (zh) * | 2019-12-31 | 2020-05-29 | 航天信息股份有限公司 | 税务端系统日志实时分析方法、设备、介质及系统 |
CN111414416A (zh) * | 2020-02-28 | 2020-07-14 | 平安科技(深圳)有限公司 | 数据处理方法、装置、设备和存储介质 |
CN112597205A (zh) * | 2020-12-30 | 2021-04-02 | 哈尔滨航天恒星数据系统科技有限公司 | 一种基于流及消息调度的实时数据计算及存储方法 |
CN112559638A (zh) * | 2021-02-20 | 2021-03-26 | 恒生电子股份有限公司 | 数据同步的方法、装置、设备和存储介质 |
Also Published As
Publication number | Publication date |
---|---|
CN113282611A (zh) | 2021-08-20 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN113282611B (zh) | 一种流数据同步的方法、装置、计算机设备及存储介质 | |
CN112507027B (zh) | 基于Kafka的增量数据同步方法、装置、设备及介质 | |
CN111666490B (zh) | 基于kafka的信息推送方法、装置、设备及存储介质 | |
AU2020264374A1 (en) | Systems and methods for real-time processing of data streams | |
CN110019267A (zh) | 一种元数据更新方法、装置、系统、电子设备及存储介质 | |
CN112559475B (zh) | 数据实时捕获和传输方法及系统 | |
Jeong et al. | Anomaly teletraffic intrusion detection systems on hadoop-based platforms: A survey of some problems and solutions | |
CN110795499A (zh) | 基于大数据的集群数据同步方法、装置、设备及存储介质 | |
CN113010542B (zh) | 业务数据处理方法、装置、计算机设备及存储介质 | |
CN112948492A (zh) | 一种数据处理系统、方法、装置、电子设备及存储介质 | |
CN112182004B (zh) | 实时查看数据方法、装置、计算机设备及存储介质 | |
Elagib et al. | Big data analysis solutions using MapReduce framework | |
WO2023000785A1 (zh) | 用于处理数据的方法、装置、系统、服务器和介质 | |
CN113190517B (zh) | 数据集成方法、装置、电子设备和计算机可读介质 | |
Mishra et al. | Challenges in big data application: a review | |
CN117950850A (zh) | 一种数据传输方法、装置、电子设备及计算机可读介质 | |
CN116821493A (zh) | 消息推送方法、装置、计算机设备及存储介质 | |
CN110955709B (zh) | 一种数据的处理方法、装置及电子设备 | |
CN111753010B (zh) | 铁路接触网的数据采集网络架构及实现方法 | |
CN113553320B (zh) | 数据质量监控方法及装置 | |
CN117743291A (zh) | 数据处理方法、装置、计算机设备及存储介质 | |
CN115202837A (zh) | 延时任务处理方法、装置、计算机设备及存储介质 | |
Mehboob et al. | Analysis of issues and trends in Big Data Platforms | |
CN116680263A (zh) | 数据清洗方法、装置、计算机设备及存储介质 | |
CN115793970A (zh) | 数据存储方法、装置、电子设备及存储介质 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |