-
Notifications
You must be signed in to change notification settings - Fork 241
实时同步希望增加过滤(例如:filter delete类型事件)某类事件功能 #415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
执行过程中一旦有delete类型的event被过滤了,可以通过flink metrics 指标 rest API实时查询,例如: get参数说明在 Flink REST API 中,通过
其中的 参数分解
为什么需要子任务索引?Flink 作业中的每个算子(Operator)可能有多个并行实例(Subtask),每个子任务独立运行并维护自己的本地指标。例如:
示例场景假设你的作业拓扑如下:
如何确定子任务索引?
其他常见指标路径结构
总结
|
为了在 ECharts 折线图中实时展示 Flink Count Metrics 的每秒增量(而非累计值),需将累计的 1. 数据结构设计每次从 Flink REST API 获取的数据需包含 时间戳 和 当前累计 Count 值: interface MetricData {
timestamp: number; // 毫秒时间戳(如 Date.now())
count: number; // 当前累计值
} 2. 增量计算逻辑维护一个 历史数据队列,保存最近两次的数据点,计算每秒增量: export class MetricsProcessor {
private previousData: MetricData | null = null;
// 处理新数据点,返回每秒增量(若有效)
process(newData: MetricData): number | null {
if (!this.previousData) {
// 第一次收到数据,无法计算增量
this.previousData = newData;
return null;
}
// 计算时间差(秒)
const timeDiffSeconds = (newData.timestamp - this.previousData.timestamp) / 1000;
if (timeDiffSeconds <= 0) {
// 时间未推进(如重复数据),跳过
return null;
}
// 计算增量并转换为每秒速率
const countDiff = newData.count - this.previousData.count;
const ratePerSecond = countDiff / timeDiffSeconds;
// 更新历史数据
this.previousData = newData;
return ratePerSecond;
}
} 3. 数据获取与处理流程结合定时器或 WebSocket 获取数据,并调用处理器: import { MetricsProcessor } from './metrics-processor';
import { HttpClient } from '@angular/common/http';
export class MetricsService {
private processor = new MetricsProcessor();
private dataPoints: { timestamp: number; rate: number }[] = [];
constructor(private http: HttpClient) {}
// 启动定时轮询(示例:每秒一次)
startPolling(jobId: string, vertexId: string) {
setInterval(async () => {
const metricData = await this.fetchMetrics(jobId, vertexId);
const rate = this.processor.process(metricData);
if (rate !== null) {
this.dataPoints.push({
timestamp: metricData.timestamp,
rate: rate
4. ECharts 配置与更新在 Angular 组件中将处理后的数据绑定到折线图: // 在组件中
export class MetricsChartComponent implements OnInit {
chartOptions: EChartsOption = {
xAxis: { type: 'time' },
yAxis: { name: '事件数/秒' },
series: [{
type: 'line',
data: []
}]
};
constructor(private metricsService: MetricsService) {}
ngOnInit() {
this.metricsService.dataUpdated$.subscribe(dataPoints => {
this.updateChart(dataPoints);
});
}
private updateChart(dataPoints: { timestamp: number; rate: number }[]) {
this.chartOptions = {
series: [{
data: dataPoints.map(p => [p.timestamp, p.rate])
}]
};
}
} 5. 处理边界条件首次数据加载
时间间隔异常
数据波动平滑
6. 最终效果
完整代码示例metrics-processor.tsexport interface MetricData {
timestamp: number;
count: number;
}
export class MetricsProcessor {
private previousData: MetricData | null = null;
process(newData: MetricData): number | null {
if (!this.previousData) {
this.previousData = newData;
return null;
}
const timeDiff = newData.timestamp - this.previousData.timestamp;
if (timeDiff <= 0) return null;
const countDiff = newData.count - this.previousData.count;
const rate = countDiff / (timeDiff / 1000);
this.previousData = newData;
return rate;
}
} metrics-chart.component.tsimport { EChartsOption } from 'echarts';
import { MetricsService } from './metrics.service';
@Component({
selector: 'app-metrics-chart',
template: '<div echarts [options]="chartOptions" class="chart"></div>'
})
export class MetricsChartComponent implements OnInit {
chartOptions: EChartsOption;
constructor(private metricsService: MetricsService) {}
ngOnInit() {
this.metricsService.dataPoints$.subscribe(points => {
this.chartOptions = {
series: [{
data: points.map(p => [p.timestamp, p.rate]),
type: 'line'
}]
};
});
}
} 注意事项
通过以上步骤,即可将 Flink 的累计 Count 指标转换为实时增量速率,并在 ECharts 折线图中动态展示每秒处理能力。 |
实时同步希望增加过滤某事件功能,如过滤delete事件,或根据字段过滤某事件 如n_enable_state=0的delete 事件过滤掉
The text was updated successfully, but these errors were encountered: