GitHub - kibae/pg-logical-replication: PostgreSQL Logical Replication client for Node.js

kibae/pg-logical-replication

pg-logical-replication

PostgreSQL Versions on Node.js 16, 18, 20, 22, 24

  • PostgreSQL 14
  • PostgreSQL 15
  • PostgreSQL 16
  • PostgreSQL 17

1. Install

$ npm install pg-logical-replication

2. Usage

  • This example uses wal2json. A replication slot (test_slot_wal2json) must already exist on the PostgreSQL server.
    • SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json');
import { LogicalReplicationService, Wal2Json, Wal2JsonPlugin } from 'pg-logical-replication';

const slotName = 'test_slot_wal2json';

const service = new LogicalReplicationService(
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  {
    database: 'playground',
    // ...
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  {
    acknowledge: {
      auto: true,
      timeoutSeconds: 10
    }
  }
)

// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonPlugin({
  /**
   * Plugin options for wal2json
   * https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
   */
  //...
});

/**
 * Wal2Json.Output
 * https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
 */
service.on('data', (lsn: string, log: Wal2Json.Output) => {
  // Handle the change log here, for example:
  // log.change.filter((change) => change.kind === 'insert').length;
});

// Start subscribing to data change events.
(function proc() {
  service.subscribe(plugin, slotName)
    .catch((e) => {
      console.error(e);
    })
    .then(() => {
      setTimeout(proc, 100);
    });
})();
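
The commented-out line in the 'data' listener above counts inserts. A minimal sketch of the same idea as a reusable function follows. The ChangeLike and OutputLike types are hypothetical, simplified stand-ins for Wal2Json.Output that model only the change and kind fields used above; they are not the library's own definitions.

```typescript
// Hypothetical, simplified stand-ins for the library's Wal2Json.Output type.
// Only the fields used in the usage example (`change` and `kind`) are modelled.
interface ChangeLike {
  kind: string;
}

interface OutputLike {
  change: ChangeLike[];
}

// Count how many changes of each kind a single log message contains.
function countByKind(log: OutputLike): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const change of log.change) {
    counts[change.kind] = (counts[change.kind] ?? 0) + 1;
  }
  return counts;
}

const sample: OutputLike = {
  change: [{ kind: 'insert' }, { kind: 'insert' }, { kind: 'delete' }],
};
console.log(countByKind(sample)); // { insert: 2, delete: 1 }
```

Inside the listener this could be called as countByKind(log), assuming the real output type is structurally compatible with OutputLike.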

3. LogicalReplicationService

3-1. Constructor(clientConfig: ClientConfig, config?: Partial<LogicalReplicationConfig>)

const service = new LogicalReplicationService(
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  clientConfig: {
    user?: string | undefined;
    database?: string | undefined;
    password?: string | (() => string | Promise<string>) | undefined;
    port?: number | undefined;
    host?: string | undefined;
    connectionString?: string | undefined;
    keepAlive?: boolean | undefined;
    stream?: stream.Duplex | undefined;
    statement_timeout?: false | number | undefined;
    parseInputDatesAsUTC?: boolean | undefined;
    ssl?: boolean | ConnectionOptions | undefined;
    query_timeout?: number | undefined;
    keepAliveInitialDelayMillis?: number | undefined;
    idle_in_transaction_session_timeout?: number | undefined;
    application_name?: string | undefined;
    connectionTimeoutMillis?: number | undefined;
    types?: CustomTypesConfig | undefined;
    options?: string | undefined;
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  config?: Partial<{
    acknowledge?: {
      /**
       * If the value is false, acknowledge must be done manually.
       * Default: true
       */
      auto: boolean;
      /**
       * Acknowledge is performed every set time (sec). If 0, do not do it.
       * Default: 10
       */
      timeoutSeconds: 0 | 10 | number;
    };
  }>
)

3-2. subscribe(plugin: AbstractPlugin, slotName: string, uptoLsn?: string): Promise<this>
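
The usage example in section 2 calls subscribe() again 100 ms after it settles, whether it resolved or rejected. The sketch below is one possible variation that waits longer after failures. The Subscribable interface models only the signature given in this heading, and subscribeWithBackoff, maxAttempts, baseDelayMs and maxDelayMs are hypothetical names introduced for illustration, not part of the library.

```typescript
// Minimal shape of the method this sketch relies on (signature from section 3-2).
interface Subscribable<P> {
  subscribe(plugin: P, slotName: string, uptoLsn?: string): Promise<unknown>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: call subscribe() again each time it settles.
// The delay doubles after a failure and resets after a clean return.
// Returns the number of attempts made.
async function subscribeWithBackoff<P>(
  service: Subscribable<P>,
  plugin: P,
  slotName: string,
  maxAttempts: number,
  baseDelayMs = 100,
  maxDelayMs = 10000,
): Promise<number> {
  let delay = baseDelayMs;
  let attempts = 0;
  while (attempts < maxAttempts) {
    attempts++;
    try {
      await service.subscribe(plugin, slotName);
      delay = baseDelayMs; // subscribe() resolved: reset the backoff
    } catch (err) {
      console.error(err);
      delay = Math.min(delay * 2, maxDelayMs); // subscribe() rejected: back off
    }
    if (attempts < maxAttempts) await sleep(delay);
  }
  return attempts;
}
```

Capping the number of attempts keeps the sketch easy to test; a long-running consumer would more likely loop until it is asked to stop.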

3-3. acknowledge(lsn: string): Promise<boolean>

  • After the data has been processed, this tells the PostgreSQL server that it is OK to clear the WAL up to the given LSN.
  • Usually this is done automatically.
  • Call it manually only when the service was created with automatic acknowledgement turned off: new LogicalReplicationService({}, {acknowledge: {auto: false}}).
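
With acknowledge.auto set to false, one common pattern is to acknowledge an LSN only after the change has been handled successfully. The sketch below illustrates that pattern under stated assumptions: Acknowledger models only the acknowledge() signature from this section, and processThenAcknowledge is a hypothetical helper, not a library function.

```typescript
// Minimal shape of the one service method this sketch relies on (section 3-3).
interface Acknowledger {
  acknowledge(lsn: string): Promise<boolean>;
}

// Hypothetical helper: run `handler` first and acknowledge only if it succeeds,
// so a change that failed to process is not reported to the server as done.
async function processThenAcknowledge<T>(
  service: Acknowledger,
  lsn: string,
  log: T,
  handler: (log: T) => Promise<void> | void,
): Promise<boolean> {
  try {
    await handler(log);
  } catch (err) {
    console.error(`processing failed at ${lsn}, not acknowledging`, err);
    return false;
  }
  return service.acknowledge(lsn);
}
```

It could be wired up as service.on('data', (lsn, log) => processThenAcknowledge(service, lsn, log, handleChange)), where handleChange is your own function.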

3-4. Event

  • on(event: 'start', listener: () => Promise<void> | void)
    • Emitted when replication starts.
  • on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
    • Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
  • on(event: 'error', listener: (err: Error) => void)
  • on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
    • Emitted when acknowledging automatically.
  • on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
    • Emitted when a heartbeat check signal is received from the server. You may need to call service.acknowledge().
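
A 'heartbeat' listener that replies only when the server requests a response might look like the sketch below. Which LSN is safe to confirm depends on how your application tracks processed data, so the choice is delegated to a caller-supplied function. HeartbeatAcknowledger, makeHeartbeatListener and positionToConfirm are hypothetical names; only the listener signature and acknowledge() come from the sections above.

```typescript
// Minimal shape of the service method used here (section 3-3).
interface HeartbeatAcknowledger {
  acknowledge(lsn: string): Promise<boolean>;
}

// Hypothetical factory for a 'heartbeat' listener.
// `positionToConfirm` receives the LSN carried by the heartbeat and returns the
// LSN the application is willing to confirm (an assumption of this sketch).
function makeHeartbeatListener(
  service: HeartbeatAcknowledger,
  positionToConfirm: (heartbeatLsn: string) => string,
) {
  return async (lsn: string, _timestamp: number, shouldRespond: boolean): Promise<void> => {
    if (!shouldRespond) return; // the server did not ask for a reply
    await service.acknowledge(positionToConfirm(lsn));
  };
}
```

It could be registered as service.on('heartbeat', makeHeartbeatListener(service, (lsn) => lsn)) if confirming the heartbeat's own LSN is acceptable for your workload.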

3-5. Misc. method

  • stop(): Promise<this>
    • Terminates the connection to the server and stops replication.
  • isStop(): boolean
    • Returns false once replication from the server has started.
  • lastLsn(): string
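
As a small illustration of how stop() and isStop() can be combined, the sketch below stops a service only if it is still running. Stoppable models just the two methods listed above, and stopIfRunning is a hypothetical helper, not part of the library.

```typescript
// Minimal shape of the two methods this sketch relies on (section 3-5).
interface Stoppable {
  stop(): Promise<unknown>;
  isStop(): boolean;
}

// Hypothetical helper: stop the service unless it is already stopped.
// Resolves to true if stop() was called, false otherwise.
async function stopIfRunning(service: Stoppable): Promise<boolean> {
  if (service.isStop()) return false; // already stopped, nothing to do
  await service.stop();
  return true;
}
```

A helper like this could be called from a shutdown hook such as a SIGINT handler so the replication connection is closed before the process exits.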

4. Output Plugins

4-1. PgoutputPlugin for pgoutput (Native to PostgreSQL)

  • Use the pgoutput plugin to process large-scale transactions.

4-2. Wal2JsonPlugin for wal2json

4-3. ProtocolBuffersPlugin for decoderbufs

4-4. TestDecodingPlugin for test_decoding (Not recommended)
