8000 GitHub - goptics/duckq: A duckdb based queue written in go
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

goptics/duckq

Repository files navigation

DuckQ: A DuckDB-Based Persistent Queue for Go

Go Reference Go Report Card Go Version CI codecov

DuckQ is a persistent queue implementation in Go using DuckDB as the storage backend. It provides efficient enqueue and dequeue operations and maintains persistence across application restarts.

Features

  • Efficient enqueue and dequeue operations
  • Persistence via DuckDB storage
  • Support for acknowledgment-based processing
  • Simple and clean API following the Queue interface
  • Built as a persistence adapter for varmq

Installation

go get github.com/goptics/duckq

Usage

package main

import (
    "fmt"
    "log"

    "github.com/goptics/duckq"
)

func main() {
    // Create a new DuckDB queues manager
    // The parameter is the path to the DuckDB database file
    queuesManager := duckq.New("queue.db")
    defer queuesManager.Close()

    // Create a new queue
    // The parameter is the name of the table to use for the queue
    queue, err := queuesManager.NewQueue("my_queue")
    if err != nil {
        log.Fatalf("Failed to create queue: %v", err)
    }

    // You can also create a queue with custom options
    // For example, to keep acknowledged items in the database:
    queueWithOptions, err := queuesManager.NewQueue("my_other_queue",
        duckq.WithRemoveOnComplete(false)) // Set to false to keep acknowledged items
    if err != nil {
        log.Fatalf("Failed to create queue: %v", err)
    }

    // Create a priority queue
    priorityQueue, err := queuesManager.NewPriorityQueue("my_priority_queue")
    if err != nil {
        log.Fatalf("Failed to create priority queue: %v", err)
    }

    // Enqueue items
    queue.Enqueue([]byte("item 1"))
    queue.Enqueue([]byte("item 2"))

    // Get queue length
    fmt.Printf("Queue length: %d\n", queue.Len())

    // Get all pending items
    items := queue.Values()
    fmt.Printf("All items: %v\n", items)

    // Simple dequeue
    item, success := queue.Dequeue()
    if success {
        fmt.Printf("Dequeued item: %v\n", string(item.([]byte)))
    }

    // Dequeue with acknowledgment
    item, success, ackID := queue.DequeueWithAckId()
    if success {
        fmt.Printf("Dequeued item: %v with ack ID: %s\n", string(item.([]byte)), ackID)

        // Process the item...

        // Acknowledge the item after processing
        acknowledged := queue.Acknowledge(ackID)
        fmt.Printf("Item acknowledged: %v\n", acknowledged)

        // Note: By default, acknowledged items are removed from the database
        // With WithRemoveOnComplete(false), they would be marked as completed instead
    }

    // Purge the queue
    queue.Purge()
}

How It Works

DuckQ uses a DuckDB database to store queue items with the following schema:

  • id: Unique identifier for each item (auto-increment primary key using DuckDB sequences)
  • data: The serialized item data (stored as a JSON blob)
  • status: The status of the item ("pending", "processing", or "completed")
  • ack: Boolean flag indicating whether the item has been acknowledged
  • priority: The priority of the item (only for priority queues)
  • ack_id: A unique ID for acknowledging processed items
  • created_at: When the item was added to the queue
  • updated_at: When the item was last updated

NOTE: By default, when an item is acknowledged, it is removed from the database. However, you can configure the queue to keep acknowledged items by using the WithRemoveOnComplete(false) option when creating the queue. In this case, acknowledged items will be marked as "completed" but will remain in the database.

Performance Considerations

  • The queue is optimized for efficient enqueue and dequeue operations that scale well with queue size
  • Operations leverage DuckDB's indexing for logarithmic time complexity rather than true constant-time
  • DuckDB automatically optimizes for performance, eliminating the need for manual WAL configuration
  • Proper indexing is set up on the status and creation time columns for efficient querying

👤 Author

About

A duckdb based queue written in go

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

0