8000 实时同步希望增加过滤(例如:filter delete类型事件)某类事件功能 · Issue #415 · datavane/tis · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

实时同步希望增加过滤(例如:filter delete类型事件)某类事件功能 #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
fanggongbing opened this issue Mar 4, 2025 · 3 comments
Labels
enhancement New feature or request
Milestone

Comments

@fanggongbing
Copy link

实时同步希望增加过滤某事件功能,如过滤delete事件,或根据字段过滤某事件 如n_enable_state=0的delete 事件过滤掉

@baisui1981 baisui1981 added the enhancement New feature or request label Mar 4, 2025
@baisui1981 baisui1981 added this to the v4.2.0 milestone Mar 4, 2025
@baisui1981 baisui1981 modified the milestones: v4.2.0, v4.3.0 Mar 23, 2025
@baisui1981
Copy link
Member

cdc source组件设置页面添加 事件过滤选项

Image

@baisui1981 baisui1981 modified the milestones: v4.3.0, v4.2.0 Mar 29, 2025
@baisui1981 baisui1981 changed the title 实时同步希望增加过滤某类事件功能 实时同步希望增加过滤(例如:filter delete类型事件)某类事件功能 Mar 29, 2025
@baisui1981
Copy link
Member

执行过程中一旦有delete类型的event被过滤了,可以通过flink metrics 指标 rest API实时查询,例如:
http://192.168.28.201:8081/jobs/ab5c52bbce0ffedd94ba1f353441360a/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.skipUpdateBeforeEventOrSpecEvent.skipUpdateBeforeEventOrSpecEventCount

get参数说明

在 Flink REST API 中,通过 get 参数查询指标时,点号 . 分隔的路径层级反映了 Flink 指标系统的层级化命名规则。具体到你的 URL 参数:

0.skipUpdateBeforeEventOrSpecEvent.skipUpdateBeforeEventOrSpecEventCount

其中的 0 代表 子任务索引(Subtask Index),含义如下:


参数分解

分段 含义 解释
0 子任务索引 表示该指标属于算子的第 0 号并行子任务(即 subtask[0])。
skipUpdateBeforeEventOrSpecEvent 算子名称或用户自定义指标组 通常是算子(Operator)的名称,或用户通过 MetricGroup 自定义的指标组名。
skipUpdateBeforeEventOrSpecEventCount 具体指标名称 该指标的名称,例如计数器(Counter)、仪表(Gauge)等。

为什么需要子任务索引?

Flink 作业中的每个算子(Operator)可能有多个并行实例(Subtask),每个子任务独立运行并维护自己的本地指标。例如:

  • 若算子的并行度(Parallelism)为 3,则子任务索引为 012
  • 通过子任务索引,可以精确查询某个特定子任务的指标值(如 subtask[0] 的处理吞吐量)。

示例场景

假设你的作业拓扑如下:

Source -> MapOperator(parallelism=2) -> Sink
  • MapOperator 的两个子任务(索引 01)各自维护独立的指标。
  • 要查询子任务 0numRecordsIn 指标,URL 参数应为:
    0.MapOperator.numRecordsIn
    
  • 要查询子任务 1 的同一指标,则参数为:
    1.MapOperator.numRecordsIn
    

如何确定子任务索引?

  1. 通过 Flink Web UI
    访问作业的 Web UI(http://<jobmanager>:8081),进入作业的 Task Managers 页面,查看每个算子的子任务分布和索引。

  2. 通过 REST API 列出所有指标
    发送请求到 /jobs/<job-id>/vertices/<vertex-id>/metrics(不带 get 参数),返回所有可用指标及其完整路径,例如:

    {
      "0.skipUpdateBeforeEventOrSpecEvent.skipUpdateBeforeEventOrSpecEventCount": 42,
      "1.skipUpdateBeforeEventOrSpecEvent.skipUpdateBeforeEventOrSpecEventCount": 37
    }

    此处 01 表示两个子任务的同一指标值。


其他常见指标路径结构

场景 指标路径示例 说明
算子级别全局聚合 skipUpdateBeforeEventOrSpecEvent.skipUpdateBeforeEventOrSpecEventCount 省略子任务索引时,返回所有子任务该指标的总和(需算子支持聚合)。
TaskManager 级别指标 taskmanager.<hostname>.<metric-name> 例如 CPU 使用率、内存占用。
JobManager 级别指标 jobmanager.<metric-name> 例如 JVM 堆内存状态。

总结

  • 0 是子任务索引:用于定位特定并行实例的指标。
  • 点号分隔层级:反映 Flink 指标从子任务 → 算子/组 → 具体指标的层级关系。
  • 灵活查询:通过调整路径,可以查询单个子任务、全局聚合或系统级指标。

@baisui1981
Copy link
Member

为了在 ECharts 折线图中实时展示 Flink Count Metrics 的每秒增量(而非累计值),需将累计的 count 转换为每秒增量速率。以下是分步实现方案:


1. 数据结构设计

每次从 Flink REST API 获取的数据需包含 时间戳当前累计 Count 值

interface MetricData {
  timestamp: number; // 毫秒时间戳(如 Date.now())
  count: number;     // 当前累计值
}

2. 增量计算逻辑

维护一个 历史数据队列,保存最近两次的数据点,计算每秒增量:

export class MetricsProcessor {
  private previousData: MetricData | null = null;

  // 处理新数据点,返回每秒增量(若有效)
  process(newData: MetricData): number | null {
    if (!this.previousData) {
      // 第一次收到数据,无法计算增量
      this.previousData = newData;
      return null;
    }

    // 计算时间差(秒)
    const timeDiffSeconds = (newData.timestamp - this.previousData.timestamp) / 1000;
    if (timeDiffSeconds <= 0) {
      // 时间未推进(如重复数据),跳过
      return null;
    }

    // 计算增量并转换为每秒速率
    const countDiff = newData.count - this.previousData.count;
    const ratePerSecond = countDiff / timeDiffSeconds;

    // 更新历史数据
    this.previousData = newData;

    return ratePerSecond;
  }
}

3. 数据获取与处理流程

结合定时器或 WebSocket 获取数据,并调用处理器:

import { MetricsProcessor } from './metrics-processor';
import { HttpClient } from '@angular/common/http';

export class MetricsService {
  private processor = new MetricsProcessor();
  private dataPoints: { timestamp: number; rate: number }[] = [];

  constructor(private http: HttpClient) {}

  // 启动定时轮询(示例:每秒一次)
  startPolling(jobId: string, vertexId: string) {
    setInterval(async () => {
      const metricData = await this.fetchMetrics(jobId, vertexId);
      const rate = this.processor.process(metricData);
      if (rate !== null) {
        this.dataPoints.push({
          timestamp: metricData.timestamp,
          rate: rate
        });
        // 保持最多 60 个点(1分钟数据)
        if (this.dataPoints.length > 60) {
          this.dataPoints.shift();
        }
        // 触发图表更新(通过 Subject 或直接调用)
      }
    }, 1000); // 注意:实际间隔可能因网络延迟略有波动
  }

  private async fetchMetrics(jobId: string, vertexId: string): Promise<MetricData> {
    const url = `http://flink-jobmanager/jobs/${jobId}/vertices/${vertexId}/metrics?get=count`;
    const response = await this.http.get<any>(url).toPromise();
    return {
      timestamp: Date.now(), // 或用 API 返回的时间戳(如果提供)
      count: parseInt(response.value, 10)
    };
  }
}

4. ECharts 配置与更新

在 Angular 组件中将处理后的数据绑定到折线图:

// 在组件中
export class MetricsChartComponent implements OnInit {
  chartOptions: EChartsOption = {
    xAxis: { type: 'time' },
    yAxis: { name: '事件数/秒' },
    series: [{
      type: 'line',
      data: []
    }]
  };

  constructor(private metricsService: MetricsService) {}

  ngOnInit() {
    this.metricsService.dataUpdated$.subscribe(dataPoints => {
      this.updateChart(dataPoints);
    });
  }

  private updateChart(dataPoints: { timestamp: number; rate: number }[]) {
    this.chartOptions = {
      series: [{
        data: dataPoints.map(p => [p.timestamp, p.rate])
      }]
    };
  }
}

5. 处理边界条件

首次数据加载

  • 首次获取数据时无法计算增量,可忽略或显示为 0:
    process(newData: MetricData): number | null {
      if (!this.previousData) {
        this.previousData = newData;
        return 0; // 或 return null 并在图表中过滤
      }
      // ...原有逻辑
    }

时间间隔异常

  • 若两次数据时间差过大(如网络中断),重置计算:
    const MAX_ALLOWED_INTERVAL = 5; // 允许最大间隔(秒)
    if (timeDiffSeconds > MAX_ALLOWED_INTERVAL) {
      this.previousData = newData; // 重置基准点
      return null;
    }

数据波动平滑

  • 使用移动平均减少瞬时波动:
    // 在 MetricsService 中
    private movingAverage(windowSize = 5) {
      const rates = this.dataPoints.map(p => p.rate);
      const smoothedRates = rates.map((_, i) => {
        const start = Math.max(0, i - windowSize + 1);
        const subset = rates.slice(start, i + 1);
        return subset.reduce((a, b) => a + b, 0) / subset.length;
      });
      // 更新 dataPoints 中的 rate 值
    }

6. 最终效果

  • 折线图显示每秒处理的事件数,而非累计值。
  • 数据每秒钟动态更新,曲线反映实时吞吐量波动。

完整代码示例

metrics-processor.ts

export interface MetricData {
  timestamp: number;
  count: number;
}

export class MetricsProcessor {
  private previousData: MetricData | null = null;

  process(newData: MetricData): number | null {
    if (!this.previousData) {
      this.previousData = newData;
      return null;
    }

    const timeDiff = newData.timestamp - this.previousData.timestamp;
    if (timeDiff <= 0) return null;

    const countDiff = newData.count - this.previousData.count;
    const rate = countDiff / (timeDiff / 1000);

    this.previousData = newData;
    return rate;
  }
}

metrics-chart.component.ts

import { EChartsOption } from 'echarts';
import { MetricsService } from './metrics.service';

@Component({
  selector: 'app-metrics-chart',
  template: '<div echarts [options]="chartOptions" class="chart"></div>'
})
export class MetricsChartComponent implements OnInit {
  chartOptions: EChartsOption;

  constructor(private metricsService: MetricsService) {}

  ngOnInit() {
    this.metricsService.dataPoints$.subscribe(points => {
      this.chartOptions = {
        series: [{
          data: points.map(p => [p.timestamp, p.rate]),
          type: 'line'
        }]
      };
    });
  }
}

注意事项

  1. 时间戳同步:确保 Flink 返回的计数与采集时间戳严格对应,若 API 响应延迟较大,需考虑时间补偿。
  2. 错误处理:添加重试逻辑应对网络波动,避免因单次失败导致数据中断。
  3. 性能优化:对于高频更新(如毫秒级),使用 appendData 代替全量更新:
    // 在 ECharts 实例中
    this.echartsInstance.appendData({
      seriesIndex: 0,
      data: [[newTimestamp, newRate]]
    });
  4. Y轴动态范围:根据数据自动调整 Y 轴范围,避免固定值导致曲线扁平化:
    yAxis: {
      min: 'dataMin',
      max: 'dataMax'
    }

通过以上步骤,即可将 Flink 的累计 Count 指标转换为实时增量速率,并在 ECharts 折线图中动态展示每秒处理能力。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants
0