DuckQ is a persistent queue implementation in Go using DuckDB as the storage backend. It provides efficient enqueue and dequeue operations and maintains persistence across application restarts.
- Efficient enqueue and dequeue operations
- Persistence via DuckDB storage
- Support for acknowledgment-based processing
- Simple and clean API following the Queue interface
- Built as a persistence adapter for varmq
Install the package with go get:
go get github.com/goptics/duckq
package main

import (
	"fmt"
	"log"

	"github.com/goptics/duckq"
)

func main() {
	// Create a new DuckDB queues manager.
	// The parameter is the path to the DuckDB database file.
	queuesManager := duckq.New("queue.db")
	defer queuesManager.Close()

	// Create a new queue.
	// The parameter is the name of the table to use for the queue.
	queue, err := queuesManager.NewQueue("my_queue")
	if err != nil {
		log.Fatalf("Failed to create queue: %v", err)
	}

	// You can also create a queue with custom options.
	// For example, to keep acknowledged items in the database:
	queueWithOptions, err := queuesManager.NewQueue("my_other_queue",
		duckq.WithRemoveOnComplete(false)) // Set to false to keep acknowledged items
	if err != nil {
		log.Fatalf("Failed to create queue: %v", err)
	}

	// Create a priority queue
	priorityQueue, err := queuesManager.NewPriorityQueue("my_priority_queue")
	if err != nil {
		log.Fatalf("Failed to create priority queue: %v", err)
	}

	// These two queues are not used further in this example; the blank
	// assignments keep the Go compiler from rejecting unused variables.
	_, _ = queueWithOptions, priorityQueue

	// Enqueue items
	queue.Enqueue([]byte("item 1"))
	queue.Enqueue([]byte("item 2"))

	// Get queue length
	fmt.Printf("Queue length: %d\n", queue.Len())

	// Get all pending items
	items := queue.Values()
	fmt.Printf("All items: %v\n", items)

	// Simple dequeue
	item, success := queue.Dequeue()
	if success {
		fmt.Printf("Dequeued item: %v\n", string(item.([]byte)))
	}

	// Dequeue with acknowledgment
	item, success, ackID := queue.DequeueWithAckId()
	if success {
		fmt.Printf("Dequeued item: %v with ack ID: %s\n", string(item.([]byte)), ackID)

		// Process the item...

		// Acknowledge the item after processing
		acknowledged := queue.Acknowledge(ackID)
		fmt.Printf("Item acknowledged: %v\n", acknowledged)
		// Note: By default, acknowledged items are removed from the database.
		// With WithRemoveOnComplete(false), they would be marked as completed instead.
	}

	// Purge the queue
	queue.Purge()
}
DuckQ uses a DuckDB database to store queue items with the following schema:
- id: Unique identifier for each item (auto-increment primary key using DuckDB sequences)
- data: The serialized item data (stored as a JSON blob)
- status: The status of the item ("pending", "processing", or "completed")
- ack: Boolean flag indicating whether the item has been acknowledged
- priority: The priority of the item (only for priority queues)
- ack_id: A unique ID for acknowledging processed items
- created_at: When the item was added to the queue
- updated_at: When the item was last updated
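To make the schema concrete, here is a rough sketch of how one row might be viewed from Go, together with DuckDB DDL that would produce a table of this shape. Everything in it is an assumption for illustration: the `row` type, the `createTableSQL` helper, the sequence name, and the column types are guesses based on the field descriptions above, not DuckQ's actual definitions.

```go
package main

import (
	"fmt"
	"time"
)

// Status values named in the schema description.
const (
	statusPending    = "pending"
	statusProcessing = "processing"
	statusCompleted  = "completed"
)

// row is a hypothetical Go view of one queue table row.
type row struct {
	ID        int64     // id: filled from a DuckDB sequence
	Data      []byte    // data: serialized item
	Status    string    // status: one of the constants above
	Ack       bool      // ack: whether the item has been acknowledged
	Priority  int       // priority: only meaningful for priority queues
	AckID     string    // ack_id: handle used to acknowledge the item
	CreatedAt time.Time // created_at
	UpdatedAt time.Time // updated_at
}

// createTableSQL returns illustrative DDL for a queue table. The column
// types are assumptions, and table is assumed to be a trusted identifier
// because it is interpolated directly into the statement.
func createTableSQL(table string) string {
	return fmt.Sprintf(`CREATE SEQUENCE IF NOT EXISTS %[1]s_id_seq;
CREATE TABLE IF NOT EXISTS %[1]s (
    id         BIGINT PRIMARY KEY DEFAULT nextval('%[1]s_id_seq'),
    data       BLOB NOT NULL,
    status     VARCHAR NOT NULL DEFAULT 'pending',
    ack        BOOLEAN NOT NULL DEFAULT FALSE,
    priority   INTEGER,
    ack_id     VARCHAR,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);`, table)
}

func main() {
	fmt.Println(createTableSQL("my_queue"))
}
```

The `nextval` default is how DuckDB sequences are usually wired to a primary key to get auto-increment behavior.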
NOTE: By default, when an item is acknowledged, it is removed from the database. However, you can configure the queue to keep acknowledged items by using the WithRemoveOnComplete(false) option when creating the queue. In this case, acknowledged items will be marked as "completed" but will remain in the database.
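The two acknowledgment behaviors in the note can be modeled in a few lines. This is an in-memory sketch of the policy only, under the assumption that acknowledging either deletes the row or flips its status and ack flag. The `store` and `entry` types are hypothetical and do not reflect how DuckQ implements this internally.

```go
package main

import "fmt"

// entry models only the columns that matter for acknowledgment.
type entry struct {
	data   string
	status string // "pending", "processing", or "completed"
	ack    bool
	ackID  string
}

// store is an in-memory model; removeOnComplete mirrors the option in the note.
type store struct {
	removeOnComplete bool
	rows             []entry
}

// acknowledge applies the configured policy to the processing row that
// holds ackID and reports whether such a row was found.
func (s *store) acknowledge(ackID string) bool {
	for i, r := range s.rows {
		if r.ackID != ackID || r.status != "processing" {
			continue
		}
		if s.removeOnComplete {
			// Default behavior: the row disappears.
			s.rows = append(s.rows[:i], s.rows[i+1:]...)
		} else {
			// Keep the row, marked as completed and acknowledged.
			s.rows[i].status = "completed"
			s.rows[i].ack = true
		}
		return true
	}
	return false
}

func main() {
	for _, remove := range []bool{true, false} {
		s := &store{
			removeOnComplete: remove,
			rows:             []entry{{data: "item 1", status: "processing", ackID: "a1"}},
		}
		ok := s.acknowledge("a1")
		fmt.Printf("removeOnComplete=%v acknowledged=%v rows left=%d\n", remove, ok, len(s.rows))
	}
}
```

Running it shows zero rows left when `removeOnComplete` is true and one row left, marked "completed", when it is false.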
- The queue is optimized for enqueue and dequeue operations that scale well with queue size
- Operations rely on DuckDB's indexes, so they run in logarithmic rather than true constant time
- DuckDB handles its own performance tuning, so no manual WAL configuration is needed
- Indexes on the status and creation time columns keep queries efficient
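As a rough illustration of the indexing point, the sketch below builds the kind of statements such a design might use: a composite index on status and creation time, and a query for the oldest pending row that the index can serve. The helper names, the index name, and the exact SQL are assumptions for illustration and are not taken from DuckQ's source.

```go
package main

import "fmt"

// indexSQL returns a hypothetical composite index on (status, created_at).
// table is assumed to be a trusted identifier.
func indexSQL(table string) string {
	return fmt.Sprintf(
		"CREATE INDEX IF NOT EXISTS idx_%[1]s_status_created ON %[1]s (status, created_at);",
		table,
	)
}

// nextPendingSQL returns a hypothetical query for the oldest pending row.
// Ordering by id as a tiebreaker keeps the result deterministic when two
// rows share the same created_at value.
func nextPendingSQL(table string) string {
	return fmt.Sprintf(
		"SELECT id, data FROM %s WHERE status = 'pending' ORDER BY created_at, id LIMIT 1;",
		table,
	)
}

func main() {
	fmt.Println(indexSQL("my_queue"))
	fmt.Println(nextPendingSQL("my_queue"))
}
```

With an index like this, finding the next pending item does not require scanning completed or in-flight rows, which is where the better-than-linear scaling would come from.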
- GitHub: @fahimfaisaal
- LinkedIn: in/fahimfaisaal
- Twitter: @FahimFaisaal