CN105335376B - 一种流处理方法、装置及系统 - Google Patents
一种流处理方法、装置及系统 Download PDFInfo
- Publication number
- CN105335376B CN105335376B CN201410284343.5A CN201410284343A CN105335376B CN 105335376 B CN105335376 B CN 105335376B CN 201410284343 A CN201410284343 A CN 201410284343A CN 105335376 B CN105335376 B CN 105335376B
- Authority
- CN
- China
- Prior art keywords
- stream process
- component
- stream
- computing resource
- task
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Classifications
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L41/00—Arrangements for maintenance, administration or management of data switching networks, e.g. of packet switching networks
- H04L41/50—Network service management, e.g. ensuring proper service fulfilment according to agreements
- H04L41/5041—Network service management, e.g. ensuring proper service fulfilment according to agreements characterised by the time relationship between creation and deployment of a service
- H04L41/5054—Automatic deployment of services triggered by the service manager, e.g. service implementation by automatic configuration of network components
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L65/00—Network arrangements, protocols or services for supporting real-time applications in data packet communication
- H04L65/60—Network streaming of media packets
- H04L65/61—Network streaming of media packets for supporting one-way streaming services, e.g. Internet radio
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5027—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5061—Partitioning or combining of resources
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L65/00—Network arrangements, protocols or services for supporting real-time applications in data packet communication
- H04L65/60—Network streaming of media packets
- H04L65/75—Media network packet handling
- H04L65/765—Media network packet handling intermediate
Landscapes
- Engineering & Computer Science (AREA)
- Multimedia (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Computer Networks & Wireless Communication (AREA)
- Signal Processing (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Data Exchanges In Wide-Area Networks (AREA)
- Information Transfer Between Computers (AREA)
Abstract
本发明实施例公开了一种流处理方法、装置及系统,该方法包括:接收第一流处理任务,第一流处理任务中包含流处理组件、流处理组件的数据输入及输出关系、流数据源的标识;计算第一流处理任务包含的一个或多个流处理组件中的每一个流处理组件所需要的计算资源;若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则通过复制至少一个与第一流处理组件具有相同计算逻辑的第二流处理组件,使得原本输入到第一流处理组件的数据可以在第一流处理组件及第二流处理组件间分配,一定程度避免因分配给计算节点的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率。
Description
技术领域
本发明涉及计算机技术领域,尤其涉及一种流处理方法、装置及系统。
背景技术
流处理技术广泛应用在各种领域的实时处理系统中,例如股票证券交易中心、网络监视、web应用、通信数据管理,这类系统的共同特点是,数据实时性强、数据量极大,具有相当高的突发性、连续发生并不断变化。流处理技术需要实时监测连续的数据流,在数据不断变化的过程中实时地进行数据分析,捕捉到可能对用户有用的信息,对紧急情况快速响应,实时处理。
目前,流数据处理主要采用分布式的计算方式。分布式流处理系统中包含多个计算节点,可以由该多个计算节点完成流数据的处理过程。用户提交流处理任务之后,该分布式流系统将该流处理任务中的流处理组件分配给该多个计算节点,其中,流处理组件包含数据的计算逻辑,使得该多台计算节点能够按照分配得到的流处理组件的计算逻辑对流数据进行处理。
然而利用上述的分布式流处理系统处理流数据时经常会出现分布在计算节点的流处理组件所需要的计算资源超过该计算节点能够提供的计算资源的情况,容易造成系统不稳定及数据处理的故障,降低系统性能。
发明内容
本发明实施例提供了一种流处理方法、装置及系统,用于对流处理任务中包含的流处理组件进行分配,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
本发明第一方面提供了一种流处理方法,所述方法包括:
接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
计算所述第一流处理任务包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
将所述第二流处理任务中的流处理组件分配给流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
在第一方面第一种可能的实现方式中,所述第一流处理任务中还包括流处理组件的算子估计计算量及流传输估计计算量;
所述计算所述第一流处理任务中的每一个流处理组件所需要的计算资源包括:
根据所述第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
结合第一方面第一种可能的实现方式,在第一方面第二种可能的实现方式中,所述根据所述第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算所述每一个流处理组件所需要的计算资源,包括:
根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
结合第一方面或者第一方面第一种可能的实现方式或者第一方面第二种可能的实现方式,在第一方面第三种可能的实现方式中,所述将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点包括:
按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将所述流处理组件排序;
将所述流处理组件按照所述排序分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点,其中所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
结合第一方面或者第一方面第一种可能的实现方式或者第一方面第二种可能的实现方式,在第一方面第四种可能的实现方式中,所述将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点包括:
根据预先设置的分类模型确定所述第二流处理任务的类型;
查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;
按照所述分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
结合第一方面或者第一方面第一种可能的实现方式或者第一方面第二种可能的实现方式或者第一方面第三种可能的实现方式或者第一方面第四种可能的实现方式,在第一方面第五种可能的实现方式中,所述约束条件为所述流处理组件所需要的计算资源小于或等于预先设置的数值,或者,所述流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,所述流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
结合第一方面或者第一方面第一种可能的实现方式或者第一方面第二种可能的实现方式或者第一方面第三种可能的实现方式或者第一方面第四种可能的实现方式,在第一方面第六种可能的实现方式中,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,所述资源分配策略为平均分配策略。
本发明第二方面提供了一种流处理装置,包括:
接收单元,用于接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
计算单元,用于在所述接收单元接收所述第一流处理任务之后,计算所述第一流处理任务中包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
复制更新单元,用于在所述计算单元得到所述每一个流处理组件所需要的计算资源之后,若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
分配单元,用于在所述复制更新单元得到所述第二流处理任务之后,将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
在第二方面第一种可能的实现方式中,所述第一流处理任务中还包括流处理组件的算子估计计算量及流处理组件的流传输估计计算量;
则所述计算单元具体用于根据所述第一流处理任务中的每一个流处理组件对应的所述流处理组件的算子估计计算量及所述流处理组件的流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
结合第二方面第二种可能的实现方式,所述计算单元包括:
第一计算单元,用于在所述接收单元接收所述第一流处理任务之后,根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
第二计算单元,用于在所述第一计算单元计算所述每一个流处理组件的算子计算量之后,根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
第三计算单元,用于在所述第二计算单元计算所述每一个流处理组件的流传输计算量之后,将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
结合第二方面或者第二方面第一种可能的实现方式或者第一方面第二种可能的实现方式,在第二方面第三种可能的实现方式中,所述分配单元包括:
排序单元,用于在所述复制更新单元得到所述第二流处理任务之后,按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将所述流处理组件排序;
组件分配单元,用于在所述排序单元进行排序后,将所述流处理组件按照所述排序分配给所述流处理系统中满足所述流处理组件所需要的计算资源的计算节点,所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算资源节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
结合第二方面或者第二方面第一种可能的实现方式或者第一方面第二种可能的实现方式,在第二方面第四种可能的实现方式中,所述分配单元包括:
确定单元,用于在所述复制更新单元得到所述第二流处理任务之后,根据预先设置的分类模型确定所述第二流处理任务的类型;
查找单元,用于在所述确定单元确定所述第二流处理任务的类型之后,查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;
节点分配单元,用于在所述查找单元确定所述对应的分配方式之后,按照所述对应的分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
结合第二方面或者第二方面第一种可能的实现方式或者第一方面第二种可能的实现方式或者第二方面第三种可能的实现方式或者第二方面第四种可能的实现方式,在第二方面第五种可能的实现方式中,所述约束条件为流处理组件所需要的计算资源小于或等于预先设置的数值,或者,流处理组件所需要的计算资源小于所有计算节点中剩余计算资源最大的计算节点所能提供的计算资源,或者流处理组件所需要的计算资源小于计算节点的剩余计算资源的平均值。
结合第二方面或者第二方面第一种可能的实现方式或者第一方面第二种可能的实现方式或者第二方面第三种可能的实现方式或者第二方面第四种可能的实现方式,在第二方面第六种可能的实现方式中,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,且所述资源分配策略为平均分配策略。
本发明第三方面提供了一种流处理系统,包括:流处理装置和多个计算节点,其中:
所述流处理装置用于:接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;计算所述第一流处理任务包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且,所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
所述计算节点用于:接受所述流处理装置分配的流处理组件,按照所述流处理组件的计算逻辑对发送给所述流处理组件的数据进行处理。
在第三方面第一种可能的实现方式中,所述第一流处理任务中还包括流处理组件的算子估计计算量及流传输估计计算量;
则所述流处理组件具体用于:根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
结合第三方面或者第三方面第一种可能的实现方式,在第三方面第二种可能的实现方式中,所述约束条件为所述流处理组件所需要的计算资源小于或等于预先设置的数值,或者,所述流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,所述流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
结合第三方面或者第三方面第一种可能的实现方式,在第三方面第三种可能的实现方式中,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,且所述资源分配策略为平均分配策略。
从以上技术方案可以看出,本发明实施例具有以下优点:
流处理装置接收第一流处理任务,该第一流处理任务中包含一个或多个流处理组件、流处理组件的数据输入及输出关系、流数据源的标识,计算该第一流处理任务中的每一个流处理组件所需要的计算资源,若该第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与该第一流处理组件有相同计算逻辑的第二流处理组件,该第二流处理组件的个数为一个或多个,将该第二流处理组件添加到该第一流处理任务中,得到第二流处理任务;在该第二流处理任务中,第二流处理组件具有与第一流处理组件相同的数据输入及输出关系,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件;将第二流处理任务中的流处理组件分配给流处理装置中的满足流处理组件所需要的计算资源的计算节点。通过复制至少一个与第一流处理组件具有相同计算逻辑的第二流处理组件,使得原本输入到第一流处理组件的数据可以在第一流处理组件及第二流处理组件间分配,第一流处理组件所需要的计算资源减少,一定程度避免因分配给计算节点的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
附图说明
图1为现有技术中流处理任务的示意图;
图2为本发明实施例中流处理方法的一个示意图;
图3a为本发明实施例中第一流处理任务的示意图;
图3b为本发明实施例中第二流处理任务的示意图;
图4为本发明实施例中流处理方法的另一示意图;
图5为本发明实施例中流处理装置的结构的一个示意图;
图6为本发明实施例中流处理装置的结构的另一示意图;
图7为本发明实施例中流处理系统的结构的一个示意图;
图8为本发明实施例中流处理装置的结构的另一示意图。
具体实施方式
本发明实施例提供了一种流处理方法、装置及系统,用于对流处理任务中包含的流处理组件进行分配,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
下面通过具体实施例,分别进行详细的说明。
为使得本发明的发明目的、特征、优点能够更加的明显和易懂,下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,下面所描述的实施例仅仅是本发明一部分实施例,而非全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其它实施例,都属于本发明保护的范围。
本发明的说明书和权利要求书及上述附图中的术语“第一”、“第二”、“第三”“第四”等(如果存在)是用于区别类似的对象,而不必用于描述特定的顺序或先后次序。应该理解这样使用的数据在适当情况下可以互换,以便这里描述的本发明的实施例例如能够以除了在这里图示或描述的那些以外的顺序实施。此外,术语“包括”和“具有”以及他们的任何变形,意图在于覆盖不排他的包含,例如,包含了一系列步骤或单元的过程、方法、系统、产品或设备不必限于清楚地列出的那些步骤或单元,而是可包括没有清楚地列出的或对于这些过程、方法、产品或设备固有的其它步骤或单元。
请参阅图1,为现有技术中流处理任务的示意图,其中,流处理组件可以接收流数据源或者其他流处理组件的输出数据,流数据源提供数据流,其中,流数据源包含流数据源A和B,其中,流数据源A的输出数据发送给流处理组件A,流数据源B的输出数据发送给流处理组件A和B,且流处理组件A和B将输出数据发送给流处理组件C,其中,流处理组件可以有多个流处理单元构成,在图1中,流处理组件包含流处理单元A1至Ai。
请参阅图2,为本发明实施例中一种流处理方法,该流处理方法应用于流处理装置,该方法包括:
201、接收第一流处理任务;
在本发明实施例中,用户可向流处理装置提交第一流处理任务,该第一流处理任务中包含一个或多个流处理组件、流处理组件的数据输入及输出关系、流数据源的标识,此外,第一流处理任务中还可包括存储设备的标识,其中,流处理组件承载对数据进行处理的计算逻辑,例如计算逻辑可以是数据筛选、求和、求平均值、选取特征值等等。其中,流处理组件的数据输入及输出关系是指流处理组件的输入数据是由哪个流处理组件或者流数据源输入的,及流处理组件的输出数据是发送给哪个流处理组件或者存储设备的,或者,流处理组件的数据输入及输出关系时指流处理组件的输入数据是由哪个流处理组件和流数据源输入的,及流处理组件的输出数据是发送给哪个流处理组件或者存储设备的。例如:流处理组件的A和流数据源B将数据输入至流处理组件C,数据经过流处理组件C之后发送给流处理组件D,则流处理组件C的输入关系包括流处理组件A和流数据源,流处理组件C的输出关系包括流处理组件D。
202、计算第一流处理任务包含的一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
在本发明实施例中,流处理装置可计算第一流处理任务包含的一个或多个流处理组件中的每一个流处理组件所需要的计算资源,具体可以是,流处理装置根据预先设置的预测函数计算流处理任务中的每个流处理组件所需要的计算资源,其中,预先设置的预测函数可以根据需要设置,本发明实施例对此不作限定。
203、若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与第一流处理组件具有相同计算逻辑的第二流处理组件,第二流处理组件的个数为一个或多个,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务;
在本发明实施例中,流处理装置在得到第一流处理任务中的每一个流处理组件所需要的计算资源之后,若该第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与该第一流处理组件具有相同计算逻辑的第二流处理组件,该第二流处理组件的个数为一个或多个,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,在该第二流处理任务中,第二流处理组件与第一流处理组件具有相同的数据输入及输出关系,且,若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件。
在本发明实施例中,第一数据分配策略可以为平均分配策略,则若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据平均分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件;例如:若第一流处理任务中的第一流处理组件所需要的计算资源为K,第二流处理任务中第一流处理组件及与该第一流处理组件具有相同的计算逻辑的第二流处理组件的个数和为N,则第二流处理任务中的第一流处理组件和第二流处理组件所需要的计算资源均为K/N。
其中,第二数据分配策略也可以为平均分配策略,则若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据平均分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件。
在第一数据分配策略和第二数据分配策略均为平均分配策略的时,资源分配策略也可以为平均分配策略,第一流处理任务中的第一流处理组件所需要的计算资源按照平均分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件。
需要说明的是,在本发明实施例中,第一数据分配策略还可以为随机分配策略,奇偶分配策略,或者按照预先设置的比例进行分配等等,且第二数据分配策略也可以为随机分配策略、奇偶分配策略或者参照预先设置的比例进行分配等等,且第一数据分配策略和第二数据分配策略可以相同也可以不同。
需要说明的是,在本发明实施例中,资源分配策略与第一数据分配策略及第二数据分配策略有关,例如,若第一数据分配策略和第二数据分配策略均为平均分配策略,则资源分配策略也为平均分配策略,若第一数据分配策略为随机分配策略,第二数据分配策略为奇偶分配策略,则资源分配策略为平均分配策略。
为了更好的理解,请参阅图3a,为本发明实施例中第一流处理任务一个示例的结构示意图,其中,节点A和B表示流数据源,节点C、D、E、F、H表示流处理组件,箭头方向表示数据的流向,节点G表示存储设备,若节点D所需要的计算资源不满足预先设置的约束条件,则复制一个与节点D具有相同计算逻辑流处理组件,表示为节点D’,得到第二流处理任务,请参阅图3b,为本发明实施例中第二流处理组件的一个示例的结构示意图,在图3b中,节点D’与节点D具有相同的数据输入及输出关系,且节点D的上一层节点C、H将按照平均分配策略的方式将数据发送给节点D和D’。
在本发明实施例中,可根据具体的需要预先设置约束条件,例如:该预先设置的约束条件可以为:流处理组件所需要的计算资源小于或等于预先设置的数值,或者,流处理组件所需要的计算资源小于流处理装置中的各个计算节点能够提供的最大的空闲计算资源,或者流处理组件所需要的计算资源小于流处理装置中的各个计算节点的空闲计算资源的平均值。在实际应用中,可根据具体的需要设置约束条件,此处不做限定。
需要说明的是,在本发明实施例中,若预先设置的约束条件为:流处理组件所需要的计算资源大于预先设置的数值,或者流处理组件所需要的计算资源大于或等于流处理装置中的各个计算节点能够提供的最大的空闲计算资源,或者流处理组件所需要的计算资源大于或等于流处理装置中的各个计算节点的空闲计算资源的平均值,则流处理组件可在第一流处理任务中包含所需要的计算资源满足预先设置的约束条件的第一流处理组件的情况下,复制与第一流处理组件具有相同计算逻辑的第二流处理组件。
204、将第二流处理任务中的流处理组件分配给流处理系统中的满足流处理组件所需要的计算资源的计算节点。
在本发明实施例中,流处理装置将得到的第二流处理任务中的流处理组件分配给流处理装置中的满足流处理组件所需要的计算资源的计算节点,需要说明的是,数据流在计算节点上的数据输入及输出的方向与计算节点分配的流处理组件之间的数据输入输出的方向一致。
在本发明实施例中,流处理装置接收第一流处理任务,计算该第一流处理任务中的每一个流处理组件所需要的计算资源,若该第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与该第一流处理组件有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,并将该第二流处理任务中的流处理组件分配给流处理装置中的满足该流处理组件所需要的计算资源的计算节点,通过复制至少一个与不满足约束条件的第一流处理组件具有相同计算逻辑的第二流处理组件,且由于第二流处理组件与第一流处理组件具有相同的数据输入及输出关系,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件按照第一数据分配策略将数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源将根据第二数据分配策略将数据分组发送给第一流组件和第二流处理组件,使得可将不满足约束条件的第一流处理组件所需要的计算资源在第一流处理组件及第二流处理组件间分配,降低第一流处理组件所需要的计算资源,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
为了更好的理解本发明实施例中的技术方案,请参阅图4,为本发明实施例中一种流处理方法的实施例,包括:
401、接收第一流处理任务;
在本发明实施例中,流处理装置可接收用户提交的第一流处理任务,该第一流处理任务中包含一个或多个流处理组件、流处理组件的数据输入及输出关系、流数据源的标识、流处理组件的算子计算量及流传输计算量。
其中,算子估计计算量是指按照流处理组件的计算逻辑对单位数据进行处理估计需要的计算量,流传输估计计算量是指对单位数据进行传输估计需要的计算量,其中,单位数据是指单位时间内传输的数据,该单位数据与流数据源输出数据的速度有关。
需要说明的是,第一流处理任务中包含的流处理组件的算子估计计算量及流处理组件的流传输估计计算量可以用于计算流处理组件所需要的计算资源,若需要计算流处理组件的其他资源,例如内存资源、网络带宽资源等等,可以在流处理任务中携带与所需要计算的资源类型相关的参数,本发明实施例中是以计算流处理组件所需要的计算资源描述的技术方案,在实际应用中,用户可通过设置流处理任务中的参数设置流处理组件所需要资源的具体类型,此处不做限定。
402、根据第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算每一个流处理组件所需要的计算资源;
在本发明实施例中,流处理任务中包含了第一流处理任务中的每一个流处理组件的算子估计计算量及流处理组件的流传输估计计算量,流处理装置将利用第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算每一个流处理组件所需要的计算资源。
需要说明的是,在本发明实施例中,可使用预先设置的预测函数计算每一个流处理组件所需要的计算资源,且需要计算的流处理组件的资源的类型不同,所需要使用的预测函数也是不同的,在实际应用中,流处理组件中已预先设置计算不同资源所需要使用的预测函数,此处不做限定。
在本发明实施例中,流处理装置计算每一个流处理组件所需要的计算资源可以是:
1)根据每一个流处理组件的算子估计计算量及流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算每一个流处理组件的算子计算量;
其中,流处理装置在接收到流处理任务之后,可预先估计该流处理任务中包含的第一流处理组件中的每一个流处理组件的源代码的估计计算量,且作为可参考的计算方式,预先设置的算子计算量预测函数可以为:
Vi=a*Pi+b*Mi
其中,i表示第一流处理任务中的第i个流处理组件,a和b为预先设置的调整参数,Vi表示第i个流处理组件的算子计算量,Pi表示的第i个流处理组件的算子估计计算量,Mi表示流处理装置第i个流处理组件的源代码的估计计算量。
优选的,为了在流处理装置执行接收到的流处理任务的过程中能够对流处理组件的分配情况进行调整,流处理装置还可监测每一个流处理组件,获取每一个流处理组件的监测到的算子计算量,并基于该监测到的算子计算量确定流处理组件的算子计算量,则预先设置的算子计算量预测函数可以为:
Vi=a*Pi+b*Mi+c*Ki
其中,c为预先设置的调整参数,Ki表示第i个流处理组件的监测到的算子计算量。
2)根据每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数得到每一个流处理组件的流传输计算量;
其中,流传输计算量预测函数可参考如下:
Ei=d*Fi
其中,Ei表示第i个流处理组件的流传输计算量,d为预先设置的调整参数,Fi表示第i个流处理组件的流传输估计计算量。
优选的,为了在流处理装置执行接收到的流处理任务的过程中能够对流处理组件的分配情况进行调整,流处理装置还可监测每一个流处理组件,获取每一个流处理组件的监测到的流传输计算量,并基于该监测到的流传输计算量确定流处理组件的流传输计算量,则流传输计算量预测函数可参考如下:
Ei=d*Fi+e*Gi
其中,e为预先设置的调整参数,Gi表示监测到的第i个流处理组件的流传输计算量。
3)将每一个流处理组件的算子计算量与每一个流处理组件的流传输计算量的和作为每一个流处理组件所需要的计算资源。
其中,流处理装置可将Ei与Vi的和作为第i个流处理组件所需要的计算资源。
403、若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,复制与第一流处理组件具有相同计算逻辑的第二流处理组件,第二流处理组件的个数为一个或多个,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务;
在本发明实施例中,流处理装置在得到第一流处理任务中每一个流处理组件所需要的计算资源之后,将判断第一流处理任务中是否包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,且若第一流处理任务不满足预先设置的约束条件,则复制与第一流处理组件具有相同计算逻辑的第二流处理组件,且第二流处理组件的个数为一个或多个,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务。在该第二流处理任务中,第二流处理组件与第一流处理组件具有相同的数据输入及输出关系,且,若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第二流处理任务中的第一流处理组件和第二流处理组件所需要的计算资源是根据资源分配策略将第一流处理任务中第一流处理组件所需要的计算资源划分得到的。
需要说明的是,本发明实施例中的第一流处理组件可以是一个流处理组件,也可以是多个流处理组件,且若为多个流处理组件,流处理装置将分别为该多个流处理组件中的每一个流处理组件复制与其具有相同计算逻辑的流处理组件。
需要说明的是,在本发明实施例中,复制的与第一流处理组件具有相同计算逻辑的第二流处理组件的个数可以预先设置或者根据需要进行设置,此处不做限定。
404、将第二流处理任务中的流处理组件分配给流处理系统中的满足流处理组件所需要的计算资源的计算节点。
在本发明实施例中,流处理装置将第二流处理任务中的流处理组件分配给流处理系统中的满足该流处理组件所需要的计算资源的计算节点。具体的包括:按照第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将流处理组件排序,将流处理组件按照排序分配给流处理系统中的满足流处理组件所需要的计算资源的计算节点,其中,流处理组件分配的计算节点为该流处理组件在各个计算节点上的计算资源比例最小的计算节点,该计算资源比例为流处理组件所需要的计算资源与该计算资源已使用的计算资源的和占该计算节点总的计算资源的比例。
其中,步骤404还按照以下流程执行:
1)流处理装置按照第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序排序,得到排序后的流处理组件集合S;
i初始值1,i小于或等于N,N为流处理组件集合S中包含的流处理组件的个数,H为计算节点的计算资源的集合,执行以下步骤:
2)流处理装置计算流处理组件Si所需要的计算资源在集合Hi中的各个计算节点上占用的计算资源比例;
其中,Si所需要的计算资源在第K个计算节点上占用的计算资源比例的计算公式为:
Tik=(B’k+SCost(Si))/Bk
其中,Tik表示流处理组件Si在集合Hi的第k个计算节点上的计算资源比例,B’k表示集合Hi的第k个计算节点已使用的计算资源,Bk表示集合Hi的第k个计算节点的总的计算资源,SCost(Si)表示流处理组件Si所需要的计算资源。
3)将流处理组件Si分配给集合Hi中计算资源比例最小的计算节点;
其中,流处理装置得到流处理组件Si在集合Hi中的各个计算节点上占用的计算资源比例之后,将流处理组件Si分配给集合Hi中计算资源比例最小的计算节点。
4)更新集合Hi,得到更新后的集合Hi+1中,且已分配给流处理组件Si的计算算节点上,分配给流处理组件Si的计算资源为已使用的计算资源,若i小于N,令i=i+1,返回执行计算流处理组件Si所需要的计算资源在集合Hi中的各个计算节点上的占用的计算资源比例的步骤;
5)若i=N,则停止流处理组件的分配。
在本发明实施例中,流处理装置可按照上述步骤1)至5)将第二流处理任务中的每一个流处理组件分配给计算节点,且计算节点均能够满足分配得到的流处理组件所需要的计算资源。且在流处理装置的计算节点中可以存在分配两个或多个流处理组件的计算节点。
需要说明的是,在本发明实施例中,上述步骤1)至5)仅为流处理组件可行的一种分配方式,在实际应用中,还可按照第二流处理任务的类型对第二流处理任务中的流处理组件进行分配,因此,上述的将第二流处理任务中的流处理组件分配给流处理装置中的满足该流处理组件所需要的计算资源的计算节点,具体为:根据预先设置的分类模型确定第二流处理任务的类型,并查找预先设置的任务类型与分配方式的对应关系表,确定与第二流处理任务的类型对应的分配方式,按照该对应的分配方式,为第二流处理任务中的流处理组件分配满足该流处理组件所需要的计算资源的计算节点。其中,预先设置的分类模型是基于多个流处理任务的特征分类算法得到的模型,其中,分类算法可包括决策树、叶贝斯分类器、支持向量机等,且流处理装置在使用该分类模型确定流处理任务的过程中,还可通过学习的方式改善该分类模型。
其中,可根据具体的需要设置与任务类型对应的分配方式,上述步骤1)至5)中为其中可行的一种分配方式,此处不做限定。
需要说明的是,在本发明实施例中,若在将第二流处理任务中的流处理组件分配给计算节点的过程中,第二流处理任务中,仍然存在不满足于预先设置的约束条件的流处理组件,则可继续复制与该不满足流处理组件具有相同计算逻辑的流处理组件。
在本发明实施例中,流处理装置在接收第一流处理任务之后,利用该第一流处理任务中包含的每一个流处理组件的算子估计计算量及流传输估计计算量,计算每一个流处理组件所需要的计算资源,且若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与该第一流处理组件具有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,由于在第二流处理任务中,第二流处理组件的数据输入及输出关系与第一流处理组件相同,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件按照第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源将根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件,使得本来发送给第一流处理组件的数据可以在第一流处理组件和第二流处理组件之间分配,降低了第一流处理组件所需要的计算资源,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
请参阅图5,为本发明实施例中流处理系统的结构的示意图,包括:
接收单元501,用于接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
计算单元502,用于在所述接收单元501接收所述第一流处理任务之后,计算所述第一流处理任务包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
复制更新单元503,用于在所述计算单元502得到所述每一个流处理组件所需要的计算资源之后,若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且,所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
分配单元504,用于在所述复制更新单元503得到所述第二流处理任务之后,将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
在本发明实施例中,接收单元501接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;接着,计算单元502计算所述第一流处理任务中的每一个流处理组件所需要的计算资源;且若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制更新单元503复制至少一个与所述第一流处理组件具有相同计算逻辑的第二流处理组件,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件;最后分配单元504将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
在本发明实施例中,流处理装置接收第一流处理任务,计算该第一流处理任务中的每一个流处理组件所需要的计算资源,若该第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与该第一流处理组件有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,并将该第二流处理任务中的流处理组件分配给流处理装置中的满足该流处理组件所需要的计算资源的计算节点,通过复制至少一个与不满足约束条件的第一流处理组件具有相同计算逻辑的第二流处理组件,且由于第二流处理组件与第一流处理组件具有相同的数据输入及输出关系,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件,使得可将不满足约束条件的第一流处理组件所需要的计算资源在第一流处理组件及第二流处理组件间分配,降低第一流处理组件所需要的计算资源,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
请参阅图6,为本发明实施例中流处理系统的结构的实施例,包括如图5所示实施例中描述的接收单元501,计算单元502,复制更新单元503及分配单元504,且与图5所示实施例中描述的方案相似,此处不再赘述。
在本发明实施例中,所述第一流处理任务中还包括流处理组件的算子估计计算量及流处理组件的流传输估计计算量;
则所述计算单元502具体用于根据所述第一流处理任务中的每一个流处理组件对应的所述流处理组件的算子估计计算量及所述流处理组件的流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
其中,所述计算单元502包括:
第一计算单元601,用于在所述接收单元501接收所述第一流处理任务之后,根据所述每一个流处理组件算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
第二计算单元602,用于在所述第一计算单元601计算所述每一个流处理组件的算子计算量之后,根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
第三计算单元603,用于在所述第二计算单元602计算所述每一个流处理组件的流传输计算量之后,将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
在本发明实施例中,所述分配单元504包括:
排序单元604,用于在所述复制更新单元503得到所述第二流处理任务之后,按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序排序;
组件分配单元605,用于在所述排序单元604进行排序后,将所述流处理组件按照所述排序分配给所述流处理系统中满足所述流处理组件所需要的计算资源的计算节点,所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算资源节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
或者,在本发明实施例中,所述分配单元包括:
确定单元606,用于在所述复制更新单元503得到所述第二流处理任务之后,利用预先设置的分类模型确定所述第二流处理任务的类型;
查找单元607,用于在所述确定单元606确定所述第二流处理任务的类型之后,查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;
节点分配单元608,用于在所述查找单元607确定所述对应的分配方式之后,按照所述对应的分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
在本发明实施例中,接收单元501接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系;接着,计算单元502计算所述第一流处理任务中的每一个流处理组件所需要的计算资源,具体的,计算单元502中的第一计算单元601根据所述每一个流处理组件算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;接着第二计算单元602根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;并由第三计算单元603根据所述每一个流处理组件的算子计算量及所述每一个流处理组件的流传输计算量得到所述每一个流处理组件所需要的计算资源。
且若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制更新单元503复制至少一个与所述第一流处理组件具有相同计算逻辑的第二流处理组件,将所述第二流处理组件添加到第一流处理任务中,得到第二流处理任务在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件;最后分配单元504将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
其中分配单元504可按照如下方式分配流处理组件:
排序单元604按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序排序,并由组件分配单元605将所述流处理组件按照所述排序分配给所述流处理系统中满足所述流处理组件所需要的计算资源的计算节点,所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算资源节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
或者,分配单元504可以按照如下方式分配流处理组件:
确定单元606利用预先设置的分类模型确定所述第二流处理任务的类型;接着,查找单元607查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;并由节点分配单元608按照所述对应的分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
在本发明实施例中,流处理装置在接收第一流处理任务之后,利用该第一流处理任务中包含的每一个流处理组件的算子估计计算量及流传输估计计算量,计算每一个流处理组件所需要的计算资源,且若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与该第一流处理组件具有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,由于在第二流处理任务中,第二流处理组件的数据输入及输出关系与第一流处理组件相同,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件,使得本来发送给第一流处理组件的数据可以在第一流处理组件和第二流处理组件之间分配,降低了第一流处理组件所需要的计算资源,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
请参阅图7、为本发明实施例中流处理系统的结构图,包括:
流处理装置701和多个计算节点702。本发明实施例提供的计算节点可以是云计算中心的云服务器,或者普通数据处理中心的数据处理服务器,或者大数据处理中心的数据处理服务器等,本发明实施例对此不做限定。
所述流处理装置701用于:接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;计算所述第一流处理任务中的每一个流处理组件所需要的计算资源;若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与所述第一流处理组件具有相同计算逻辑的第二流处理组件,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件;将所述第二流处理任务中的流处理组件分配给满足所述流处理组件所需要的计算资源的计算节点;
所述计算节点702用于:接受所述流处理装置701分配的流处理组件,按照所述流处理组件的计算逻辑对发送给所述流处理组件的数据进行处理。
在本发明实施例中,第一流处理任务中还包括流处理组件的算子估计计算量及流传输估计计算量。则流处理组件具体用于:根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
在本发明实施中,所述第一数据分配策略可以为平均分配策略,所述第二数据分配策略可以为平均分配策略,且所述资源分配策略可以为平均分配策略。在其他实施例中可采取其它分配策略,这里不再赘述。
在本发明实施例中,约束条件为流处理组件所需要的计算资源小于或等于预先设置的数值,或者,流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
需要说明的是,流处理装置701的其它功能、功能的具体实现、或模块划分等可参考前述实施例所述。
可见,在本发明实施例中,流处理装置701接收第一流处理任务,计算该第一流处理任务中的每一个流处理组件所需要的计算资源,若该第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与该第一流处理组件有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,并将该第二流处理任务中的流处理组件分配给流处理装置中的满足该流处理组件所需要的计算资源的计算节点,通过复制至少一个与不满足约束条件的第一流处理组件具有相同计算逻辑的第二流处理组件,且由于第二流处理组件与第一流处理组件具有相同的数据输入及输出关系,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件,使得第一流处理组件所需要处理的数据减少且所需要的计算资源也减少,能够有效的降低因计算节点分配的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
请参阅图8,为本发明实施例中流处理器的结构的实施例,包括:
处理器801、接收装置802、发送装置803、存储器804;
其中,接收装置802用于接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
存储器804用于存储计算机程序,处理器801用于读取存储器中存储的计算机程序并执行如下处理:计算所述第一流处理任务中的每一个流处理组件所需要的计算资源;若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与所述第一流处理组件具有相同计算逻辑的第二流处理组件,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件;将所述第二流处理任务中的流处理组件分配给满足所述流处理组件所需要的计算资源的计算节点。
具体的,作为一个实施例,处理器801具体用于根据预先设置的预测函数计算所述第一流处理任务中的每一个流处理组件所需要的计算资源。其中预测函数可以根据实际情况设定,本发明实施例对此不做限定。
可选的,作为一个实施例,处理器801还用于:若所述第一流处理任务中还包括流处理组件的算子计算量及流传输估计计算量;则根据所述第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
进一步的,处理器801还用于根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
作为一个可选实施例,处理器801还用于按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将所述流处理组件排序;
将所述流处理组件按照所述排序分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点,其中所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
作为一个可选实施例,处理器801还用于根据预先设置的分类模型确定所述第二流处理任务的类型;查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;按照所述分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
进一步的,上述的约束条件为所述流处理组件所需要的计算资源小于或等于预先设置的数值,或者,所述流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,所述流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
在本发明实施中,所述第一数据分配策略可以为平均分配策略,所述第二数据分配策略可以为平均分配策略,且所述资源分配策略可以为平均分配策略。在其他实施例中可采取其它分配策略,这里不再赘述。
其中,发送装置803用于将所述第二流处理任务中的流处理组件发送给该流处理组件分配的计算节点;
其中,存储器804还可以用于存储第一流处理任务、第二流处理任务、计算节点的标识及计算节点已用的计算资源及总的计算资源。
可见,采用上述方案后,流处理器在接收第一流处理任务之后,利用该第一流处理任务中包含的每一个流处理组件的算子估计计算量及流传输估计计算量,计算每一个流处理组件所需要的计算资源,且若第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制至少一个与该第一流处理组件具有相同计算逻辑的第二流处理组件,并将第二流处理组件添加到第一流处理任务中,得到第二流处理任务,由于在第二流处理任务中,第二流处理组件的数据输入及输出关系与第一流处理组件相同,且若在第一流处理任务中存在向第一流处理组件发送数据的第三流处理组件,则第三流处理组件根据第一数据分配策略将第三流处理组件发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,若在第一流处理任务中存在向第一流处理组件发送数据的流数据源标识对应的流数据源,则流数据源根据第二数据分配策略将流数据源发送给第一流处理组件的数据发送给第一流处理组件和第二流处理组件,且第一流处理任务中的第一流处理组件所需要的计算资源根据资源分配策略划分给第二流处理任务中的第一流处理组件和第二流处理组件,使得本来发送给第一流处理组件的数据可以在第一流处理组件和第二流处理组件之间分配,降低了第一流处理组件所需要的计算资源,能够一定程度上降低因分配给计算节点的流处理组件所需要的计算资源超过该计算节点能提供的计算资源所导致出现系统不稳定及数据处理故障的概率,从而改善系统性能。
需要说明的是,上述的流处理器可以应用于个人电脑或者服务器中。
本说明书中的各个实施例均采用递进的方式描述,各个实施例之间相同相似的部分互相参见即可,每个实施例重点说明的都是与其他实施例的不同之处。尤其,对于装置实施例而言,由于其基本相似于方法实施例,所以描述得比较简单,相关之处参见方法实施例的部分说明即可。
需说明的是,以上所描述的装置实施例仅仅是示意性的,其中所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部模块来实现本实施例方案的目的。另外,本发明提供的装置实施例附图中,模块之间的连接关系表示它们之间具有通信连接,具体可以实现为一条或多条通信总线或信号线。本领域普通技术人员在不付出创造性劳动的情况下,即可以理解并实施。
通过以上的实施方式的描述,所属领域的技术人员可以清楚地了解到本发明可借助软件加必需的通用硬件的方式来实现,当然也可以通过专用硬件包括专用集成电路、专用CPU、专用存储器、专用元器件等来实现。一般情况下,凡由计算机程序完成的功能都可以很容易地用相应的硬件来实现,而且,用来实现同一功能的具体硬件结构也可以是多种多样的,例如模拟电路、数字电路或专用电路等。但是,对本发明而言更多情况下软件程序实现是更佳的实施方式。基于这样的理解,本发明的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在可读取的存储介质中,如计算机的软盘,U盘、移动硬盘、只读存储器(ROM,Read-Only Memory)、随机存取存储器(RAM,Random Access Memory)、磁碟或者光盘等,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)执行本发明各个实施例所述的方法。
以上所述,仅为本发明的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明揭露的技术范围内,可轻易想到变化或替换,都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应以所述权利要求的保护范围为准。
Claims (18)
1.一种流处理方法,其特征在于,所述方法包括:
接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
计算所述第一流处理任务包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
将所述第二流处理任务中的流处理组件分配给流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
2.根据权利要求1所述的方法,其特征在于,所述第一流处理任务中还包括流处理组件的算子估计计算量及流传输估计计算量;
所述计算所述第一流处理任务中的每一个流处理组件所需要的计算资源包括:
根据所述第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
3.根据权利要求2所述的方法,其特征在于,所述根据所述第一流处理任务中的每一个流处理组件的算子估计计算量及流传输估计计算量,计算所述每一个流处理组件所需要的计算资源,包括:
根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
4.根据权利要求1至3任一项所述的方法,其特征在于,所述将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点包括:
按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将所述流处理组件排序;
将所述流处理组件按照所述排序分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点,其中所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
5.根据权利要求1至3任一项所述的方法,其特征在于,所述将所述第二流处理任务中的流处理组件分配给所述流处理系统中的满足所述流处理组件所需要的计算资源的计算节点包括:
根据预先设置的分类模型确定所述第二流处理任务的类型;
查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;
按照所述分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
6.根据权利要求1至3任一项所述的方法,其特征在于,所述约束条件为所述流处理组件所需要的计算资源小于或等于预先设置的数值,或者,所述流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,所述流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
7.根据权利要求1至3任一项所述的方法,其特征在于,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,所述资源分配策略为平均分配策略。
8.一种流处理装置,其特征在于,包括:
接收单元,用于接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;
计算单元,用于在所述接收单元接收所述第一流处理任务之后,计算所述第一流处理任务中包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;
复制更新单元,用于在所述计算单元得到所述每一个流处理组件所需要的计算资源之后,若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
分配单元,用于在所述复制更新单元得到所述第二流处理任务之后,将所述第二流处理任务中的流处理组件分配给流处理系统中的满足所述流处理组件所需要的计算资源的计算节点。
9.根据权利要求8所述的装置,其特征在于,所述第一流处理任务中还包括流处理组件的算子估计计算量及流处理组件的流传输估计计算量;
则所述计算单元具体用于根据所述第一流处理任务中的每一个流处理组件对应的所述流处理组件的算子估计计算量及所述流处理组件的流传输估计计算量,计算所述每一个流处理组件所需要的计算资源。
10.根据权利要求9所述的装置,其特征在于,所述计算单元包括:
第一计算单元,用于在所述接收单元接收所述第一流处理任务之后,根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;
第二计算单元,用于在所述第一计算单元计算所述每一个流处理组件的算子计算量之后,根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;
第三计算单元,用于在所述第二计算单元计算所述每一个流处理组件的流传输计算量之后,将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
11.根据权利要求8至10任一项所述的装置,其特征在于,所述分配单元包括:
排序单元,用于在所述复制更新单元得到所述第二流处理任务之后,按照所述第二流处理任务中的流处理组件所需要的计算资源从大到小的顺序将所述流处理组件排序;
组件分配单元,用于在所述排序单元进行排序后,将所述流处理组件按照所述排序分配给所述流处理系统中满足所述流处理组件所需要的计算资源的计算节点,所述计算节点为所述流处理组件在各个计算节点上的计算资源比例最小的计算节点,所述计算资源比例为所述流处理组件所需要的计算资源与所述计算资源节点已使用的计算资源的和占所述计算节点总的计算资源的比例。
12.根据权利要求8至10任一项所述的装置,其特征在于,所述分配单元包括:
确定单元,用于在所述复制更新单元得到所述第二流处理任务之后,根据预先设置的分类模型确定所述第二流处理任务的类型;
查找单元,用于在所述确定单元确定所述第二流处理任务的类型之后,查找预先设置的任务类型与分配方式的对应关系表,确定与所述第二流处理任务的类型对应的分配方式;
节点分配单元,用于在所述查找单元确定所述对应的分配方式之后,按照所述对应的分配方式,为所述第二流处理任务中的流处理组件分配满足所述流处理组件所需要的计算资源的计算节点。
13.根据权利要求8至10任一项所述的装置,其特征在于,所述约束条件为流处理组件所需要的计算资源小于或等于预先设置的数值,或者,流处理组件所需要的计算资源小于所有计算节点中剩余计算资源最大的计算节点所能提供的计算资源,或者流处理组件所需要的计算资源小于计算节点的剩余计算资源的平均值。
14.根据权利要求8至10任一项所述的装置,其特征在于,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,且所述资源分配策略为平均分配策略。
15.一种流处理系统,其特征在于,包括:流处理装置和多个计算节点,其中:
所述流处理装置用于:接收第一流处理任务,所述第一流处理任务中包含一个或多个流处理组件、所述流处理组件的数据输入及输出关系、流数据源的标识;计算所述第一流处理任务包含的所述一个或多个流处理组件中的每一个流处理组件所需要的计算资源;若所述第一流处理任务中包含所需要的计算资源不满足预先设置的约束条件的第一流处理组件,则复制与所述第一流处理组件具有相同计算逻辑的第二流处理组件,所述第二流处理组件的个数为一个或多个,并将所述第二流处理组件添加到所述第一流处理任务中,得到第二流处理任务;在所述第二流处理任务中,所述第二流处理组件与所述第一流处理组件具有相同的数据输入及输出关系,且,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的第三流处理组件,则所述第三流处理组件根据第一数据分配策略将所述第三流处理组件发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,若在所述第一流处理任务中存在向所述第一流处理组件发送数据的所述流数据源标识对应的流数据源,则所述流数据源根据第二数据分配策略将所述流数据源发送给所述第一流处理组件的数据发送给所述第一流处理组件和所述第二流处理组件,且,所述第一流处理任务中的所述第一流处理组件所需要的计算资源根据资源分配策略划分给所述第二流处理任务中的所述第一流处理组件和所述第二流处理组件;
所述计算节点用于:接受所述流处理装置分配的流处理组件,按照所述流处理组件的计算逻辑对发送给所述流处理组件的数据进行处理。
16.根据权利要求15所述的系统,其特征在于,所述第一流处理任务中还包括流处理组件的算子估计计算量及流传输估计计算量;
则所述流处理组件具体用于:根据所述每一个流处理组件的算子估计计算量及所述流处理组件的源代码的估计计算量,按照预先设置的算子计算量预测函数计算所述每一个流处理组件的算子计算量;根据所述每一个流处理组件的流传输估计计算量,按照预先设置的流传输计算量预测函数计算所述每一个流处理组件的流传输计算量;将所述每一个流处理组件的算子计算量与所述每一个流处理组件的流传输计算量的和作为所述每一个流处理组件所需要的计算资源。
17.根据权利要求15或16所述的系统,其特征在于,所述约束条件为所述流处理组件所需要的计算资源小于或等于预先设置的数值,或者,所述流处理组件所需要的计算资源小于各个计算节点能提供的最大的空闲计算资源,或者,所述流处理组件所需要的计算资源小于各个计算节点的空闲计算资源的平均值。
18.根据权利要求15或16所述的系统,其特征在于,所述第一数据分配策略为平均分配策略,所述第二数据分配策略为平均分配策略,且所述资源分配策略为平均分配策略。
Priority Applications (4)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201410284343.5A CN105335376B (zh) | 2014-06-23 | 2014-06-23 | 一种流处理方法、装置及系统 |
PCT/CN2015/081533 WO2015196940A1 (zh) | 2014-06-23 | 2015-06-16 | 一种流处理方法、装置及系统 |
EP15173022.3A EP2966568A1 (en) | 2014-06-23 | 2015-06-19 | Stream processing method, apparatus, and system |
US14/744,815 US9692667B2 (en) | 2014-06-23 | 2015-06-19 | Stream processing method, apparatus, and system |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201410284343.5A CN105335376B (zh) | 2014-06-23 | 2014-06-23 | 一种流处理方法、装置及系统 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN105335376A CN105335376A (zh) | 2016-02-17 |
CN105335376B true CN105335376B (zh) | 2018-12-07 |
Family
ID=53502443
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201410284343.5A Active CN105335376B (zh) | 2014-06-23 | 2014-06-23 | 一种流处理方法、装置及系统 |
Country Status (4)
Country | Link |
---|---|
US (1) | US9692667B2 (zh) |
EP (1) | EP2966568A1 (zh) |
CN (1) | CN105335376B (zh) |
WO (1) | WO2015196940A1 (zh) |
Families Citing this family (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
EP3458959B1 (en) * | 2016-05-17 | 2023-03-22 | AB Initio Technology LLC | Reconfigurable distributed processing |
US11397624B2 (en) * | 2019-01-22 | 2022-07-26 | Arm Limited | Execution of cross-lane operations in data processing systems |
CN113366464A (zh) * | 2019-02-15 | 2021-09-07 | 华为技术有限公司 | 用于在数据库中嵌入流处理执行的系统 |
CN110347489B (zh) * | 2019-07-12 | 2021-08-03 | 之江实验室 | 一种基于Spark的多中心数据协同计算的流处理方法 |
Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102904919A (zh) * | 2011-07-29 | 2013-01-30 | 国际商业机器公司 | 流处理方法和实现流处理的分布式系统 |
WO2013145310A1 (ja) * | 2012-03-30 | 2013-10-03 | 富士通株式会社 | データストリームの並列処理プログラム、方法、及びシステム |
CN103595651A (zh) * | 2013-10-15 | 2014-02-19 | 北京航空航天大学 | 基于分布式的数据流处理方法和系统 |
CN103782270A (zh) * | 2013-10-28 | 2014-05-07 | 华为技术有限公司 | 流处理系统的管理方法和相关设备及系统 |
Family Cites Families (11)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US5925102A (en) * | 1997-03-28 | 1999-07-20 | International Business Machines Corporation | Managing processor resources in a multisystem environment in order to provide smooth real-time data streams, while enabling other types of applications to be processed concurrently |
US7990968B2 (en) * | 2005-05-31 | 2011-08-02 | Broadcom Corporation | Method and apparatus for demultiplexing, merging, and duplicating packetized elementary stream/program stream/elementary stream data |
KR100740210B1 (ko) * | 2005-10-21 | 2007-07-18 | 삼성전자주식회사 | 듀얼 전송 스트림 생성 장치 및 그 방법 |
KR100813000B1 (ko) * | 2005-12-01 | 2008-03-13 | 한국전자통신연구원 | 데이터 중복 처리 방지 기능을 가지는 스트림 데이터 처리시스템 및 그 방법 |
US8131840B1 (en) * | 2006-09-12 | 2012-03-06 | Packet Plus, Inc. | Systems and methods for data stream analysis using embedded design logic |
US7826365B2 (en) * | 2006-09-12 | 2010-11-02 | International Business Machines Corporation | Method and apparatus for resource allocation for stream data processing |
US7889651B2 (en) * | 2007-06-06 | 2011-02-15 | International Business Machines Corporation | Distributed joint admission control and dynamic resource allocation in stream processing networks |
US8291006B2 (en) * | 2008-05-30 | 2012-10-16 | International Business Machines Corporation | Method for generating a distributed stream processing application |
KR20120072252A (ko) * | 2010-12-23 | 2012-07-03 | 한국전자통신연구원 | 분산 데이터 스트림 처리 시스템에서 연속 처리 태스크를 병렬 처리하기 위한 장치 및 그 방법 |
JP5836229B2 (ja) * | 2012-09-04 | 2015-12-24 | 株式会社日立製作所 | ストリーム処理装置、サーバ、及び、ストリーム処理方法 |
JP6021120B2 (ja) * | 2014-09-29 | 2016-11-09 | インターナショナル・ビジネス・マシーンズ・コーポレーションInternational Business Machines Corporation | データをストリーム処理する方法、並びに、そのコンピュータ・システム及びコンピュータ・システム用プログラム |
-
2014
- 2014-06-23 CN CN201410284343.5A patent/CN105335376B/zh active Active
-
2015
- 2015-06-16 WO PCT/CN2015/081533 patent/WO2015196940A1/zh active Application Filing
- 2015-06-19 EP EP15173022.3A patent/EP2966568A1/en not_active Ceased
- 2015-06-19 US US14/744,815 patent/US9692667B2/en active Active
Patent Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102904919A (zh) * | 2011-07-29 | 2013-01-30 | 国际商业机器公司 | 流处理方法和实现流处理的分布式系统 |
WO2013145310A1 (ja) * | 2012-03-30 | 2013-10-03 | 富士通株式会社 | データストリームの並列処理プログラム、方法、及びシステム |
CN103595651A (zh) * | 2013-10-15 | 2014-02-19 | 北京航空航天大学 | 基于分布式的数据流处理方法和系统 |
CN103782270A (zh) * | 2013-10-28 | 2014-05-07 | 华为技术有限公司 | 流处理系统的管理方法和相关设备及系统 |
Also Published As
Publication number | Publication date |
---|---|
EP2966568A1 (en) | 2016-01-13 |
WO2015196940A1 (zh) | 2015-12-30 |
US9692667B2 (en) | 2017-06-27 |
US20150372882A1 (en) | 2015-12-24 |
CN105335376A (zh) | 2016-02-17 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
Pei et al. | Optimal VNF placement via deep reinforcement learning in SDN/NFV-enabled networks | |
CN102546379B (zh) | 一种虚拟化资源调度的方法及虚拟化资源调度系统 | |
CN104038540B (zh) | 一种应用代理服务器自动选择方法及系统 | |
CN102724103B (zh) | 代理服务器、分层次网络系统及分布式工作负载管理方法 | |
CN105242956B (zh) | 虚拟功能服务链部署系统及其部署方法 | |
CN112153700A (zh) | 一种网络切片资源管理方法及设备 | |
CN103401947A (zh) | 多个服务器的任务分配方法和装置 | |
Moens et al. | Hierarchical network-aware placement of service oriented applications in clouds | |
CN108270805B (zh) | 用于数据处理的资源分配方法及装置 | |
CN110365748A (zh) | 业务数据的处理方法和装置、存储介质及电子装置 | |
CN105335376B (zh) | 一种流处理方法、装置及系统 | |
CN104981782A (zh) | 用于控制资源的系统、控制模式生成装置、控制装置、用于控制资源的方法和程序 | |
CN105791151B (zh) | 一种动态流量控制方法,及装置 | |
CN106095582B (zh) | 云平台的任务执行方法 | |
CN106537824B (zh) | 用于减少信息中心网络的响应时间的方法和装置 | |
CN107846371A (zh) | 一种多媒体业务QoE资源分配方法 | |
CN109189548A (zh) | 资源调度方法、装置、计算机设备及计算机可读存储介质 | |
Feng et al. | An aggressive migration strategy for service function chaining in the core cloud | |
CN110351376A (zh) | 一种基于负反馈机制的边缘计算节点选择方法 | |
CN104301241B (zh) | 一种soa动态负载分发方法与系统 | |
CN110113376A (zh) | 一种基于移动边缘计算的多径传输负载均衡优化算法 | |
Guillen et al. | The PerSyst monitoring tool: A transport system for performance data using quantiles | |
CN108737268A (zh) | 软件定义工业物联网资源调度方法 | |
CN106407007A (zh) | 面向弹性分析流程的云资源配置优化方法 | |
Fereydooni et al. | EDLT: An Extended DLT to Enhance Load Balancing in Cloud Computing |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
C06 | Publication | ||
PB01 | Publication | ||
C10 | Entry into substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |