Description
Symptom
The following error is reported after the job starts:
2025-03-17 09:15:40,042 WARN org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [] - fetch data failed.
io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275) ~[flink-connector-oracle-cdc-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118) ~[flink-connector-oracle-cdc-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) [flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) [flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) [flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
2025-03-17 09:15:40,046 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) [flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
... 6 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275) ~[flink-connector-oracle-cdc-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118) ~[flink-connector-oracle-cdc-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-cdc-base-3.1.0.jar:3.1.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-tis-1.18.1.jar:tis-1.18.1]
... 6 more
Root cause analysis
The selected Oracle tables already carry a schema prefix (e.g. Schema.table1). The schema-resolution logic in SourceChannel.java then prepends the schema a second time, producing Schema.Schema.table1, so the CDC source cannot match the table and fails to capture changes. The fix is to skip the prefixing when the table name already contains a schema, as in the patched getSourceFunction below (the added lines are marked in the comments):
public static List<ReaderSource> getSourceFunction(
        DataSourceFactory dsFactory, List<ISelectedTab> tabs, ReaderSourceCreator sourceFunctionCreator) {
    final Optional<DataSourceFactory.ISchemaSupported> schemaSupport
            = DataSourceFactory.ISchemaSupported.schemaSupported(dsFactory);
    return getSourceFunction(dsFactory, (tab) -> {
        TableInDB tabsInDB = dsFactory.getTablesInDB();
        DataXJobInfo jobInfo = tabsInDB.createDataXJobInfo(
                DataXJobSubmit.TableDataXEntity.createTableEntity(null, tab.jdbcUrl, tab.getTabName()), true);
        Optional<String[]> targetTableNames = jobInfo.getTargetTableNames();
        List<String> physicsTabNames = targetTableNames
                .map((tabNames) -> Lists.newArrayList(tabNames))
                .orElseGet(() -> Lists.newArrayList(tab.getTabName()));
        return physicsTabNames.stream().map((t) -> {
            // ---- begin added lines ----
            // If the table name already carries a schema (e.g. "SCHEMA.table1"),
            // use it as-is instead of prepending the schema a second time.
            EntityName entity = EntityName.parse(t, true);
            if (!entity.useDftDbName()) {
                return entity.getFullName();
            }
            // ---- end added lines ----
            return schemaSupport.map((schema) -> schema.getDBSchema()).orElse(tab.dbNanme) + "." + t;
        });
    }, tabs, sourceFunctionCreator);
}
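
As a rough illustration of the name-resolution logic the patch implements (a minimal, self-contained sketch; the Entity record below is a hypothetical stand-in for TIS's EntityName, not the actual API):

import java.util.Optional;

public class SchemaPrefixSketch {

    // Hypothetical stand-in for EntityName: splits "SCHEMA.table" into schema and table parts.
    record Entity(Optional<String> schema, String table) {
        static Entity parse(String name) {
            int dot = name.indexOf('.');
            return dot < 0
                    ? new Entity(Optional.empty(), name)
                    : new Entity(Optional.of(name.substring(0, dot)), name.substring(dot + 1));
        }

        String fullName(String defaultSchema) {
            // Only prepend the default schema when the name does not already carry one,
            // which is the point of the added check in getSourceFunction.
            return schema.orElse(defaultSchema) + "." + table;
        }
    }

    public static void main(String[] args) {
        String defaultSchema = "MYSCHEMA";

        // Before the fix: the schema was always prepended, producing the duplicated name
        // that the CDC source cannot match.
        System.out.println(defaultSchema + "." + "MYSCHEMA.TABLE1"); // MYSCHEMA.MYSCHEMA.TABLE1

        // After the fix: a name that already contains a schema is used as-is,
        // and a bare table name still gets the default schema.
        System.out.println(Entity.parse("MYSCHEMA.TABLE1").fullName(defaultSchema)); // MYSCHEMA.TABLE1
        System.out.println(Entity.parse("TABLE1").fullName(defaultSchema));          // MYSCHEMA.TABLE1
    }
}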