Basic framework of Asynq:
We are going to write two programs
- client.go → This file will create and schedule tasks to be processed asynchronously by the background workers.
- workers.go → This file will start multiple concurrent workers to process the task created by the client
Asynq uses client as redis broker. Both client.go and workers.go need to connect with redis to write and read from it respectively.
In Asynq, a unit or work is encapsulated in a type called Task, which conceptually has two fields: Type and Payload
//Type is a string value that indicates the type of the task
func (t *Task) Type() string
//Payload is the data needed for the task execution
func (t *Task) Payload() []byte
In client.go
, we are going to create a few tasks and enqueue them using asynq.Client
.
To create a task, use NewTask
function and pass type and payload for the task.
The [Enqueue](https://godoc.org/github.com/hibiken/asynq#Client.Enqueue)
method takes a task and any number of options.
Use [ProcessIn](https://godoc.org/github.com/hibiken/asynq#ProcessIn)
or [ProcessAt](https://godoc.org/github.com/hibiken/asynq#ProcessAt)
option to schedule tasks to be processed in the future.
Client
A Client is responsible for scheduling tasks.
A Client is used to register tasks that should be processed immediately or sometime in the future.
Clients are safe for concurrent use by multiple goroutines.
type Client struct {
broker base.Broker
}
// Broker is a message broker that supports operations to manage task queues
type Broker interface {
Ping() error
Close() error
***Enqueue***(ctx context.Context, msg *TaskMessage) error
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
***Dequeue***(qnames ...string) (*TaskMessage, time.Time, error)
***Done***(ctx context.Context, msg *TaskMessage) error
***MarkAsComplete***(ctx context.Context, msg *TaskMessage) error
***Requeue***(ctx context.Context, msg *TaskMessage) error
Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
***Retry***(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
ForwardIfReady(qnames ...string) error
// Group aggregation related methods
AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
ListGroups(qname string) ([]string, error)
AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
ReclaimStaleAggregationSets(qname string) error
// Task retention related method
DeleteExpiredCompletedTasks(qname string) error
// Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
ExtendLease(qname string, ids ...string) (time.Time, error)
// State snapshot related methods
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
ClearServerState(host string, pid int, serverID string) error
// Cancelation related methods
***CancelationPubSub***() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
WriteResult(qname, id string, data []byte) (n int, err error)
}
→ RedisClientOpt is used to create a redis client that connects to a redis server directly.
Task
Task basically represents a unit of work to be performed.
// Task represents a unit of work to be performed.
type Task struct {
// typename indicates the type of task to be performed.
typename string
// payload holds data needed to perform the task.
payload []byte
// opts holds options for the task.
opts []Option
// w is the ResultWriter for the task.
w *ResultWriter
}
// NewTask returns a new Task given a type name and payload data.
// Options can be passed to configure task processing behavior.
func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
opts: opts,
}
}
By passing the opts …Option, we have made it kind of a optional parameter.
Queues that are formed in Redis
-
asynq:queues →
- this is a set of all the queues
- ⇒ SMEMBERS asynq:queues
-
asynq:{default}:pending →
- this is a list of all the task ids which are pending in default queue
- ⇒ LRANGE asynq:{default}:pending 0 -1 > "d5b05d66-ff91-49a0-b35f-47962b1996be”
-
asynq:{default}:t:d5b05d66-ff91-49a0-b35f-47962b1996be →
- ****this is the hashmap which contains all the details about the task
- ⇒ hgetall "asynq:{default}:t:d5b05d66-ff91-49a0-b35f-47962b1996be” >
- Values in the hashmap
"pending_since" : "1706164254626464900" "state" : "pending" "msg" : "\n\remail:welcome\x12\x0e{\"user_id\":42}\x1a$d5b05d66-ff91-49a0-b35f-47962b1996be\"\adefault(\x19@\x88\x0e"
Queue details after enqueing
{
ID:14818436-3f9e-4be1-97e5-a5f2393b9707
Queue:default // gets enqued in default queue
Type:email:welcome // type of the task
Payload:[123 34 117 115 101 114 95 105 100 34 58 52 50 125]
State:pending
MaxRetry:25
Retried:0
LastErr:
LastFailedAt:0001-01-01 00:00:00 +0000 UTC
Timeout:30m0s
Deadline:0001-01-01 00:00:00 +0000 UTC
Group:
NextProcessAt:2024-01-25 12:57:07.1329176 +0530 IST m=+0.006314401
IsOrphaned:false
Retention:0s
CompletedAt:0001-01-01 00:00:00 +0000 UTC
Result:[]
}
- Orphaned: In the context of task queues, an orphaned task is one that has become disconnected from its designated queue. This can happen for various reasons, such as:
- The queue itself might have been deleted while the task was still waiting to be processed.
- The broker storing the queue data might have experienced an error or failure, leading to the task being lost or detached.
- The task might have been manually moved or copied outside the regular queueing flow.
IsOrphaned:false This explicitly states that the specific task under consideration hasn't encountered any of the scenarios mentioned above. It remains firmly associated with its original queue and is part of the regular processing flow.
// Enqueue enqueues the given task to a queue.
//
// Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// By default, **max retry is set to 25** and **timeout is set to 30 minutes**.
//
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
//
// Enqueue uses context.Background internally; to specify the context, use EnqueueContext.
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
return c.EnqueueContext(context.Background(), task, opts...)
}
- By default, max retry is set to 25 and timeout is set to 30 minutes.
- If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
- ProcessIn returns an option to specify when to process the given task relative to the current time.
- ProcessAt returns an option to specify when to process the given task.
**// ProcessIn returns an option to specify when to process the given task relative to the current time.**
//
// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
func ProcessIn(d time.Duration) Option {
return processInOption(d)
**// ProcessAt returns an option to specify when to process the given task.**
//
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
func ProcessAt(t time.Time) Option {
return processAtOption(t)
}
What is timeout???
Task State
// TaskState denotes the state of a task.
type TaskState int
const (
// Indicates that the task is currently being processed by Handler.
TaskStateActive TaskState = iota + 1
// Indicates that the task is ready to be processed by Handler.
TaskStatePending
// Indicates that the task is scheduled to be processed some time in the future.
TaskStateScheduled
// Indicates that the task has previously ***failed*** and scheduled to be processed some time in the future.
TaskStateRetry
// Indicates that the task is ***archived*** and stored for inspection purposes.
TaskStateArchived
// ***Indicates that the task is processed successfully and retained until the retention TTL expires.***
TaskStateCompleted
// Indicates that the task is waiting in a group to be aggregated into one task.
TaskStateAggregating
)
Task Info (returned by enqueue function)
// A TaskInfo describes a task and its metadata.
type TaskInfo struct {
// ID is the identifier of the task.
ID string
// Queue is the name of the queue in which the task belongs.
Queue string
// Type is the type name of the task.
Type string
// Payload is the payload data of the task.
Payload []byte
// State indicates the task state.
State TaskState
// MaxRetry is the maximum number of times the task can be retried.
MaxRetry int
// Retried is the number of times the task has retried so far.
Retried int
// LastErr is the error message from the last failure.
LastErr string
// LastFailedAt is the time time of the last failure if any.
// If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}).
LastFailedAt time.Time
***// Timeout is the duration the task can be processed by Handler before being retried,
// zero if not specified***
Timeout time.Duration
***// Deadline is the deadline for the task, zero value if not specified.***
Deadline time.Time
// Group is the name of the group in which the task belongs.
//
// Tasks in the same queue can be grouped together by Group name and will be aggregated into one task
// by a Server processing the queue.
//
// Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task.
Group string
// NextProcessAt is the time the task is scheduled to be processed,
// zero if not applicable.
NextProcessAt time.Time
***// IsOrphaned describes whether the task is left in active state with no worker processing it.
// An orphaned task indicates that the worker has crashed or experienced network failures and was not able to
// extend its lease on the task.***
//
***// This task will be recovered by running a server against the queue the task is in.
// This field is only applicable to tasks with TaskStateActive.***
IsOrphaned bool
***// Retention is duration of the retention period after the task is successfully processed.***
Retention time.Duration
***// CompletedAt is the time when the task is processed successfully.
// Zero value (i.e. time.Time{}) indicates no value.***
CompletedAt time.Time
// Result holds the result data associated with the task.
// Use ResultWriter to write result data from the Handler.
Result []byte
}
127.0.0.1:6379> keys *
1) "asynq:{default}:scheduled"
2) "asynq:{default}:pending"
3) "asynq:{default}:t:77005fca-225f-452d-adbe-66aac5561ad5"
4) "asynq:{default}:t:4974258a-d77a-4552-b8fa-16200a70380f"
5) "asynq:queues"
127.0.0.1:6379> zrange asynq:{default}:scheduled 0 -1
1) "77005fca-225f-452d-adbe-66aac5561ad5"
127.0.0.1:6379> hgetall "asynq:{default}:t:77005fca-225f-452d-adbe-66aac5561ad5"
1) "state"
2) "scheduled"
3) "msg"
4) "\n\x0eemail:reminder\x12\x0e{\"user_id\":42}\x1a$77005fca-225f-452d-adbe-66aac5561ad5\"\adefault(\x19@\x88\x0e"
127.0.0.1:6379> hgetall "asynq:{default}:t:4974258a-d77a-4552-b8fa-16200a70380f"
1) "pending_since"
2) "1706169155430582500" // January 25, 2024 1:22:35.430 PM GMT+05:30
3) "state"
4) "pending"
5) "msg"
6) "\n\remail:welcome\x12\x0e{\"user_id\":42}\x1a$4974258a-d77a-4552-b8fa-16200a70380f\"\adefault(\x19@\x88\x0e"
127.0.0.1:6379> smembers asynq:queues
1) "default"
127.0.0.1:6379> lrange asynq:{default}:pending 0 -1
1) "4974258a-d77a-4552-b8fa-16200a70380f"
127.0.0.1:6379>
[*] Successfully enqueued task: &{ID:4974258a-d77a-4552-b8fa-16200a70380f
Queue:default
Type:email:welcome
Payload:[123 34 117 115 101 114 95 105 100 34 58 52 50 125]
State:pending
MaxRetry:25
Retried:0
LastErr:
LastFailedAt:0001-01-01 00:00:00 +0000 UTC
Timeout:30m0s
Deadline:0001-01-01 00:00:00 +0000 UTC
Group:
NextProcessAt:2024-01-25 13:22:35.4181074 +0530 IST m=+0.005620601
IsOrphaned:false
Retention:0s
CompletedAt:0001-01-01 00:00:00 +0000 UTC
Result:[]}
[*] Successfully enqueued task: &{
ID:77005fca-225f-452d-adbe-66aac5561ad5
Queue:default
Type:email:reminder
Payload:[123 34 117 115 101 114 95 105 100 34 58 52 50 125]
State:scheduled
MaxRetry:25
Retried:0
LastErr:
LastFailedAt:0001-01-01 00:00:00 +0000 UTC
Timeout:30m0s
Deadline:0001-01-01 00:00:00 +0000 UTC
Group:
NextProcessAt:2024-01-26 13:22:35.432787 +0530 IST m=+86400.020300201 // will execute tomorrwo
IsOrphaned:false
Retention:0s
CompletedAt:0001-01-01 00:00:00 +0000 UTC
Result:[]}
Client Program
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
fmt.Println(client)
//create a task with a typename and a payload
payloadBytes, _ := json.Marshal(map[string]interface{}{"user_id": 42})
fmt.Println(payloadBytes)
fmt.Println(string(payloadBytes))
t1 := asynq.NewTask("email:welcome", payloadBytes)
fmt.Println(t1)
t2 := asynq.NewTask("email:reminder", payloadBytes)
fmt.Println(t2)
//Process the welcome email task immediately
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
fmt.Printf(" [*] Successfully enqueued task: %+v", info)
//Process the reminder email task 24 hrs later
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
fmt.Printf(" [*] Successfully enqueued task: %+v", info)
}
In workers.go
, we'll create a asynq.Server
instance to start the workers.
NewServer
function takes RedisConnOpt
and Config
- Server is reponsible for task processing and task lifecycle managment.
- Server pulls tasks off queues and processes them.
- If the processing of a task is unsuccessful, server will schedule it for a retry
- A task will be retried until either the task gets processed successfully or until it reaches its max retry count
- If a task exhausts its retries, it will be moved to the archive and will be kept in the archive set
- Note that the archive size is finite and once it reaches its max size, oldest tasks in the archive will be deleted
type Server struct {
logger *log.Logger
broker base.Broker
state *serverState
// wait group to wait for all goroutines to finish.
wg sync.WaitGroup
// A forwarder is responsible for moving scheduled and retry tasks to pending state
// so that the tasks get processed by the workers.
forwarder *forwarder
processor *processor
// syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis.
syncer *syncer
// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
heartbeater *heartbeater
subscriber *subscriber
recoverer *recoverer
// healthchecker is responsible for pinging broker periodically
// and call user provided HeathCheckFunc with the ping result.
healthchecker *healthchecker
// A janitor is responsible for deleting expired completed tasks from the specified
// queues. It periodically checks for any expired tasks in the completed set, and
// deletes them.
janitor *janitor
// An aggregator is responsible for checking groups and aggregate into one task
// if any of the grouping condition is met.
aggregator *aggregator
}
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// NewServer returns a new Server given a redis connection option
// and server configuration.
Config is used to tune the servers task processing behaviour
Config specifies the server’s background-task processing behaviour
-
Concurrency : Maximum number of concurrent processing of tasks. If set to a zero or negative value, NewServer will overwrite the value to the number of CPUs usable by the current process.
-
RetryDelayFunc : Function to calculate retry delay for a failed task. By default it uses exponential backoff algorithm to calculate the delay
-
IsFailure func(error) bool : Predicate function to determine whether the error returned from Handler is a failure. If the function returns false, Server will not increment the retried counter for the task, and Server won’t record the queue stats (processed and failed stats) to avoid skewing the error rate of the queue. By default, if the given error is non-nil the function returns true.
-
Queues map[string]int : List of queues to process with given priority value. Keys are the names of the queues and values are associated priority value. If set to nil or not specified, the server will process only the default queue. Priority is treated as follows to avoid starving low priority queues. Example
- Queues: map[string]int {
- “critical” : 6 ,
- “default” : 3 ,
- “low” : 1,
- }
- With the above config and given that all queues are not empty, the tasks in “critical”, “default”, “low” should be processed 60%, 30%, 10% of the time respectively.
- If a queue has a zero or negative priority value, the queue will be ignored
-
StrictPriority bool : StrictPriority indicates whether the queue priority should be treated strictly. If set to true, tasks in the queue with the highest priority is processed first. The tasks in lower priority queues are processed only when those queues with higher priorities are empty
-
ErrorHandler ErrorHandler,: ErrorHandler handles errors returned by the task handler. HandleError is invoked only if the task handler returns a non-nil error.
- ****Example
func reportError(ctx context, task *asynq.Task, err error) { retried, _ := asynq.GetRetryCount(ctx) maxRetry, _ := asynq.GetMaxRetry(ctx) if retried >= maxRetry { err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err) } errorReportingService.Notify(err) } ErrorHandler: asynq.ErrorHandlerFunc(reportError)
-
Logger Logger
-
LogLevel LogLevel
-
ShutDownTimeout time.Duration : ShutdownTimeout specifies the duration to wait to let workers finish their tasks before forcing them to abort when stopping the server. If unset or zero, default timeout of 8 seconds is used.
-
HealthCheckFunc func(error) : HealthCheckFunc is called periodically with any errors encountered during ping to the connected redis server
-
HealthCheckInterval time.Duration : HealthCheckInterval specifies the interval between healthchecks. If unset or zero, the interval is set to 15 seconds.
-
DelayedTaskCheckInterval time.Duration : DelayedTaskCheckInterval specifies the interval between checks run on ‘scheduled’ and ‘retry’ tasks, and forwarding them to ‘pending’ state if they are ready to be processed. If unset or zero, the interval is set to 5 seconds.
-
GroupGracePeriod time.Duration
-
GroupMaxDelay time.Duration
-
GroupMaxSize int
-
GroupAggregator
Exponential Backoff Stradey
// DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
// It uses exponential back-off strategy to calculate the retry delay.
func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Formula taken from https://github.com/mperham/sidekiq.
s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1))
return time.Duration(s) * time.Second
}
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1,
}
defaultShutdownTimeout = 8 * time.Second
defaultHealthCheckInterval = 15 * time.Second
defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute
The argument to (*Server.Run) is an interface Handler which has one method ProcessTask.
Handler
- A Handler processes task
- ProcessTask should return nil if the processing of a task is successful
- If ProcessTask returns a non nil error or panics, the task will be retried after delay if retry-count is remaining, otherwise the task will be archived
- One exception to this rule is when ProcessTask returns a SkipRetry error. If the returned error is SkipRetry or an error wraps SkipRetry, retry is skipped and task will be immediately archived instead.
type Handler interface {
// ProcessTask should return nil if the task was processed successfully
// If ProcessTask returns a non-nil error or panics, the task will be retried again later
ProcessTask(context.Context, *Task) error
}
The simplest way to implement a handler, is to define a function with the same signature *(context.Context, Task)
and use asynq.HandlerFunc adapter type when passing it to Run
HandlerFunc Adapter
- The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler
- If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as a Handler. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(context.Context, *Task) error
Run
Run starts the task processing and blocks
until an os signal to exit the program is received.
Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.
Run returns any error encountered at server startup time.
If the server has already been shutdown, (Run is called after the server has already shutdown), ErrServerClosed is returned
func (srv *Server) Run(handler Handler) error {
if err := srv.Start(handler); err != nil {
return err
}
srv.waitForSignals()
srv.Shutdown()
return nil
}
// ErrServerClosed indicates that the operation is now illegal because of the server
// has been shutdown
var ErrServerClosed = errors.New("asynq: Server closed")
Impementation using asynq.HandlerFunc adapter
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
fmt.Printf("%+v", srv)
// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
/* type Handler interface {
// ProcessTask should return nil if the task was processed successfully
// If ProcessTask returns a non-nil error or panics, the task will be retried again later
ProcessTask(context.Context, *Task) error
} */
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
fmt.Printf(" [*] Send Welcome Email to User %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
fmt.Printf(" [*] Send Reminder Email to User %d", p.UserID)
default:
return fmt.Errorf("unexpected task type: %s", t.Type())
}
return nil
}
Keys that are formed in redis when the worker server starts
127.0.0.1:6379> keys *
1) "asynq:workers"
2) "deepak"
3) "asynq:servers"
4) "asynq:servers:{DESKTOP-KE1MMG0:8936:eb4e2f3b-0f38-4504-9475-84d2364299c8}"
127.0.0.1:6379> type "asynq:workers"
zset
127.0.0.1:6379> zrange "asynq:workers" 0 -1
1) "asynq:workers:{DESKTOP-KE1MMG0:8936:eb4e2f3b-0f38-4504-9475-84d2364299c8}"
127.0.0.1:6379> type "asynq:servers"
zset
127.0.0.1:6379> zrange "asynq:servers"
(error) ERR wrong number of arguments for 'zrange' command
127.0.0.1:6379> zrange "asynq:servers" 0 -1
1) "asynq:servers:{DESKTOP-KE1MMG0:8936:eb4e2f3b-0f38-4504-9475-84d2364299c8}"
127.0.0.1:6379> type asynq:servers:{DESKTOP-KE1MMG0:8936:eb4e2f3b-0f38-4504-9475-84d2364299c8}
string
127.0.0.1:6379> get asynq:servers:{DESKTOP-KE1MMG0:8936:eb4e2f3b-0f38-4504-9475-84d2364299c8}
"\n\x0fDESKTOP-KE1MMG0\x10\xe8E\x1a$eb4e2f3b-0f38-4504-9475-84d2364299c8 \n*\x0b\n\adefault\x10\x01:\x06activeB\x0b\b\xf4\xb5\xcd\xad\x06\x10\x90\xcd\xed="
127.0.0.1:6379>
We could keep adding switch cases to this handler function, but in a realistic application, it is convenient to define the logic for each case in a different function.
We can use ServeMux
to create our handler. Just like the ServeMux
from net/http
package, you register a handler by calling the Handle
or HandleFunc.
(HandleFunc internally calls Handle) ServeMux
satisfies the [Handler](https://www.notion.so/Asynq-4657740a10be417d91b32c90e7de32bf?pvs=21)
interface so that you can pass it to (*Server).Run
Implementation of Handler Interface ⬇️
// ProcessTask dispatches the task to the handler whose
// pattern most closely matches the task type.
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
h, _ := mux.Handler(task)
return h.ProcessTask(ctx, task)
}
There are 5 things
-
Handler
(interface) -
Handler
(function called from here)- Handler returns the handler to use for the given task. It always returns a non-nil handler.
- Handler also returns the registered pattern that matches the task
- If there is no registered handler that applies to the task, handler returns a ‘not found’ handler which returns a error
- Implementation
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) { mux.mu.RLock() defer mux.mu.RUnlock() h, pattern = mux.match(t.Type()) if h == nil { h, pattern = NotFoundHandler(), "" } for i := len(mux.mws) - 1; i >= 0; i-- { h = mux.mws[i](h) } return h, pattern }
-
HandlerFunc
adapter -
Handle
function- Handle Registers the handler for the given pattern
- If a handler already exists for pattern, Handle panics
- Implementation
func (mux *ServeMux) Handle(pattern string, handler Handler) { mux.mu.Lock() defer mux.mu.Unlock() if strings.TrimSpace(pattern) == "" { panic("asynq: invalid pattern") } if handler == nil { panic("asynq: nil handler") } if _, exist := mux.m[pattern]; exist { panic("asynq: multiple registrations for " + pattern) } if mux.m == nil { mux.m = make(map[string]muxEntry) } e := muxEntry{h: handler, pattern: pattern} mux.m[pattern] = e mux.es = appendSorted(mux.es, e) }
-
HandleFunc
function- Wrapper around Handle
- Internally calls Handle
- HandleFunc registers the handler function for the given pattern
- Internally also uses the HandlerFunc adapter
- Implementation
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) { if handler == nil { panic("asynq: nil handler") } mux.Handle(pattern, HandlerFunc(handler)) }
-
This also works
- You can use either HandleFunc or Handle
mux.HandleFunc("email:welcome", sendWelcomeEmail) mux.Handle("email:welcome", asynq.HandlerFunc(sendWelcomeEmail))
ServeMux
ServeMux is a multiplexer for asynchronous tasks.
~ A multiplexer in electronics
It matches the type of each task against a list of registered patterns
and calls the handler for the pattern that most closely matches the task’s type name
[task type] → [pattern] → [handler for each pattern]
Longer patterns take precedence over shorter ones,
so that if there are handlers registered for both “images
” and “images:thumbnails
” patterns,
-
the
images:thumbnails
handler will be called for tasks with the type beginning with “images:thumbanils
”[ it does not matter that the task type
images:thumbanils
contains the wordimages
,images
handler would not be called and onlyimages:thumbnails
handler would be called ] -
and the
images
handler will receive tasks with the type name beginning with “images
”
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
mws []MiddlewareFunc
}
// NewServeMux allocates and returns a new ServeMux
func NewServeMux() *ServeMux {
return new(ServeMux)
}
// The new built-in function allocates memory. The first argument is a type, not a value,
// and the value returned is a pointer to a newly allocated zero value of that type
func new(Type) *Type
Worker Program
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
// Task payload for any email related tasks.
type EmailTaskPayload struct {
// ID for the email recipient.
UserID int
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
fmt.Printf("%+v", srv)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
/*
This also works
mux.Handle("email:welcome", asynq.HandlerFunc(sendWelcomeEmail))
*/
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
fmt.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
fmt.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}
Life of a Task
Points
• Asynq supports multiple queues with different priorities, allowing you to manage the order in which tasks are processed.
- Note that the archive size is finite and once it reaches its max size, oldest tasks in the archive will be deleted
Asynq CLI is a command line tool to monitor the queues and tasks managed by asynq
package.
In order to use the tool, compile it using the following command:
go install github.com/hibiken/asynq/tools/asynq
This will create the asynq executable under your $GOPATH/bin
directory.
To view details on any command, use asynq help <command> <subcommand>
.
asynq dash
asynq stats
asynq queue [ls inspect history rm pause unpause]
asynq task [ls cancel delete archive run delete-all archive-all run-all]
asynq server [ls]
# with Docker (connect to a Redis server running on the host machine)
docker run --rm \
--name asynqmon \
-p 3000:3000 \
hibiken/asynqmon --port=3000 --redis-addr=host.docker.internal:6380
# with Docker (connect to a Redis server running in the Docker container)
docker run --rm \
--name asynqmon \
--network dev-network \
-p 8080:8080 \
hibiken/asynqmon --redis-addr=dev-redis:6379
To use the defaults, simply run and open http://localhost:8080.
# with a binary
./asynqmon
# with a docker image
docker run --rm \
--name asynqmon \
-p 8080:8080 \
hibiken/asynqmon
By default, Asynqmon web server listens on port 8080
and connects to a Redis server running on 127.0.0.1:6379
To see all available flags, run:
# with a binary
./asynqmon --help
# with a docker image
docker run hibiken/asynqmon --help
Here's the available flags:
Note: Use --redis-url
to specify address, db-number, and password with one flag value; Alternatively, use --redis-addr
, --redis-db
, and --redis-password
to specify each value.
Flag | Env | Description | Default |
---|---|---|---|
--port(int) | PORT | port number to use for web ui server | 8080 |
---redis-url(string) | REDIS_URL | URL to redis or sentinel server. See https://pkg.go.dev/github.com/hibiken/asynq#ParseRedisURI for supported format | "" |
--redis-addr(string) | REDIS_ADDR | address of redis server to connect to | "127.0.0.1:6379" |
--redis-db(int) | REDIS_DB | redis database number | 0 |
--redis-password(string) | REDIS_PASSWORD | password to use when connecting to redis server | "" |
--redis-cluster-nodes(string) | REDIS_CLUSTER_NODES | comma separated list of host:port addresses of cluster nodes | "" |
--redis-tls(string) | REDIS_TLS | server name for TLS validation used when connecting to redis server | "" |
--redis-insecure-tls(bool) | REDIS_INSECURE_TLS | disable TLS certificate host checks | false |
--enable-metrics-exporter(bool) | ENABLE_METRICS_EXPORTER | enable prometheus metrics exporter to expose queue metrics | false |
--prometheus-addr(string) | PROMETHEUS_ADDR | address of prometheus server to query time series | "" |
--read-only(bool) | READ_ONLY | use web UI in read-only mode | false |
To connect to a single redis server, use either --redis-url
or (--redis-addr
, --redis-db
, and --redis-password
).
Example:
$ ./asynqmon --redis-url=redis://:mypassword@localhost:6380/2
$ ./asynqmon --redis-addr=localhost:6380 --redis-db=2 --redis-password=mypassword
To connect to redis-sentinels, use --redis-url
.
Example:
$ ./asynqmon --redis-url=redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002?master=mymaster
To connect to a redis-cluster, use --redis-cluster-nodes
.
Example:
$ ./asynqmon --redis-cluster-nodes=localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7006
Note: Use --redis-url
to specify address, db-number, and password with one flag value; Alternatively, use --redis-addr
, --redis-db
, and --redis-password
to specify each value.
Flag | Env | Description | Default |
---|---|---|---|
--port(int) | PORT | port number to use for web ui server | 8080 |
---redis-url(string) | REDIS_URL | URL to redis or sentinel server. See https://pkg.go.dev/github.com/hibiken/asynq#ParseRedisURI for supported format | "" |
--redis-addr(string) | REDIS_ADDR | address of redis server to connect to | "127.0.0.1:6379" |
--redis-db(int) | REDIS_DB | redis database number | 0 |
--redis-password(string) | REDIS_PASSWORD | password to use when connecting to redis server | "" |
--redis-cluster-nodes(string) | REDIS_CLUSTER_NODES | comma separated list of host:port addresses of cluster nodes | "" |
--redis-tls(string) | REDIS_TLS | server name for TLS validation used when connecting to redis server | "" |
--redis-insecure-tls(bool) | REDIS_INSECURE_TLS | disable TLS certificate host checks | false |
--enable-metrics-exporter(bool) | ENABLE_METRICS_EXPORTER | enable prometheus metrics exporter to expose queue metrics | false |
--prometheus-addr(string) | PROMETHEUS_ADDR | address of prometheus server to query time series | "" |
--read-only(bool) | READ_ONLY | use web UI in read-only mode | false |
To connect to a single redis server, use either --redis-url
or (--redis-addr
, --redis-db
, and --redis-password
).
Example:
$ ./asynqmon --redis-url=redis://:mypassword@localhost:6380/2
$ ./asynqmon --redis-addr=localhost:6380 --redis-db=2 --redis-password=mypassword
To connect to redis-sentinels, use --redis-url
.
Example:
$ ./asynqmon --redis-url=redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002?master=mymaster
To connect to a redis-cluster, use --redis-cluster-nodes
.
Example:
$ ./asynqmon --redis-cluster-nodes=localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7006