A lightweight, TypeScript-native Pub/Sub implementation designed for use with Mercurius GraphQL, built on top of Valkey (Redis-compatible).
Supports broadcasting messages to multiple listeners and enables easy event-driven design for GraphQL subscriptions or internal messaging systems.
- 🔁 Broadcast-style delivery: All listeners receive each message
- ⚡ Fast and simple API: Push, subscribe, destroy
- ✅ Compatible with Mercurius PubSub interface
- 🧪 Tested with Jest
- 🧱 Type-safe and modular — written in pure TypeScript
pnpm add valkey-pubsub
import ValkeyPubSub, { PubSubGenericQueue } from "valkey-pubsub";
const pubsub = await ValkeyPubSub.create({
addresses: [{ host: "localhost", port: 6379 }],
clusterMode: false,
});
const queue = new PubSubGenericQueue<string>();
await pubsub.subscribe("my-topic", queue);
queue.onItem((message) => {
console.log("Received:", message);
});
await pubsub.publish({
topic: "my-topic",
payload: { hello: "world" },
});
The original driving force behind the design was the subscription model in Mercurius.
const pubsub = await ValkeyPubSub.create();
// `server` is the Fastify instance used throughout this example
server.decorate("pubsub", pubsub);
server.register(mercurius, {
schema,
resolvers: await resolvers,
loaders: await loaders,
context: async (request: FastifyRequest) => {
return {
request,
db: server.db,
valkey: server.valkey,
pubsub: server.pubsub,
logger: server.log,
} as ServerDecorators;
},
subscription: {
context: async (_server, request) => {
return {
request,
db: server.db,
valkey: server.valkey,
pubsub: server.pubsub,
logger: server.log,
} as ServerDecorators;
},
pubsub: server.pubsub,
},
graphiql: false, // ℹ️ cannot use in place with helmet
allowBatchedQueries: true,
path: "/graphql", // 👈 Restricts GraphQL to this endpoint
prefix: "/",
});
Creates a PubSub instance.
Config options:
- addresses: Array of { host, port } objects (defaults to Valkey on localhost:6379)
- protocol: 'RESP2' or 'RESP3' (default: 'RESP3')
- clusterMode: true or false (default: false)
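The defaults listed above can be pictured as a small merge step. This is only a sketch of how such defaults might be resolved, not the library's code: `SketchConfig`, `DEFAULTS`, and `resolveConfig` are hypothetical names introduced for illustration, and only the option names and default values come from the list above.

```typescript
// Hypothetical types and helper for illustration; not exported by valkey-pubsub.
type Protocol = "RESP2" | "RESP3";

interface Address {
  host: string;
  port: number;
}

interface SketchConfig {
  addresses?: Address[];
  protocol?: Protocol;
  clusterMode?: boolean;
}

// Default values as documented in the option list.
const DEFAULTS: Required<SketchConfig> = {
  addresses: [{ host: "localhost", port: 6379 }],
  protocol: "RESP3",
  clusterMode: false,
};

// Fill in any option the caller left out.
// Assumption: an empty address list also falls back to the default address.
function resolveConfig(config: SketchConfig = {}): Required<SketchConfig> {
  const addresses =
    config.addresses && config.addresses.length > 0
      ? config.addresses
      : DEFAULTS.addresses;
  return {
    // Copy each address so callers cannot mutate the shared defaults.
    addresses: addresses.map((a) => ({ host: a.host, port: a.port })),
    protocol: config.protocol ?? DEFAULTS.protocol,
    clusterMode: config.clusterMode ?? DEFAULTS.clusterMode,
  };
}
```

Calling `resolveConfig({ protocol: "RESP2" })` in this sketch keeps the default address and `clusterMode` while overriding only the protocol.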
- subscribe(topic, queue): Subscribes a queue to a topic. Every published message is pushed to the queue and delivered to all of its listeners.
- publish({ topic, payload }): Publishes a message to the specified topic. The payload is stringified before being sent.
- Cleans up all open connections and subscriptions.
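To make the broadcast and stringification behaviour described above concrete, here is a minimal in-memory sketch. It does not talk to Valkey and is not the library's implementation: `SketchBroker` and `Sink` are hypothetical names, the methods are synchronous for brevity (the real calls are awaited in the usage example), and the use of `JSON.stringify` is an assumption about what "stringified" means.

```typescript
// Hypothetical in-memory stand-in; the real library delivers through Valkey.
type Sink = (message: string) => void;

class SketchBroker {
  private topics = new Map<string, Set<Sink>>();

  // Register a sink for a topic; several sinks may share one topic.
  subscribe(topic: string, sink: Sink): void {
    const sinks = this.topics.get(topic) ?? new Set<Sink>();
    sinks.add(sink);
    this.topics.set(topic, sinks);
  }

  // Serialize the payload once and hand the same string to every sink.
  // Assumption: serialization is JSON.stringify.
  publish(message: { topic: string; payload: unknown }): void {
    const serialized = JSON.stringify(message.payload);
    this.topics.get(message.topic)?.forEach((sink) => sink(serialized));
  }

  // Drop every subscription.
  destroy(): void {
    this.topics.clear();
  }
}
```

Under this assumption a listener receives a string and would call `JSON.parse` itself if it needs the original object back.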
A generic in-memory delivery queue that stores items and allows multiple listeners.
Methods:
- push(value: T): Pushes a value onto the queue
- onItem(callback: (value: T) => void): Registers a callback for new items
- isEmpty(): Returns true if the queue is empty
- size(): Returns the current number of pending items
- destroy(): Destroys the queue and runs any registered close callbacks
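The methods above can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not the source of PubSubGenericQueue: `SketchQueue` and `onClose` are hypothetical names, and the buffering rule (items stay pending only until the first listener is registered) is a guess at what "pending" means.

```typescript
// Hypothetical stand-in for illustration; not the library's PubSubGenericQueue.
type ItemListener<T> = (value: T) => void;

class SketchQueue<T> {
  private pending: T[] = [];
  private listeners: ItemListener<T>[] = [];
  private closeCallbacks: Array<() => void> = [];

  // Deliver to every listener, or buffer if nobody is listening yet (assumption).
  push(value: T): void {
    if (this.listeners.length === 0) {
      this.pending.push(value);
      return;
    }
    for (const listener of this.listeners) listener(value);
  }

  // Register a listener and flush anything buffered before it arrived.
  onItem(callback: ItemListener<T>): void {
    this.listeners.push(callback);
    const buffered = this.pending;
    this.pending = [];
    for (const value of buffered) callback(value);
  }

  // Hypothetical hook: the method list mentions close callbacks but not how they are registered.
  onClose(callback: () => void): void {
    this.closeCallbacks.push(callback);
  }

  isEmpty(): boolean {
    return this.pending.length === 0;
  }

  size(): number {
    return this.pending.length;
  }

  // Clear state, then run each close callback once.
  destroy(): void {
    this.pending = [];
    this.listeners = [];
    const callbacks = this.closeCallbacks;
    this.closeCallbacks = [];
    for (const callback of callbacks) callback();
  }
}
```

In this sketch every listener registered at push time sees the value, which mirrors the broadcast-style delivery described in the feature list.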
MIT © Chris Schuld
- Email - twitter handle @ gmail.com
- X - @cbschuld
Yes, thank you! Please update the docs and tests and add your name to the package.json file.