- PostgreSQL Logical Replication client for Node.js (>=16.9.0)
- Supported plugins
  - pgoutput (Native to PostgreSQL, Recommended)
    - Use the pgoutput plugin to process huge transactions.
  - wal2json
  - decoderbufs
  - test_decoding (Not recommended)
- Documentation for the old version (1.x)
PostgreSQL Versions | on Node.js 16, 18, 20, 22, 24 |
---|---|
PostgreSQL 14 | |
PostgreSQL 15 | |
PostgreSQL 16 | |
PostgreSQL 17 | |
- pg-logical-replication depends on pg (node-postgres) >= 6.2.2 and eventemitter2.
$ npm install pg-logical-replication
- This is an example using wal2json. A replication slot (test_slot_wal2json) must be created on the PostgreSQL server.
SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json');
import { LogicalReplicationService, Wal2Json, Wal2JsonPlugin } from 'pg-logical-replication';

const slotName = 'test_slot_wal2json';

const service = new LogicalReplicationService(
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  {
    database: 'playground',
    // ...
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  {
    acknowledge: {
      auto: true,
      timeoutSeconds: 10,
    },
  },
);

// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonPlugin({
  /**
   * Plugin options for wal2json
   * https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
   */
  //...
});

/**
 * Wal2Json.Output
 * https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
 */
service.on('data', (lsn: string, log: Wal2Json.Output) => {
  // Process the change log here, for example:
  // log.change.filter((change) => change.kind === 'insert').length;
});

// Start subscribing to data change events.
// Whenever subscribe() settles (with or without an error), subscribe again after 100 ms.
(function proc() {
  service.subscribe(plugin, slotName)
    .catch((e) => {
      console.error(e);
    })
    .then(() => {
      setTimeout(proc, 100);
    });
})();
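The commented-out filter in the 'data' listener above hints at how a change log can be inspected. The following is a minimal sketch of that idea. `ChangeLike`, `OutputLike`, and `countByKind` are hypothetical names introduced here, and the two interfaces only mirror the `change` and `kind` fields used in the example; they are not the full `Wal2Json.Output` type.

```typescript
// Hypothetical local stand-ins for the part of Wal2Json.Output used in the example above.
interface ChangeLike {
  kind: string;
}

interface OutputLike {
  change: ChangeLike[];
}

// Count how many changes of each kind (for example 'insert') a single log contains.
function countByKind(log: OutputLike): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const change of log.change) {
    counts[change.kind] = (counts[change.kind] ?? 0) + 1;
  }
  return counts;
}
```

Inside the 'data' listener this could be called as `countByKind(log)`, assuming the real output type is structurally compatible with `OutputLike`.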
const service = new LogicalReplicationService(
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  clientConfig: {
    user?: string | undefined;
    database?: string | undefined;
    password?: string | (() => string | Promise<string>) | undefined;
    port?: number | undefined;
    host?: string | undefined;
    connectionString?: string | undefined;
    keepAlive?: boolean | undefined;
    stream?: stream.Duplex | undefined;
    statement_timeout?: false | number | undefined;
    parseInputDatesAsUTC?: boolean | undefined;
    ssl?: boolean | ConnectionOptions | undefined;
    query_timeout?: number | undefined;
    keepAliveInitialDelayMillis?: number | undefined;
    idle_in_transaction_session_timeout?: number | undefined;
    application_name?: string | undefined;
    connectionTimeoutMillis?: number | undefined;
    types?: CustomTypesConfig | undefined;
    options?: string | undefined;
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  config?: Partial<{
    acknowledge?: {
      /**
       * If the value is false, acknowledge must be done manually.
       * Default: true
       */
      auto: boolean;
      /**
       * Acknowledge is performed at this interval (in seconds). If 0, periodic acknowledge is disabled.
       * Default: 10
       */
      timeoutSeconds: 0 | 10 | number;
    };
  }>
)
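The documented defaults for the acknowledge options (auto: true, timeoutSeconds: 10) can be pictured as a simple merge over user-supplied values. This is only a sketch of that idea, not the library's actual implementation. `AcknowledgeConfig`, `ServiceConfig`, `DEFAULT_ACKNOWLEDGE`, and `resolveAcknowledge` are hypothetical names, and treating both fields as optional is an assumption.

```typescript
// Hypothetical local types mirroring the documented `acknowledge` options.
interface AcknowledgeConfig {
  auto: boolean;
  timeoutSeconds: number;
}

interface ServiceConfig {
  // Assumption: callers may supply only some of the fields.
  acknowledge?: Partial<AcknowledgeConfig>;
}

// Defaults as stated in the option comments above.
const DEFAULT_ACKNOWLEDGE: AcknowledgeConfig = { auto: true, timeoutSeconds: 10 };

// Merge user-supplied values over the documented defaults.
function resolveAcknowledge(config: ServiceConfig = {}): AcknowledgeConfig {
  return { ...DEFAULT_ACKNOWLEDGE, ...config.acknowledge };
}
```

For example, `resolveAcknowledge({ acknowledge: { auto: false } })` keeps the default interval while switching to manual acknowledgement.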
- Receive changes from the server.
- plugin: One of the output plugins.
- slotName: Logical replication slot name. You can create a slot via the pg_create_logical_replication_slot function.
- uptoLsn: (optional) The starting point of the data to be streamed.
- After processing the data, it signals the PostgreSQL server that it is OK to clear the WAL log.
- Usually this is done automatically.
- Call service.acknowledge() manually only when the service was created with new LogicalReplicationService({}, {acknowledge: {auto: false}}).
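With automatic acknowledgement turned off, a common pattern is to acknowledge an LSN only after the corresponding change has been processed successfully. The sketch below illustrates that pattern under stated assumptions: `Acknowledger` and `handleAndAcknowledge` are hypothetical names, and passing the LSN to `acknowledge` (as well as its return type) is an assumption rather than a confirmed signature.

```typescript
// Hypothetical minimal shape of the service; only `acknowledge` is needed here.
// Assumption: acknowledge takes the LSN to confirm and returns a promise.
interface Acknowledger {
  acknowledge(lsn: string): Promise<unknown>;
}

// Run the handler first; acknowledge the LSN only if the handler did not throw.
async function handleAndAcknowledge(
  service: Acknowledger,
  lsn: string,
  log: unknown,
  handler: (log: unknown) => Promise<void> | void,
): Promise<boolean> {
  try {
    await handler(log);
  } catch (err) {
    // Leave the LSN unacknowledged so the server keeps the WAL for a retry.
    console.error(`processing failed at ${lsn}, not acknowledging`, err);
    return false;
  }
  await service.acknowledge(lsn);
  return true;
}
```

In a 'data' listener this might be used as `handleAndAcknowledge(service, lsn, log, saveChange)`, where `saveChange` is whatever application-specific processing is required.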
on(event: 'start', listener: () => Promise<void> | void)
- Emitted when replication starts.
on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
on(event: 'error', listener: (err: Error) => void)
on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
- Emitted when acknowledging automatically.
on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
- A heartbeat check signal has been received from the server. You may need to run service.acknowledge().
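One way to act on the 'heartbeat' event is to acknowledge only when the server asks for a response. The sketch below shows that decision in isolation. `makeHeartbeatListener` is a hypothetical helper, and the `ack` callback stands in for a call such as `service.acknowledge(lsn)`, whose exact signature is an assumption here.

```typescript
// Build a listener with the same parameter list as the 'heartbeat' event.
// `ack` is a hypothetical callback wrapping the service's acknowledge call.
function makeHeartbeatListener(ack: (lsn: string) => Promise<unknown>) {
  return async (lsn: string, _timestamp: number, shouldRespond: boolean): Promise<boolean> => {
    // Only respond when the server explicitly requests it.
    if (!shouldRespond) {
      return false;
    }
    await ack(lsn);
    return true;
  };
}

// Possible wiring (assumption, not verified against the library):
// service.on('heartbeat', makeHeartbeatListener((lsn) => service.acknowledge(lsn)));
```

Whether a manual response is needed at all likely depends on the acknowledge.auto setting, so this is mainly relevant when acknowledgement is done manually.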
stop(): Promise<this>
- Terminate the server's connection and stop replication.
isStop(): boolean
- Returns false once replication from the server has started.
lastLsn(): string
- Returns the last LSN (Log Sequence Number) received from the server.
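PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (for example 0/16B3748), so comparing two LSN strings lexically is not reliable. The following sketch compares them numerically. `parseLsn` and `compareLsn` are hypothetical helpers introduced for illustration and are not part of pg-logical-replication.

```typescript
// Split an LSN such as '0/16B3748' into its high and low 32-bit halves.
function parseLsn(lsn: string): [number, number] {
  const match = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn);
  if (!match) {
    throw new Error(`invalid LSN: ${lsn}`);
  }
  return [parseInt(match[1], 16), parseInt(match[2], 16)];
}

// Return a negative number if a < b, zero if equal, and a positive number if a > b.
function compareLsn(a: string, b: string): number {
  const [aHigh, aLow] = parseLsn(a);
  const [bHigh, bLow] = parseLsn(b);
  return aHigh !== bHigh ? aHigh - bHigh : aLow - bLow;
}
```

A check such as `compareLsn(service.lastLsn(), target) >= 0` could then tell whether the stream has reached a given position, assuming `target` is an LSN string in the same format.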
4-1. PgoutputPlugin for pgoutput (Native to PostgreSQL)
- Use the pgoutput plugin to process large-scale transactions.
4-2. Wal2JsonPlugin for wal2json
4-3. ProtocolBuffersPlugin for decoderbufs
4-4. TestDecodingPlugin for test_decoding (Not recommended)
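Because a replication slot is bound to one server-side output plugin, the client-side plugin class has to match it. The mapping listed above can be written down as a small lookup, sketched here. `OutputPluginName`, `PLUGIN_CLASS_BY_OUTPUT_PLUGIN`, and `pluginClassFor` are hypothetical names, and the function returns class names as strings so the sketch stays self-contained.

```typescript
// Server-side output plugin names listed in this README.
type OutputPluginName = 'pgoutput' | 'wal2json' | 'decoderbufs' | 'test_decoding';

// Client-side plugin class name for each server-side output plugin.
const PLUGIN_CLASS_BY_OUTPUT_PLUGIN: Record<OutputPluginName, string> = {
  pgoutput: 'PgoutputPlugin',
  wal2json: 'Wal2JsonPlugin',
  decoderbufs: 'ProtocolBuffersPlugin',
  test_decoding: 'TestDecodingPlugin',
};

// Look up the class name, failing loudly for plugins this README does not list.
function pluginClassFor(outputPlugin: string): string {
  if (!Object.prototype.hasOwnProperty.call(PLUGIN_CLASS_BY_OUTPUT_PLUGIN, outputPlugin)) {
    throw new Error(`unsupported output plugin: ${outputPlugin}`);
  }
  return PLUGIN_CLASS_BY_OUTPUT_PLUGIN[outputPlugin as OutputPluginName];
}
```

The input could come from the plugin column of the pg_replication_slots view for the slot being subscribed to.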