From 02acf6523f117621e303d983f887b80db42504fe Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Wed, 20 Mar 2024 15:24:12 -0400 Subject: [PATCH 01/10] feat: cas scaling --- cmd/cas/main.go | 256 +++++++++++++++---------------- common/aws/ddb/job.go | 14 +- common/aws/ddb/state.go | 2 + common/aws/queue/consumer.go | 47 ------ common/aws/queue/monitor.go | 2 + common/aws/queue/multiMonitor.go | 32 ++++ common/aws/queue/multiQueue.go | 86 +++++++++++ common/aws/queue/publisher.go | 60 -------- common/aws/queue/queue.go | 130 ++++++++++++++++ common/aws/queue/utils.go | 59 ++++--- common/aws/storage/s3.go | 50 ++++++ common/db/anchor.go | 2 + common/metrics/otl.go | 2 + common/notifs/discord.go | 2 + env/.env.dev | 1 - env/.env.prod | 5 +- env/.env.qa | 1 - env/.env.tnet | 3 +- go.mod | 47 +++--- go.sum | 109 +++++++------ models/constants.go | 4 +- models/job.go | 2 +- models/metrics.go | 1 + models/services.go | 13 +- services/batch.go | 31 +++- services/test_helpers.go | 4 +- services/validate.go | 14 +- 27 files changed, 611 insertions(+), 368 deletions(-) delete mode 100644 common/aws/queue/consumer.go create mode 100644 common/aws/queue/multiMonitor.go create mode 100644 common/aws/queue/multiQueue.go delete mode 100644 common/aws/queue/publisher.go create mode 100644 common/aws/queue/queue.go create mode 100644 common/aws/storage/s3.go diff --git a/cmd/cas/main.go b/cmd/cas/main.go index f864729..661dc6f 100644 --- a/cmd/cas/main.go +++ b/cmd/cas/main.go @@ -3,6 +3,7 @@ package main import ( "context" "log" + "math" "os" "os/signal" "strconv" @@ -13,12 +14,14 @@ import ( "github.com/joho/godotenv" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common/aws/config" "github.com/ceramicnetwork/go-cas/common/aws/ddb" "github.com/ceramicnetwork/go-cas/common/aws/queue" + "github.com/ceramicnetwork/go-cas/common/aws/storage" "github.com/ceramicnetwork/go-cas/common/db" "github.com/ceramicnetwork/go-cas/common/loggers" "github.com/ceramicnetwork/go-cas/common/metrics" @@ -53,9 +56,11 @@ func main() { // HTTP clients dynamoDbClient := dynamodb.NewFromConfig(awsCfg) sqsClient := sqs.NewFromConfig(awsCfg) + s3Client := s3.NewFromConfig(awsCfg) stateDb := ddb.NewStateDb(serverCtx, logger, dynamoDbClient) jobDb := ddb.NewJobDb(serverCtx, logger, dynamoDbClient) + batchStore := storage.NewS3Store(logger, s3Client) discordHandler, err := notifs.NewDiscordHandler(logger) if err != nil { @@ -67,54 +72,88 @@ func main() { logger.Fatalf("error creating metric service: %v", err) } - // Queue publishers var visibilityTimeout *time.Duration = nil if configVisibilityTimeout, found := os.LookupEnv("QUEUE_VISIBILITY_TIMEOUT"); found { if parsedVisibilityTimeout, err := time.ParseDuration(configVisibilityTimeout); err == nil { visibilityTimeout = &parsedVisibilityTimeout } } - // Create the DLQ and prepare the redrive policy for the other queues - deadLetterQueue, err := queue.NewPublisher( + + // TODO: Rename queues/services as per the flow below + // Ref: https://linear.app/3boxlabs/issue/WS1-1586/rename-queuesservices-for-better-readability + // + // Data flow through the queues and services: + // - CAS API posts to the StreamConsolidate queue. + // - The StreamConsolidation service reads from the StreamConsolidate queue and posts to the ReadyRequest queue(s) + // or RequestStatus queue (when a request needs to be marked "Replaced" for some reason). + // - The RequestBatching service reads from the ReadyRequest queue(s) and collects them into a batch, which it then posts to the + // RequestBatch queue. For the duration that the RequestBatching service holds the requests it is batching, it will + // keep the SQS messages corresponding to these requests in-flight. + // - Anchor workers read from the RequestBatch queue and anchor the requests. + // - The RequestBatch queue is also monitored by the Worker service, which spawns workers to process the batch(es). + // - The RequestStatus service reads from the RequestStatus queue and updates the Anchor DB. + // - The Failure handling service reads from the Failure and Dead-Letter queues and posts alerts to Discord. + + // Set up all the services first, so we can create the queues and configure their callbacks. After that, we'll plumb + // the queues to the right services and start them. + // + // The Failure handling service reads from the Failure and Dead-Letter queues + failureHandlingService := services.NewFailureHandlingService(discordHandler, metricService, logger) + // The Validation service reads from the Validate queue and posts to the Ready and Status queues + validationService := services.NewValidationService(stateDb, metricService, logger) + // The Batching service reads from the Ready queue(s) and posts to the Batch queue. + batchingService := services.NewBatchingService(serverCtx, batchStore, metricService, logger) + // The Status service reads from the Status queue and updates the Anchor DB + statusService := services.NewStatusService(anchorDb, metricService, logger) + + // Now create all the queues. + // + // Create the DLQ and prepare the redrive options for the other queues + deadLetterQueue, dlqId, err := queue.NewQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{QueueType: queue.QueueType_DLQ, VisibilityTimeout: visibilityTimeout}, + queue.Opts{QueueType: queue.Type_DLQ, VisibilityTimeout: visibilityTimeout}, + failureHandlingService.DLQ, ) if err != nil { logger.Fatalf("error creating dead-letter queue: %v", err) } - dlqArn, err := queue.GetQueueArn(serverCtx, deadLetterQueue.GetUrl(), sqsClient) - if err != nil { - logger.Fatalf("error fetching dead-letter queue arn: %v", err) - } - redrivePolicy := &queue.QueueRedrivePolicy{ - DeadLetterTargetArn: dlqArn, - MaxReceiveCount: queue.DefaultMaxReceiveCount, + redriveOpts := &queue.RedriveOpts{ + DlqId: dlqId, + MaxReceiveCount: queue.DefaultMaxReceiveCount, } // Failure queue // TODO: Could this become recursive since the failure handler also consumes from the DLQ? The inability to handle // failures could put messages back in the DLQ that are then re-consumed by the handler. - failureQueue, err := queue.NewPublisher( + failureQueue, _, err := queue.NewQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_Failure, + queue.Opts{ + QueueType: queue.Type_Failure, VisibilityTimeout: visibilityTimeout, - RedrivePolicy: redrivePolicy, + RedriveOpts: redriveOpts, }, + failureHandlingService.Failure, ) if err != nil { logger.Fatalf("error creating failure queue: %v", err) } // Validate queue - validateQueue, err := queue.NewPublisher( + validateQueue, _, err := queue.NewQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_Validate, + queue.Opts{ + QueueType: queue.Type_Validate, VisibilityTimeout: visibilityTimeout, - RedrivePolicy: redrivePolicy, + RedriveOpts: redriveOpts, }, + validationService.Validate, ) if err != nil { logger.Fatalf("error creating validate queue: %v", err) @@ -122,135 +161,98 @@ func main() { // The Ready and Batch queues will need larger visibility timeouts than the other queues. Requests pulled from the // Ready queue will remain in flight for the batch linger duration. Batches from the Batch queue will remain in // flight as long as it takes for them to get anchored. + // // These queues will thus allow a smaller maximum receive count before messages fall through to the DLQ. Detecting // failures is harder given the longer visibility timeouts, so it's important that they be detected as soon as // possible. - longQueueVisibilityTimeout := visibilityTimeout - if configVisibilityTimeout, found := os.LookupEnv("LONG_QUEUE_VISIBILITY_TIMEOUT"); found { - if parsedVisibilityTimeout, err := time.ParseDuration(configVisibilityTimeout); err == nil { - longQueueVisibilityTimeout = &parsedVisibilityTimeout + anchorBatchLinger := models.DefaultAnchorBatchLinger + if configAnchorBatchLinger, found := os.LookupEnv("ANCHOR_BATCH_LINGER"); found { + if parsedAnchorBatchLinger, err := time.ParseDuration(configAnchorBatchLinger); err == nil { + anchorBatchLinger = parsedAnchorBatchLinger } } - longQueueMaxReceiveCount := redrivePolicy.MaxReceiveCount + // Add one hour to the anchor batch linger to get the long queue visibility timeout + longQueueVisibilityTimeout := anchorBatchLinger + time.Hour + longQueueMaxReceiveCount := redriveOpts.MaxReceiveCount if configMaxReceiveCount, found := os.LookupEnv("LONG_QUEUE_MAX_RECEIVE_COUNT"); found { if parsedMaxReceiveCount, err := strconv.Atoi(configMaxReceiveCount); err == nil { longQueueMaxReceiveCount = parsedMaxReceiveCount } } - longQueueRedrivePolicy := &queue.QueueRedrivePolicy{ - DeadLetterTargetArn: dlqArn, - MaxReceiveCount: longQueueMaxReceiveCount, + longQueueRedriveOpts := &queue.RedriveOpts{ + DlqId: dlqId, + MaxReceiveCount: longQueueMaxReceiveCount, } - readyQueue, err := queue.NewPublisher( + anchorBatchSize := models.DefaultAnchorBatchSize + if configAnchorBatchSize, found := os.LookupEnv(models.Env_AnchorBatchSize); found { + if parsedAnchorBatchSize, err := strconv.Atoi(configAnchorBatchSize); err == nil { + anchorBatchSize = parsedAnchorBatchSize + } + } + // Ready queue + // + // Create a minimum of 10 Ready queue publishers, or as many needed to process an anchor batch while keeping each + // queue below the maximum number of inflight SQS messages (120,000). + numReadyPublishers := int(math.Max(10, float64(anchorBatchSize/120_000))) + readyQueue, err := queue.NewMultiQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_Ready, - VisibilityTimeout: longQueueVisibilityTimeout, - RedrivePolicy: longQueueRedrivePolicy, + queue.Opts{ + QueueType: queue.Type_Ready, + VisibilityTimeout: &longQueueVisibilityTimeout, + RedriveOpts: longQueueRedriveOpts, }, + batchingService.Batch, + numReadyPublishers, ) if err != nil { logger.Fatalf("error creating ready queue: %v", err) } - batchQueue, err := queue.NewPublisher( + // Batch queue + // + // Launch a number of workers greater than the batch size. This prevents a small number of workers from waiting on + // an incomplete batch to fill up because there aren't any workers available to add to the batch even when messages + // are available in the queue. The 2 multiplier is arbitrary but will allow two batches worth of requests to be read + // and processed in parallel. + maxBatchQueueWorkers := anchorBatchSize * 2 + batchQueue, _, err := queue.NewQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_Batch, - VisibilityTimeout: longQueueVisibilityTimeout, - RedrivePolicy: longQueueRedrivePolicy, + queue.Opts{ + QueueType: queue.Type_Batch, + VisibilityTimeout: &longQueueVisibilityTimeout, + RedriveOpts: longQueueRedriveOpts, + NumWorkers: &maxBatchQueueWorkers, }, + batchingService.Batch, ) if err != nil { logger.Fatalf("error creating batch queue: %v", err) } // Status queue - statusQueue, err := queue.NewPublisher( + statusQueue, _, err := queue.NewQueue( serverCtx, + metricService, + logger, sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_Status, + queue.Opts{ + QueueType: queue.Type_Status, VisibilityTimeout: visibilityTimeout, - RedrivePolicy: redrivePolicy, + RedriveOpts: redriveOpts, }, + statusService.Status, ) if err != nil { logger.Fatalf("error creating status queue: %v", err) } - // ipfs queue - ipfsQueue, err := queue.NewPublisher( - serverCtx, - sqsClient, - queue.PublisherOpts{ - QueueType: queue.QueueType_IPFS, - VisibilityTimeout: visibilityTimeout, - RedrivePolicy: redrivePolicy, - }, - ) - if err != nil { - logger.Fatalf("error creating ipfs queue: %v", err) - } - - // Create utilization gauges for all the queues - if err = metricService.QueueGauge(serverCtx, deadLetterQueue.GetName(), queue.NewMonitor(deadLetterQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for dead-letter queue: %v", err) - } - if err = metricService.QueueGauge(serverCtx, failureQueue.GetName(), queue.NewMonitor(failureQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for failure queue: %v", err) - } - if err = metricService.QueueGauge(serverCtx, validateQueue.GetName(), queue.NewMonitor(validateQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for validate queue: %v", err) - } - if err = metricService.QueueGauge(serverCtx, readyQueue.GetName(), queue.NewMonitor(readyQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for ready queue: %v", err) - } - batchMonitor := queue.NewMonitor(batchQueue.GetUrl(), sqsClient) - if err = metricService.QueueGauge(serverCtx, batchQueue.GetName(), batchMonitor); err != nil { - logger.Fatalf("error creating gauge for batch queue: %v", err) - } - if err = metricService.QueueGauge(serverCtx, statusQueue.GetName(), queue.NewMonitor(statusQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for status queue: %v", err) - } - if err = metricService.QueueGauge(serverCtx, ipfsQueue.GetName(), queue.NewMonitor(ipfsQueue.GetUrl(), sqsClient)); err != nil { - logger.Fatalf("error creating gauge for ipfs queue: %v", err) - } - - // Create the queue consumers. These consumers will be responsible for scaling event processing up based on load and - // also maintaining backpressure on the queues. - - // The Failure handling service reads from the Failure and Dead-Letter queues - failureHandlingService := services.NewFailureHandlingService(discordHandler, metricService, logger) - dlqConsumer := queue.NewConsumer(logger, deadLetterQueue, failureHandlingService.DLQ, nil) - failureConsumer := queue.NewConsumer(logger, failureQueue, failureHandlingService.Failure, nil) - - // The Status service reads from the Status queue and updates the Anchor DB - statusService := services.NewStatusService(anchorDb, metricService, logger) - statusConsumer := queue.NewConsumer(logger, statusQueue, statusService.Status, nil) - - // The IPFS service reads from the IPFS queue and puts or publishes - ipfsService := services.NewIpfsService(logger, metricService) - ipfsConsumer := queue.NewConsumer(logger, ipfsQueue, ipfsService.Run, nil) - - // The Batching service reads from the Ready queue and posts to the Batch queue - anchorBatchSize := models.DefaultAnchorBatchSize - if configAnchorBatchSize, found := os.LookupEnv(models.Env_AnchorBatchSize); found { - if parsedAnchorBatchSize, err := strconv.Atoi(configAnchorBatchSize); err == nil { - anchorBatchSize = parsedAnchorBatchSize - } - } - // Launch a number of workers greater than the batch size. This prevents a small number of workers from waiting on - // an incomplete batch to fill up because there aren't any workers available to add to the batch even when messages - // are available in the queue. The 2 multiplier is arbitrary but will allow two batches worth of requests to be read - // and processed in parallel. - maxBatchQueueWorkers := anchorBatchSize * 2 - batchingService := services.NewBatchingService(serverCtx, logger, batchQueue, metricService) - batchingConsumer := queue.NewConsumer(logger, readyQueue, batchingService.Batch, &maxBatchQueueWorkers) - - // The Validation service reads from the Validate queue and posts to the Ready and Status queues - validationService := services.NewValidationService(logger, stateDb, readyQueue, statusQueue, metricService) - validationConsumer := queue.NewConsumer(logger, validateQueue, validationService.Validate, nil) + // Wire up the queues and services, then start the services. + validationService.Start(readyQueue.Publisher(), statusQueue.Publisher()) + batchingService.Start(batchQueue.Publisher()) wg := sync.WaitGroup{} wg.Add(2) @@ -270,7 +272,7 @@ func main() { // - status updates // - failure handling // - DLQ - validationConsumer.Shutdown() + validateQueue.Shutdown() // The Batching service needs a special shutdown procedure: // - Start shutting down the queue consumer, which will prevent any new receive requests from being initiated. @@ -282,20 +284,19 @@ func main() { batchWg.Add(2) go func() { defer batchWg.Done() - batchingConsumer.Shutdown() + batchQueue.Shutdown() }() go func() { defer batchWg.Done() - batchingConsumer.WaitForRxShutdown() + batchQueue.WaitForRxShutdown() time.Sleep(5 * time.Second) batchingService.Flush() }() batchWg.Wait() - statusConsumer.Shutdown() - ipfsConsumer.Shutdown() - failureConsumer.Shutdown() - dlqConsumer.Shutdown() + statusQueue.Shutdown() + failureQueue.Shutdown() + deadLetterQueue.Shutdown() // Flush metrics metricService.Shutdown(serverCtx) @@ -310,15 +311,14 @@ func main() { go func() { defer wg.Done() // Monitor the Batch queue and spawn anchor workers accordingly - services.NewWorkerService(logger, batchMonitor, jobDb, metricService).Run(serverCtx) + services.NewWorkerService(logger, batchQueue.Monitor(), jobDb, metricService).Run(serverCtx) }() - dlqConsumer.Start() - failureConsumer.Start() - statusConsumer.Start() - ipfsConsumer.Start() - batchingConsumer.Start() - validationConsumer.Start() + deadLetterQueue.Start() + failureQueue.Start() + statusQueue.Start() + batchQueue.Start() + validateQueue.Start() if configAnchorAuditEnabled, found := os.LookupEnv(models.Env_AnchorAuditEnabled); found { if anchorAuditEnabled, err := strconv.ParseBool(configAnchorAuditEnabled); (err == nil) && anchorAuditEnabled { @@ -326,7 +326,7 @@ func main() { go func() { defer wg.Done() // Enable auditing of the anchor DB to check for pending anchor requests that might have been missed - services.NewRequestPoller(logger, anchorDb, stateDb, validateQueue, discordHandler).Run(serverCtx) + services.NewRequestPoller(logger, anchorDb, stateDb, validateQueue.Publisher(), discordHandler).Run(serverCtx) }() } } diff --git a/common/aws/ddb/job.go b/common/aws/ddb/job.go index 4e0db9e..b52b34e 100644 --- a/common/aws/ddb/job.go +++ b/common/aws/ddb/job.go @@ -20,6 +20,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.JobRepository = &JobDatabase{} + type JobDatabase struct { ddbClient *dynamodb.Client table string @@ -43,17 +45,17 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error { func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { jobParams := map[string]interface{}{ - job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker - job.JobParam_Overrides: map[string]string{}, + job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker + job.AnchorJobParam_Overrides: map[string]string{}, } // Only enable continuous anchoring on Clay and Prod if (jdb.env == cas.EnvTag_Tnet) || (jdb.env == cas.EnvTag_Prod) { - jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_AppMode] = models.AnchorAppMode_ContinualAnchoring - jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_SchedulerStopAfterNoOp] = "true" + jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_AppMode] = models.AnchorAppMode_ContinualAnchoring + jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_SchedulerStopAfterNoOp] = "true" } // If an override anchor contract address is available, pass it through to the job. if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found { - jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress + jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress } newJob := models.NewJob(job.JobType_Anchor, jobParams) attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) { @@ -74,7 +76,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { if err != nil { return "", err } else { - return newJob.Job, nil + return newJob.JobId, nil } } } diff --git a/common/aws/ddb/state.go b/common/aws/ddb/state.go index 035c241..04d5909 100644 --- a/common/aws/ddb/state.go +++ b/common/aws/ddb/state.go @@ -19,6 +19,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.StateRepository = &StateDatabase{} + type StateDatabase struct { client *dynamodb.Client checkpointTable string diff --git a/common/aws/queue/consumer.go b/common/aws/queue/consumer.go deleted file mode 100644 index 12d095b..0000000 --- a/common/aws/queue/consumer.go +++ /dev/null @@ -1,47 +0,0 @@ -package queue - -import ( - "math" - - "github.com/abevier/go-sqs/gosqs" - "github.com/ceramicnetwork/go-cas/models" -) - -const defaultNumConsumerWorkers = 100 - -type Consumer struct { - queueType QueueType - consumer *gosqs.SQSConsumer - logger models.Logger -} - -func NewConsumer(logger models.Logger, publisher *Publisher, callback gosqs.MessageCallbackFunc, numWorkers *int) *Consumer { - var maxWorkers float64 = defaultNumConsumerWorkers - if numWorkers != nil { - // Don't go below the default number of workers - maxWorkers = math.Max(maxWorkers, float64(*numWorkers)) - } - maxReceivedMessages := math.Ceil(float64(maxWorkers) * 1.2) - maxInflightRequests := math.Ceil(maxReceivedMessages / 10) - qOpts := gosqs.Opts{ - MaxReceivedMessages: int(maxReceivedMessages), - MaxWorkers: int(maxWorkers), - MaxInflightReceiveMessageRequests: int(maxInflightRequests), - } - return &Consumer{publisher.queueType, gosqs.NewConsumer(qOpts, publisher.publisher, callback), logger} -} - -func (c Consumer) Start() { - c.consumer.Start() - c.logger.Infof("%s: started", c.queueType) -} - -func (c Consumer) Shutdown() { - c.consumer.Shutdown() - c.logger.Infof("%s: stopped", c.queueType) -} - -func (c Consumer) WaitForRxShutdown() { - c.consumer.WaitForRxShutdown() - c.logger.Infof("%s: rx stopped", c.queueType) -} diff --git a/common/aws/queue/monitor.go b/common/aws/queue/monitor.go index 70ab45e..274858f 100644 --- a/common/aws/queue/monitor.go +++ b/common/aws/queue/monitor.go @@ -8,6 +8,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.QueueMonitor = &Monitor{} + type Monitor struct { queueUrl string client *sqs.Client diff --git a/common/aws/queue/multiMonitor.go b/common/aws/queue/multiMonitor.go new file mode 100644 index 0000000..923437b --- /dev/null +++ b/common/aws/queue/multiMonitor.go @@ -0,0 +1,32 @@ +package queue + +import ( + "context" + + "github.com/ceramicnetwork/go-cas/models" +) + +var _ models.QueueMonitor = &MultiMonitor{} + +type MultiMonitor struct { + monitors []models.QueueMonitor +} + +func NewMultiMonitor(monitors []models.QueueMonitor) models.QueueMonitor { + return &MultiMonitor{monitors} +} + +func (m MultiMonitor) GetUtilization(ctx context.Context) (int, int, error) { + // Get the utilization of all sub-queues + totalMsgsUnprocessed := 0 + totalMsgsInFlight := 0 + for _, monitor := range m.monitors { + numMsgsUnprocessed, numMsgsInFlight, err := monitor.GetUtilization(ctx) + if err != nil { + return 0, 0, err + } + totalMsgsUnprocessed += numMsgsUnprocessed + totalMsgsInFlight += numMsgsInFlight + } + return totalMsgsUnprocessed, totalMsgsInFlight, nil +} diff --git a/common/aws/queue/multiQueue.go b/common/aws/queue/multiQueue.go new file mode 100644 index 0000000..246f800 --- /dev/null +++ b/common/aws/queue/multiQueue.go @@ -0,0 +1,86 @@ +package queue + +import ( + "context" + "math" + "sync/atomic" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + + "github.com/abevier/go-sqs/gosqs" + + "github.com/ceramicnetwork/go-cas/models" +) + +var _ models.Queue = &multiQueue{} +var _ models.QueuePublisher = &multiQueue{} + +type multiQueue struct { + queues []models.Queue + monitor models.QueueMonitor + index int32 +} + +func NewMultiQueue( + ctx context.Context, + metricService models.MetricService, + logger models.Logger, + sqsClient *sqs.Client, + opts Opts, + callback gosqs.MessageCallbackFunc, + numQueues int, +) (models.Queue, error) { + // Create at least one sub-queue + numQueues = int(math.Max(1, float64(numQueues))) + queues := make([]models.Queue, numQueues) + monitors := make([]models.QueueMonitor, numQueues) + if opts.NumWorkers != nil { + // Divide the workers evenly among the sub-queues + *opts.NumWorkers = int(math.Ceil(float64(*opts.NumWorkers / numQueues))) + } + for i := 0; i < numQueues; i++ { + if q, _, err := newQueue(ctx, metricService, logger, sqsClient, opts, callback, i); err != nil { + return nil, err + } else { + queues[i] = q + monitors[i] = q.Monitor() + } + } + return &multiQueue{queues, NewMultiMonitor(monitors), 0}, nil +} + +func (m *multiQueue) SendMessage(ctx context.Context, event any) (string, error) { + // Use an atomic counter to round-robin between sub-queues + i := atomic.AddInt32(&m.index, 1) - 1 + i = i % int32(len(m.queues)) + return m.queues[i].Publisher().SendMessage(ctx, event) +} + +func (m *multiQueue) Start() { + // Start all sub-queues + for _, q := range m.queues { + q.Start() + } +} + +func (m *multiQueue) Shutdown() { + // Shutdown all sub-queues + for _, q := range m.queues { + q.Shutdown() + } +} + +func (m *multiQueue) WaitForRxShutdown() { + // Wait for all sub-queues to shutdown + for _, q := range m.queues { + q.WaitForRxShutdown() + } +} + +func (m *multiQueue) Monitor() models.QueueMonitor { + return m.monitor +} + +func (m *multiQueue) Publisher() models.QueuePublisher { + return m +} diff --git a/common/aws/queue/publisher.go b/common/aws/queue/publisher.go deleted file mode 100644 index 1dbb288..0000000 --- a/common/aws/queue/publisher.go +++ /dev/null @@ -1,60 +0,0 @@ -package queue - -import ( - "context" - "encoding/json" - "time" - - "github.com/aws/aws-sdk-go-v2/service/sqs" - - "github.com/abevier/go-sqs/gosqs" -) - -const maxLinger = 250 * time.Millisecond - -type PublisherOpts struct { - QueueType QueueType - VisibilityTimeout *time.Duration - RedrivePolicy *QueueRedrivePolicy -} - -type Publisher struct { - queueType QueueType - queueUrl string - publisher *gosqs.SQSPublisher -} - -func NewPublisher(ctx context.Context, sqsClient *sqs.Client, opts PublisherOpts) (*Publisher, error) { - // Create the queue if it didn't already exist - if queueUrl, err := CreateQueue(ctx, sqsClient, opts); err != nil { - return nil, err - } else { - return &Publisher{ - opts.QueueType, - queueUrl, - gosqs.NewPublisher( - sqsClient, - queueUrl, - maxLinger, - ), - }, nil - } -} - -func (p Publisher) GetName() string { - return string(p.queueType) -} - -func (p Publisher) GetUrl() string { - return p.queueUrl -} - -func (p Publisher) SendMessage(ctx context.Context, event any) (string, error) { - if eventBody, err := json.Marshal(event); err != nil { - return "", err - } else if msgId, err := p.publisher.SendMessage(ctx, string(eventBody)); err != nil { - return "", err - } else { - return msgId, nil - } -} diff --git a/common/aws/queue/queue.go b/common/aws/queue/queue.go new file mode 100644 index 0000000..60600bc --- /dev/null +++ b/common/aws/queue/queue.go @@ -0,0 +1,130 @@ +package queue + +import ( + "context" + "encoding/json" + "math" + "time" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + + "github.com/abevier/go-sqs/gosqs" + + "github.com/ceramicnetwork/go-cas/models" +) + +var _ models.Queue = &queue{} +var _ models.QueuePublisher = &queue{} + +const maxLinger = 250 * time.Millisecond +const defaultNumConsumerWorkers = 1000 + +type RedriveOpts struct { + DlqId string + MaxReceiveCount int +} + +type Opts struct { + QueueType Type + VisibilityTimeout *time.Duration + RedriveOpts *RedriveOpts + NumWorkers *int +} + +type queue struct { + queueType Type + index int + publisher *gosqs.SQSPublisher + consumer *gosqs.SQSConsumer + monitor models.QueueMonitor + logger models.Logger +} + +func NewQueue( + ctx context.Context, + metricService models.MetricService, + logger models.Logger, + sqsClient *sqs.Client, + opts Opts, + callback gosqs.MessageCallbackFunc, +) (models.Queue, string, error) { + return newQueue(ctx, metricService, logger, sqsClient, opts, callback, 0) +} + +func newQueue( + ctx context.Context, + metricService models.MetricService, + logger models.Logger, + sqsClient *sqs.Client, + opts Opts, + callback gosqs.MessageCallbackFunc, + index int, +) (models.Queue, string, error) { + // Create the queue if it didn't already exist + if url, arn, name, err := CreateQueue(ctx, sqsClient, opts, index); err != nil { + return nil, "", err + } else { + monitor := NewMonitor(url, sqsClient) + if err = metricService.QueueGauge(ctx, name, monitor); err != nil { + logger.Fatalf("error creating gauge for %s queue: %v", name, err) + } + publisher := gosqs.NewPublisher( + sqsClient, + url, + maxLinger, + ) + var maxWorkers float64 = defaultNumConsumerWorkers + if opts.NumWorkers != nil { + // Don't go below the default number of workers + maxWorkers = math.Max(maxWorkers, float64(*opts.NumWorkers)) + } + maxReceivedMessages := math.Ceil(float64(maxWorkers) * 1.2) + maxInflightRequests := math.Ceil(maxReceivedMessages / 10) + qOpts := gosqs.Opts{ + MaxReceivedMessages: int(maxReceivedMessages), + MaxWorkers: int(maxWorkers), + MaxInflightReceiveMessageRequests: int(maxInflightRequests), + } + return &queue{ + opts.QueueType, + index, + publisher, + gosqs.NewConsumer(qOpts, publisher, callback), + monitor, + logger, + }, arn, nil + } +} + +func (p queue) SendMessage(ctx context.Context, event any) (string, error) { + if eventBody, err := json.Marshal(event); err != nil { + return "", err + } else if msgId, err := p.publisher.SendMessage(ctx, string(eventBody)); err != nil { + return "", err + } else { + return msgId, nil + } +} + +func (p queue) Start() { + p.consumer.Start() + p.logger.Infof("%s: started", p.queueType) +} + +func (p queue) Shutdown() { + p.consumer.Shutdown() + p.logger.Infof("%s: stopped", p.queueType) +} + +func (p queue) WaitForRxShutdown() { + p.consumer.WaitForRxShutdown() + p.logger.Infof("%s: rx stopped", p.queueType) +} + +func (p queue) Monitor() models.QueueMonitor { + return p.monitor +} + +func (p queue) Publisher() models.QueuePublisher { + return p +} diff --git a/common/aws/queue/utils.go b/common/aws/queue/utils.go index 850f245..020eec1 100644 --- a/common/aws/queue/utils.go +++ b/common/aws/queue/utils.go @@ -16,40 +16,48 @@ import ( "github.com/ceramicnetwork/go-cas/common" ) -type QueueType string +type Type string const ( - QueueType_Validate QueueType = "validate" - QueueType_Ready QueueType = "ready" - QueueType_Batch QueueType = "batch" - QueueType_Status QueueType = "status" - QueueType_Failure QueueType = "failure" - QueueType_DLQ QueueType = "dlq" - QueueType_IPFS QueueType = "ipfs" + Type_Validate Type = "validate" + Type_Ready Type = "ready" + Type_Batch Type = "batch" + Type_Status Type = "status" + Type_Failure Type = "failure" + Type_DLQ Type = "dlq" + Type_IPFS Type = "ipfs" ) const defaultVisibilityTimeout = 5 * time.Minute const DefaultMaxReceiveCount = 3 -type QueueRedrivePolicy struct { +type redrivePolicy struct { DeadLetterTargetArn string `json:"deadLetterTargetArn"` MaxReceiveCount int `json:"maxReceiveCount"` } -func CreateQueue(ctx context.Context, sqsClient *sqs.Client, opts PublisherOpts) (string, error) { +func CreateQueue(ctx context.Context, sqsClient *sqs.Client, opts Opts, index int) (string, string, string, error) { visibilityTimeout := defaultVisibilityTimeout if opts.VisibilityTimeout != nil { visibilityTimeout = *opts.VisibilityTimeout } + // Append a non-zero index to the queue name if specified + name := queueName(opts.QueueType) + if index > 0 { + name = fmt.Sprintf("%s-%d", name, index) + } createQueueIn := sqs.CreateQueueInput{ - QueueName: aws.String(queueName(opts.QueueType)), + QueueName: aws.String(name), Attributes: map[string]string{ string(types.QueueAttributeNameVisibilityTimeout): strconv.Itoa(int(visibilityTimeout.Seconds())), }, } // Configure redrive policy, if specified. - if opts.RedrivePolicy != nil && len(opts.RedrivePolicy.DeadLetterTargetArn) > 0 && opts.RedrivePolicy.MaxReceiveCount > 0 { - marshaledRedrivePolicy, _ := json.Marshal(opts.RedrivePolicy) + if opts.RedriveOpts != nil && len(opts.RedriveOpts.DlqId) > 0 && opts.RedriveOpts.MaxReceiveCount > 0 { + marshaledRedrivePolicy, _ := json.Marshal(redrivePolicy{ + DeadLetterTargetArn: opts.RedriveOpts.DlqId, + MaxReceiveCount: opts.RedriveOpts.MaxReceiveCount, + }) createQueueIn.Attributes[string(types.QueueAttributeNameRedrivePolicy)] = string(marshaledRedrivePolicy) } @@ -57,9 +65,11 @@ func CreateQueue(ctx context.Context, sqsClient *sqs.Client, opts PublisherOpts) defer httpCancel() if createQueueOut, err := sqsClient.CreateQueue(httpCtx, &createQueueIn); err != nil { - return "", err + return "", "", "", err + } else if arn, err := getQueueArn(ctx, *createQueueOut.QueueUrl, sqsClient); err != nil { + return "", "", "", err } else { - return *createQueueOut.QueueUrl, nil + return *createQueueOut.QueueUrl, arn, name, nil } } @@ -80,22 +90,7 @@ func GetQueueUtilization(ctx context.Context, queueUrl string, sqsClient *sqs.Cl return 0, 0, nil } -func GetQueueUrl(ctx context.Context, queueType QueueType, sqsClient *sqs.Client) (string, error) { - getQueueUrlIn := sqs.GetQueueUrlInput{ - QueueName: aws.String(queueName(queueType)), - } - - httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime) - defer httpCancel() - - if getQueueUrlOut, err := sqsClient.GetQueueUrl(httpCtx, &getQueueUrlIn); err != nil { - return "", nil - } else { - return *getQueueUrlOut.QueueUrl, nil - } -} - -func GetQueueArn(ctx context.Context, queueUrl string, sqsClient *sqs.Client) (string, error) { +func getQueueArn(ctx context.Context, queueUrl string, sqsClient *sqs.Client) (string, error) { if queueAttr, err := getQueueAttributes(ctx, queueUrl, sqsClient); err != nil { return "", err } else { @@ -119,6 +114,6 @@ func getQueueAttributes(ctx context.Context, queueUrl string, sqsClient *sqs.Cli } } -func queueName(queueType QueueType) string { +func queueName(queueType Type) string { return fmt.Sprintf("cas-anchor-%s-%s", os.Getenv(cas.Env_Env), string(queueType)) } diff --git a/common/aws/storage/s3.go b/common/aws/storage/s3.go new file mode 100644 index 0000000..8212c87 --- /dev/null +++ b/common/aws/storage/s3.go @@ -0,0 +1,50 @@ +package storage + +import ( + "bytes" + "context" + "encoding/json" + "os" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/ceramicnetwork/go-cas" + "github.com/ceramicnetwork/go-cas/common" + "github.com/ceramicnetwork/go-cas/models" +) + +var _ models.KeyValueRepository = &S3Store{} + +type S3Store struct { + client *s3.Client + logger models.Logger + bucket string +} + +func NewS3Store(logger models.Logger, s3Client *s3.Client) *S3Store { + bucket := "ceramic-" + os.Getenv(cas.Env_Env) + "-cas" + return &S3Store{s3Client, logger, bucket} +} + +func (s *S3Store) Store(ctx context.Context, key string, value interface{}) error { + if jsonBytes, err := json.Marshal(value); err != nil { + return err + } else { + httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime) + defer httpCancel() + + putObjectIn := s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + Body: bytes.NewReader(jsonBytes), + ContentType: aws.String("application/json"), + } + if _, err = s.client.PutObject(httpCtx, &putObjectIn); err != nil { + return err + } else { + s.logger.Debugf("stored key: %s", key) + } + } + return nil +} diff --git a/common/db/anchor.go b/common/db/anchor.go index 5b9d5fd..16816d5 100644 --- a/common/db/anchor.go +++ b/common/db/anchor.go @@ -15,6 +15,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.AnchorRepository = &AnchorDatabase{} + type AnchorDatabase struct { opts anchorDbOpts logger models.Logger diff --git a/common/metrics/otl.go b/common/metrics/otl.go index 9713a18..8f8562a 100644 --- a/common/metrics/otl.go +++ b/common/metrics/otl.go @@ -18,6 +18,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.MetricService = &OtlMetricService{} + type OtlMetricService struct { caller string meterProvider *sdk.MeterProvider diff --git a/common/notifs/discord.go b/common/notifs/discord.go index 51a73a4..9c699ad 100644 --- a/common/notifs/discord.go +++ b/common/notifs/discord.go @@ -15,6 +15,8 @@ import ( "github.com/ceramicnetwork/go-cas/models" ) +var _ models.Notifier = &DiscordHandler{} + type DiscordColor int const ( diff --git a/env/.env.dev b/env/.env.dev index 6e849f0..e63ecf4 100644 --- a/env/.env.dev +++ b/env/.env.dev @@ -3,7 +3,6 @@ ANCHOR_BATCH_LINGER=10s ANCHOR_BATCH_SIZE=20 ANCHOR_BATCH_MONITOR_TICK=10s MAX_ANCHOR_WORKERS=2 -LONG_QUEUE_VISIBILITY_TIMEOUT=1h LONG_QUEUE_MAX_RECEIVE_COUNT=2 DB_NAME=ceramicanchorservicedev DB_PORT=5432 diff --git a/env/.env.prod b/env/.env.prod index b19c770..f425bb8 100644 --- a/env/.env.prod +++ b/env/.env.prod @@ -1,9 +1,8 @@ ANCHOR_AUDIT_ENABLED=true -ANCHOR_BATCH_LINGER=6h -ANCHOR_BATCH_SIZE=1024 +ANCHOR_BATCH_LINGER=1h +ANCHOR_BATCH_SIZE=1048576 ANCHOR_BATCH_MONITOR_TICK=10s MAX_ANCHOR_WORKERS=2 -LONG_QUEUE_VISIBILITY_TIMEOUT=7h LONG_QUEUE_MAX_RECEIVE_COUNT=2 DB_NAME=ceramicanchorserviceprod DB_PORT=5432 diff --git a/env/.env.qa b/env/.env.qa index 19cbf9b..7a6b86c 100644 --- a/env/.env.qa +++ b/env/.env.qa @@ -3,7 +3,6 @@ ANCHOR_BATCH_LINGER=1m ANCHOR_BATCH_SIZE=50 ANCHOR_BATCH_MONITOR_TICK=10s MAX_ANCHOR_WORKERS=2 -LONG_QUEUE_VISIBILITY_TIMEOUT=1h LONG_QUEUE_MAX_RECEIVE_COUNT=2 DB_NAME=ceramicanchorserviceqa DB_PORT=5432 diff --git a/env/.env.tnet b/env/.env.tnet index b7701f1..fbff9ed 100644 --- a/env/.env.tnet +++ b/env/.env.tnet @@ -1,9 +1,8 @@ ANCHOR_AUDIT_ENABLED=true ANCHOR_BATCH_LINGER=30m -ANCHOR_BATCH_SIZE=1024 +ANCHOR_BATCH_SIZE=1048576 ANCHOR_BATCH_MONITOR_TICK=10s MAX_ANCHOR_WORKERS=2 -LONG_QUEUE_VISIBILITY_TIMEOUT=1h LONG_QUEUE_MAX_RECEIVE_COUNT=2 DB_NAME=ceramicanchorservicetnet DB_PORT=5432 diff --git a/go.mod b/go.mod index ede8583..63f93b2 100644 --- a/go.mod +++ b/go.mod @@ -6,17 +6,18 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c require ( dagger.io/dagger v0.7.1 - github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908 - github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908 + github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20240306023245-19d920c575d7 + github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20240306023245-19d920c575d7 github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c github.com/alexflint/go-arg v1.4.2 - github.com/aws/aws-sdk-go-v2 v1.21.2 - github.com/aws/aws-sdk-go-v2/config v1.18.4 - github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7 - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0 - github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11 - github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 + github.com/aws/aws-sdk-go-v2 v1.25.3 + github.com/aws/aws-sdk-go-v2/config v1.27.7 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.9 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.4 + github.com/aws/aws-sdk-go-v2/service/ecr v1.27.2 + github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1 + github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2 github.com/disgoorg/disgo v0.16.4 github.com/disgoorg/snowflake/v2 v2.0.1 github.com/go-playground/validator v9.31.0+incompatible @@ -46,19 +47,23 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect github.com/alexflint/go-scalar v1.0.0 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect - github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect - github.com/aws/smithy-go v1.15.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.7 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.3 // indirect + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.28.4 // indirect + github.com/aws/smithy-go v1.20.1 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect diff --git a/go.sum b/go.sum index ef19778..08fc889 100644 --- a/go.sum +++ b/go.sum @@ -42,10 +42,10 @@ dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBr dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1:a1inKt/atXimZ4Mv927x+r7UpyzRUf4emIoiiSC2TN4= dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= -github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908 h1:TMo2kFAn6p0c/S6dmnHhjKAfEH4EInjOxTTE3btOxQs= -github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908/go.mod h1:XIstImO8QgJX/ztikxq+gvYumQc4/23+qFHdJ8H60sk= -github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908 h1:vSJ8+L7TTDpZLUN+DSckpekBs/AAOsQ5MV0lcIHyzlE= -github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908/go.mod h1:ADJZlCg+tCuZ23wXpw7jSGj8CZ8Ye8qi2TERpcCpfms= +github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20240306023245-19d920c575d7 h1:9e7lQwbJeWYJSUJmATswcCygVimD5rH1/MIfCtR7klQ= +github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20240306023245-19d920c575d7/go.mod h1:XIstImO8QgJX/ztikxq+gvYumQc4/23+qFHdJ8H60sk= +github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20240306023245-19d920c575d7 h1:JQ/uior9XVgvTWz1rD8LdrTSJZhUtJVo2I7gfrTjuT8= +github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20240306023245-19d920c575d7/go.mod h1:ADJZlCg+tCuZ23wXpw7jSGj8CZ8Ye8qi2TERpcCpfms= github.com/99designs/gqlgen v0.17.2 h1:yczvlwMsfcVu/JtejqfrLwXuSP0yZFhmcss3caEvHw8= github.com/99designs/gqlgen v0.17.2/go.mod h1:K5fzLKwtph+FFgh9j7nFbRUdBKvTcGnsta51fsMTn3o= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= @@ -82,56 +82,56 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2 v1.21.2 h1:+LXZ0sgo8quN9UOKXXzAWRT3FWd4NxeXWOZom9pE7GA= -github.com/aws/aws-sdk-go-v2 v1.21.2/go.mod h1:ErQhvNuEMhJjweavOYhxVkn2RUx7kQXVATHrjKtxIpM= -github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE= -github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4= -github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg= -github.com/aws/aws-sdk-go-v2/credentials v1.13.4/go.mod h1:/Cj5w9LRsNTLSwexsohwDME32OzJ6U81Zs33zr2ZWOM= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7 h1:CyuByiiCA4lPfU8RaHJh2wIYYn0hkFlOkMfWkVY67Mc= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7/go.mod h1:pAMtgCPVxcKohC/HNI6nLwLeW007eYl3T+pq7yTMV3o= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 h1:nFBQlGtkbPzp/NjZLuFxRqmT91rLJkgvsEQs68h962Y= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43/go.mod h1:auo+PiyLl0n1l8A0e8RIeR8tOzYPfZZH/JNlrJ8igTQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 h1:JRVhO25+r3ar2mKGP7E0LDl8K9/G36gjlqca5iQbaqc= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37/go.mod h1:Qe+2KtKml+FEsQF/DHmDV+xjtche/hwoF75EG4UlHW8= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 h1:KeTxcGdNnQudb46oOl4d90f2I33DF/c6q3RnZAmvQdQ= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.8/go.mod h1:jvXzk+hVrlkiQOvnq6jH+F6qBK0CEceXkEWugT+4Kdc= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0 h1:xmSAn14nM6IdHyuWO/bsrAagOQtnqzuUCLxdVmj9nhg= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0/go.mod h1:1HkLh8vaL4obF95fne7ZOu7sxomS/+vkBt3/+gqqwE4= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 h1:7MhqbR+k+b0gbOxp+W8yXgsl/Z5/dtMh85K0WI8X2EA= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27/go.mod h1:wX9QEZJ8Dw1fdAKCOAUmSvAe3wNJFxnE/4AeYc8blGA= -github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11 h1:wlTgmb/sCmVRJrN5De3CiHj4v/bTCgL5+qpdEd0CPtw= -github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11/go.mod h1:Ce1q2jlNm8BVpjLaOnwnm5v2RClAbK6txwPljFzyW6c= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 h1:7R8uRYyXzdD71KWVCL78lJZltah6VVznXBazvKjfH58= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15/go.mod h1:26SQUPcTNgV1Tapwdt4a1rOsYRsnBsJHLMPoxK2b0d8= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.20/go.mod h1:lxM5qubwGNX29Qy+xTFG8G0r2Mj/TmyC+h3hS/7E4V8= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 h1:4LoizcvPT9A0tiAFhepxn0bGZXkzvN0pG0epydY3Pno= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37/go.mod h1:7xBUZyP6LeLc+5Ym9PG7atqw4sR28sBtYcHETik+bPE= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E= -github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0 h1:ikSvot5NdywduxtkOwOa2GJFzFuJq1ZjXsGjoIA82Ao= -github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0= -github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 h1:ActQgdTNQej/RuUJjB9uxYVLDOvRGtUreXF8L3c8wyg= -github.com/aws/aws-sdk-go-v2/service/sso v1.11.26/go.mod h1:uB9tV79ULEZUXc6Ob18A46KSQ0JDlrplPni9XW6Ot60= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDAns8B9DgnRooB1PVhY+Q= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c= -github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= -github.com/aws/smithy-go v1.15.0 h1:PS/durmlzvAFpQHDs4wi4sNNP9ExsqZh6IlfdHXgKK8= -github.com/aws/smithy-go v1.15.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= +github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo= +github.com/aws/aws-sdk-go-v2/config v1.27.7 h1:JSfb5nOQF01iOgxFI5OIKWwDiEXWTyTgg1Mm1mHi0A4= +github.com/aws/aws-sdk-go-v2/config v1.27.7/go.mod h1:PH0/cNpoMO+B04qET699o5W92Ca79fVtbUnvMIZro4I= +github.com/aws/aws-sdk-go-v2/credentials v1.17.7 h1:WJd+ubWKoBeRh7A5iNMnxEOs982SyVKOJD+K8HIezu4= +github.com/aws/aws-sdk-go-v2/credentials v1.17.7/go.mod h1:UQi7LMR0Vhvs+44w5ec8Q+VS+cd10cjwgHwiVkE0YGU= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.9 h1:wcPuFDEPyk5sY0qIPRJCgjGL+J7pkXexHs8t/0xIjvw= +github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.9/go.mod h1:KS9rl02fOHtG8eOcCvA0jFT30aUIoVs5tcq7lsSmJT0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.3 h1:p+y7FvkK2dxS+FEwRIDHDe//ZX+jDhP8HHE50ppj4iI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.3/go.mod h1:/fYB+FZbDlwlAiynK9KDXlzZl3ANI9JkD0Uhz5FjNT4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.3 h1:ifbIbHZyGl1alsAhPIYsHOg5MuApgqOvVeI8wIugXfs= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.3/go.mod h1:oQZXg3c6SNeY6OZrDY+xHcF4VGIEoNotX2B4PrDeoJI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.3 h1:Qvodo9gHG9F3E8SfYOspPeBt0bjSbsevK8WhRAUHcoY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.3/go.mod h1:vCKrdLXtybdf/uQd/YfVR2r5pcbNuEYKzMQpcxmeSJw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.3 h1:mDnFOE2sVkyphMWtTH+stv0eW3k0OTx94K63xpxHty4= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.3/go.mod h1:V8MuRVcCRt5h1S+Fwu8KbC7l/gBGo3yBAyUbJM2IJOk= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.4 h1:VdtD2r5ZzeX/PvaCUSUsiwu6K0SAhNzgJ50Wu/0KwhM= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.4/go.mod h1:HOZYCpIko/NOS693uPQINLs7drzMjRtIN1+XRL8IkfA= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.2 h1:MDfz/W2jzzQVYnTOGEM/f9eIGo/2BEbeuZZP4BLpiPw= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.2/go.mod h1:E5/EKXnoznpCHjUTexYBdLSkQ2gac4tgcFlr4LSAW0M= +github.com/aws/aws-sdk-go-v2/service/ecr v1.27.2 h1:lkRbIgRuYW3C8y+vTQnSc4D4fiuby7XpEVzKTJkWjcU= +github.com/aws/aws-sdk-go-v2/service/ecr v1.27.2/go.mod h1:H4zhX7f/oFn2xNTW+vXOTSXx5SsJ3PjwIlKJCpzm0DU= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.5 h1:mbWNpfRUTT6bnacmvOTKXZjR/HycibdWzNpfbrbLDIs= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.5/go.mod h1:FCOPWGjsshkkICJIn9hq9xr6dLKtyaWpuUojiN3W1/8= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.4 h1:ikwIKlf0+HbyOhTLo/BRT5z5c8FsjPLPgd75zcRonek= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.4/go.mod h1:Egp7w6xf3EzlnfkfnMbDtHtts8H21B9QrCvc+3NNT24= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.5 h1:K/NXvIftOlX+oGgWGIa3jDyYLDNsdVhsjHmsBH2GLAQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.5/go.mod h1:cl9HGLV66EnCmMNzq4sYOti+/xo8w34CsgzVtm2GgsY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3 h1:4t+QEX7BsXz98W8W1lNvMAG+NX8qHz2CjLBxQKku40g= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.3/go.mod h1:oFcjjUq5Hm09N9rpxTdeMeLeQcxS7mIkBkL8qUKng+A= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.0 h1:k7gL76sSR0e2pLphjfmjD/+pDDtoOHvWp8ezpTsdyes= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.0/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1 h1:Y/TTvxMdYwNvhzolvneV1wEEN/ncQUSd1AnzFGTMPqM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.52.1/go.mod h1:MGTaf3x/+z7ZGugCGvepnx2DS6+caCYYqKhzVoLNYPk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2 h1:A9ihuyTKpS8Z1ou/D4ETfOEFMyokA6JjRsgXWTiHvCk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2/go.mod h1:J3XhTE+VsY1jDsdDY+ACFAppZj/gpvygzC5JE0bTLbQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.2 h1:XOPfar83RIRPEzfihnp+U6udOveKZJvPQ76SKWrLRHc= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.2/go.mod h1:Vv9Xyk1KMHXrR3vNQe8W5LMFdTjSeWk0gBZBzvf3Qa0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.2 h1:pi0Skl6mNl2w8qWZXcdOyg197Zsf4G97U7Sso9JXGZE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.2/go.mod h1:JYzLoEVeLXk+L4tn1+rrkfhkxl6mLDEVaDSvGq9og90= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.4 h1:Ppup1nVNAOWbBOrcoOxaxPeEnSFB2RnnQdguhXpmeQk= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.4/go.mod h1:+K1rNPVyGxkRuv9NNiaZ4YhBFuyw2MMA9SlIJ1Zlpz8= +github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw= +github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= @@ -334,7 +334,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= diff --git a/models/constants.go b/models/constants.go index 2377d54..efa647e 100644 --- a/models/constants.go +++ b/models/constants.go @@ -2,8 +2,8 @@ package models import "time" -const DefaultAnchorBatchSize = 1024 -const DefaultAnchorBatchLinger = 12 * time.Hour +const DefaultAnchorBatchSize = 1048576 // 2^20 +const DefaultAnchorBatchLinger = time.Hour const ( Env_AnchorAuditEnabled = "ANCHOR_AUDIT_ENABLED" diff --git a/models/job.go b/models/job.go index 92200a8..3c486d2 100644 --- a/models/job.go +++ b/models/job.go @@ -25,7 +25,7 @@ const ( func NewJob(jobType job.JobType, params map[string]interface{}) job.JobState { return job.JobState{ - Job: uuid.New().String(), + JobId: uuid.New().String(), Stage: job.JobStage_Queued, Type: jobType, Ts: time.Now(), diff --git a/models/metrics.go b/models/metrics.go index 488bc60..0f25b5f 100644 --- a/models/metrics.go +++ b/models/metrics.go @@ -7,6 +7,7 @@ const ( MetricName_BatchCreated MetricName = "batch_created" MetricName_BatchIngressRequest MetricName = "batch_ingress_request" MetricName_BatchSize MetricName = "batch_size" + MetricName_BatchStored MetricName = "batch_stored" MetricName_FailureDlqMessage MetricName = "failure_dlq_message" MetricName_FailureMessage MetricName = "failure_message" MetricName_StatusIngressMessage MetricName = "status_ingress_message" diff --git a/models/services.go b/models/services.go index 445a56a..b9e7f53 100644 --- a/models/services.go +++ b/models/services.go @@ -26,8 +26,19 @@ type JobRepository interface { QueryJob(ctx context.Context, id string) (*job.JobState, error) } +type KeyValueRepository interface { + Store(ctx context.Context, key string, value interface{}) error +} + +type Queue interface { + Start() + Shutdown() + WaitForRxShutdown() + Monitor() QueueMonitor + Publisher() QueuePublisher +} + type QueuePublisher interface { - GetUrl() string SendMessage(ctx context.Context, event any) (string, error) } diff --git a/services/batch.go b/services/batch.go index 29c51d8..eec091f 100644 --- a/services/batch.go +++ b/services/batch.go @@ -17,12 +17,14 @@ import ( type BatchingService struct { batchPublisher models.QueuePublisher + batchStore models.KeyValueRepository batcher *batch.Executor[*models.AnchorRequestMessage, *uuid.UUID] metricService models.MetricService logger models.Logger + initialized bool } -func NewBatchingService(ctx context.Context, logger models.Logger, batchPublisher models.QueuePublisher, metricService models.MetricService) *BatchingService { +func NewBatchingService(ctx context.Context, batchStore models.KeyValueRepository, metricService models.MetricService, logger models.Logger) *BatchingService { anchorBatchSize := models.DefaultAnchorBatchSize if configAnchorBatchSize, found := os.LookupEnv(models.Env_AnchorBatchSize); found { if parsedAnchorBatchSize, err := strconv.Atoi(configAnchorBatchSize); err == nil { @@ -35,7 +37,12 @@ func NewBatchingService(ctx context.Context, logger models.Logger, batchPublishe anchorBatchLinger = parsedAnchorBatchLinger } } - batchingService := BatchingService{batchPublisher: batchPublisher, metricService: metricService, logger: logger} + batchingService := BatchingService{ + batchStore: batchStore, + metricService: metricService, + logger: logger, + initialized: false, + } beOpts := batch.Opts{MaxSize: anchorBatchSize, MaxLinger: anchorBatchLinger} batchingService.batcher = batch.New[*models.AnchorRequestMessage, *uuid.UUID]( beOpts, @@ -47,7 +54,15 @@ func NewBatchingService(ctx context.Context, logger models.Logger, batchPublishe return &batchingService } +func (b BatchingService) Start(batchPublisher models.QueuePublisher) { + b.batchPublisher = batchPublisher + b.initialized = true +} + func (b BatchingService) Batch(ctx context.Context, msgBody string) error { + if !b.initialized { + b.logger.Fatalf("batching service not initialized") + } anchorReq := new(models.AnchorRequestMessage) if err := json.Unmarshal([]byte(msgBody), anchorReq); err != nil { return err @@ -74,15 +89,23 @@ func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorR anchorReqBatch.Ids[idx] = anchorReq.Id batchResults[idx] = results.New[*uuid.UUID](&anchorReqBatch.Id, nil) } + + // Store the batch to S3 before sending it to the queue + if err := b.batchStore.Store(ctx, anchorReqBatch.Id.String(), anchorReqBatch); err != nil { + b.logger.Errorf("error storing batch: %v, %v", anchorReqBatch.Id, err) + return nil, err + } + b.metricService.Count(ctx, models.MetricName_BatchStored, 1) + if _, err := b.batchPublisher.SendMessage(ctx, anchorReqBatch); err != nil { - b.logger.Errorf("error sending message: %v, %v", anchorReqBatch, err) + b.logger.Errorf("error sending message: %v, %v", anchorReqBatch.Id, err) return nil, err } b.metricService.Count(ctx, models.MetricName_BatchCreated, 1) b.metricService.Distribution(ctx, models.MetricName_BatchSize, batchSize) b.logger.Debugw( "batch generated", - "batch", anchorReqBatch, + "batch", anchorReqBatch.Id, ) return batchResults, nil } diff --git a/services/test_helpers.go b/services/test_helpers.go index e8e2423..38ac705 100644 --- a/services/test_helpers.go +++ b/services/test_helpers.go @@ -117,8 +117,8 @@ func (m *MockJobRepository) CreateJob(_ context.Context) (string, error) { return "", fmt.Errorf("failed to create job") } newJob := models.NewJob(job.JobType_Anchor, nil) - m.jobStore[newJob.Job] = &newJob - return newJob.Job, nil + m.jobStore[newJob.JobId] = &newJob + return newJob.JobId, nil } func (m *MockJobRepository) QueryJob(_ context.Context, id string) (*job.JobState, error) { diff --git a/services/validate.go b/services/validate.go index 5f27c26..262a477 100644 --- a/services/validate.go +++ b/services/validate.go @@ -15,13 +15,23 @@ type ValidationService struct { statusPublisher models.QueuePublisher metricService models.MetricService logger models.Logger + initialized bool } -func NewValidationService(logger models.Logger, stateDb models.StateRepository, readyPublisher models.QueuePublisher, statusPublisher models.QueuePublisher, metricService models.MetricService) *ValidationService { - return &ValidationService{stateDb, readyPublisher, statusPublisher, metricService, logger} +func NewValidationService(stateDb models.StateRepository, metricService models.MetricService, logger models.Logger) *ValidationService { + return &ValidationService{stateDb, nil, nil, metricService, logger, false} +} + +func (v ValidationService) Start(readyPublisher models.QueuePublisher, statusPublisher models.QueuePublisher) { + v.readyPublisher = readyPublisher + v.statusPublisher = statusPublisher + v.initialized = true } func (v ValidationService) Validate(ctx context.Context, msgBody string) error { + if !v.initialized { + v.logger.Fatalf("validation service not initialized") + } anchorReq := new(models.AnchorRequestMessage) if err := json.Unmarshal([]byte(msgBody), anchorReq); err != nil { return err From 878a666aca3040cace612f3e063ac65fb00a66b6 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 26 Mar 2024 18:50:36 -0400 Subject: [PATCH 02/10] don't send anchor request ids in batch requests --- models/messages.go | 2 +- services/batch.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/models/messages.go b/models/messages.go index 23a8188..c413c9e 100644 --- a/models/messages.go +++ b/models/messages.go @@ -17,7 +17,7 @@ type AnchorRequestMessage struct { type AnchorBatchMessage struct { Id uuid.UUID `json:"bid"` - Ids []uuid.UUID `json:"rids"` + Ids []uuid.UUID `json:"rids,omitempty"` } type RequestStatusMessage struct { diff --git a/services/batch.go b/services/batch.go index eec091f..97421f1 100644 --- a/services/batch.go +++ b/services/batch.go @@ -97,6 +97,8 @@ func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorR } b.metricService.Count(ctx, models.MetricName_BatchStored, 1) + // Send just the batch ID in the message to the queue + anchorReqBatch.Ids = nil if _, err := b.batchPublisher.SendMessage(ctx, anchorReqBatch); err != nil { b.logger.Errorf("error sending message: %v, %v", anchorReqBatch.Id, err) return nil, err From dae70bf92f807f2632456581f59630b5459b577a Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 26 Mar 2024 20:18:24 -0400 Subject: [PATCH 03/10] various fixes --- cmd/cas/main.go | 16 ++++++++++------ common/aws/queue/queue.go | 7 ++++++- common/aws/storage/s3.go | 22 +++++++++++++++++----- models/messages.go | 2 +- services/batch.go | 19 ++++++++++++------- services/validate.go | 6 +++--- 6 files changed, 49 insertions(+), 23 deletions(-) diff --git a/cmd/cas/main.go b/cmd/cas/main.go index 661dc6f..70625a5 100644 --- a/cmd/cas/main.go +++ b/cmd/cas/main.go @@ -56,11 +56,15 @@ func main() { // HTTP clients dynamoDbClient := dynamodb.NewFromConfig(awsCfg) sqsClient := sqs.NewFromConfig(awsCfg) - s3Client := s3.NewFromConfig(awsCfg) + s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) { + o.UsePathStyle = true + }) stateDb := ddb.NewStateDb(serverCtx, logger, dynamoDbClient) jobDb := ddb.NewJobDb(serverCtx, logger, dynamoDbClient) - batchStore := storage.NewS3Store(logger, s3Client) + + casBucket := "ceramic-" + os.Getenv(cas.Env_Env) + "-cas" + batchStore := storage.NewS3Store(serverCtx, logger, s3Client, casBucket) discordHandler, err := notifs.NewDiscordHandler(logger) if err != nil { @@ -228,7 +232,7 @@ func main() { RedriveOpts: longQueueRedriveOpts, NumWorkers: &maxBatchQueueWorkers, }, - batchingService.Batch, + nil, ) if err != nil { logger.Fatalf("error creating batch queue: %v", err) @@ -284,11 +288,11 @@ func main() { batchWg.Add(2) go func() { defer batchWg.Done() - batchQueue.Shutdown() + readyQueue.Shutdown() }() go func() { defer batchWg.Done() - batchQueue.WaitForRxShutdown() + readyQueue.WaitForRxShutdown() time.Sleep(5 * time.Second) batchingService.Flush() }() @@ -317,7 +321,7 @@ func main() { deadLetterQueue.Start() failureQueue.Start() statusQueue.Start() - batchQueue.Start() + readyQueue.Start() validateQueue.Start() if configAnchorAuditEnabled, found := os.LookupEnv(models.Env_AnchorAuditEnabled); found { diff --git a/common/aws/queue/queue.go b/common/aws/queue/queue.go index 60600bc..c84176a 100644 --- a/common/aws/queue/queue.go +++ b/common/aws/queue/queue.go @@ -85,11 +85,15 @@ func newQueue( MaxWorkers: int(maxWorkers), MaxInflightReceiveMessageRequests: int(maxInflightRequests), } + var consumer *gosqs.SQSConsumer = nil + if callback != nil { + consumer = gosqs.NewConsumer(qOpts, publisher, callback) + } return &queue{ opts.QueueType, index, publisher, - gosqs.NewConsumer(qOpts, publisher, callback), + consumer, monitor, logger, }, arn, nil @@ -106,6 +110,7 @@ func (p queue) SendMessage(ctx context.Context, event any) (string, error) { } } +// TODO: Check for nil consumer func (p queue) Start() { p.consumer.Start() p.logger.Infof("%s: started", p.queueType) diff --git a/common/aws/storage/s3.go b/common/aws/storage/s3.go index 8212c87..09fcfe7 100644 --- a/common/aws/storage/s3.go +++ b/common/aws/storage/s3.go @@ -4,12 +4,9 @@ import ( "bytes" "context" "encoding/json" - "os" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/ceramicnetwork/go-cas" "github.com/ceramicnetwork/go-cas/common" "github.com/ceramicnetwork/go-cas/models" ) @@ -22,8 +19,11 @@ type S3Store struct { bucket string } -func NewS3Store(logger models.Logger, s3Client *s3.Client) *S3Store { - bucket := "ceramic-" + os.Getenv(cas.Env_Env) + "-cas" +func NewS3Store(ctx context.Context, logger models.Logger, s3Client *s3.Client, bucket string) *S3Store { + // Create the bucket if it doesn't exist + if err := createBucket(ctx, s3Client, bucket); err != nil { + logger.Fatalf("failed to create bucket %s: %v", bucket, err) + } return &S3Store{s3Client, logger, bucket} } @@ -48,3 +48,15 @@ func (s *S3Store) Store(ctx context.Context, key string, value interface{}) erro } return nil } + +func createBucket(ctx context.Context, client *s3.Client, bucket string) error { + httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime) + defer httpCancel() + + if _, err := client.CreateBucket(httpCtx, &s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }); err != nil { + return err + } + return nil +} diff --git a/models/messages.go b/models/messages.go index c413c9e..23a8188 100644 --- a/models/messages.go +++ b/models/messages.go @@ -17,7 +17,7 @@ type AnchorRequestMessage struct { type AnchorBatchMessage struct { Id uuid.UUID `json:"bid"` - Ids []uuid.UUID `json:"rids,omitempty"` + Ids []uuid.UUID `json:"rids"` } type RequestStatusMessage struct { diff --git a/services/batch.go b/services/batch.go index 97421f1..4d2f5fd 100644 --- a/services/batch.go +++ b/services/batch.go @@ -3,6 +3,7 @@ package services import ( "context" "encoding/json" + "fmt" "os" "strconv" "time" @@ -54,12 +55,12 @@ func NewBatchingService(ctx context.Context, batchStore models.KeyValueRepositor return &batchingService } -func (b BatchingService) Start(batchPublisher models.QueuePublisher) { +func (b *BatchingService) Start(batchPublisher models.QueuePublisher) { b.batchPublisher = batchPublisher b.initialized = true } -func (b BatchingService) Batch(ctx context.Context, msgBody string) error { +func (b *BatchingService) Batch(ctx context.Context, msgBody string) error { if !b.initialized { b.logger.Fatalf("batching service not initialized") } @@ -78,7 +79,7 @@ func (b BatchingService) Batch(ctx context.Context, msgBody string) error { } } -func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorRequestMessage) ([]results.Result[*uuid.UUID], error) { +func (b *BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorRequestMessage) ([]results.Result[*uuid.UUID], error) { batchSize := len(anchorReqs) anchorReqBatch := models.AnchorBatchMessage{ Id: uuid.New(), @@ -91,15 +92,19 @@ func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorR } // Store the batch to S3 before sending it to the queue - if err := b.batchStore.Store(ctx, anchorReqBatch.Id.String(), anchorReqBatch); err != nil { + key := fmt.Sprintf("cas/anchor/batch/%s", anchorReqBatch.Id.String()) + if err := b.batchStore.Store(ctx, key, anchorReqBatch); err != nil { b.logger.Errorf("error storing batch: %v, %v", anchorReqBatch.Id, err) return nil, err } b.metricService.Count(ctx, models.MetricName_BatchStored, 1) // Send just the batch ID in the message to the queue - anchorReqBatch.Ids = nil - if _, err := b.batchPublisher.SendMessage(ctx, anchorReqBatch); err != nil { + anchorBatchMessage := models.AnchorBatchMessage{ + Id: anchorReqBatch.Id, + Ids: []uuid.UUID{}, + } + if _, err := b.batchPublisher.SendMessage(ctx, anchorBatchMessage); err != nil { b.logger.Errorf("error sending message: %v, %v", anchorReqBatch.Id, err) return nil, err } @@ -112,7 +117,7 @@ func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorR return batchResults, nil } -func (b BatchingService) Flush() { +func (b *BatchingService) Flush() { // Flush the current batch however far along it's gotten in size or expiration. The caller needs to ensure that no // more messages are sent to this service for processing once this function is called. Receiving more messages will // cause workers to wait till the end of the batch expiration if there aren't enough messages to fill the batch. diff --git a/services/validate.go b/services/validate.go index 262a477..fd3b592 100644 --- a/services/validate.go +++ b/services/validate.go @@ -22,13 +22,13 @@ func NewValidationService(stateDb models.StateRepository, metricService models.M return &ValidationService{stateDb, nil, nil, metricService, logger, false} } -func (v ValidationService) Start(readyPublisher models.QueuePublisher, statusPublisher models.QueuePublisher) { +func (v *ValidationService) Start(readyPublisher models.QueuePublisher, statusPublisher models.QueuePublisher) { v.readyPublisher = readyPublisher v.statusPublisher = statusPublisher v.initialized = true } -func (v ValidationService) Validate(ctx context.Context, msgBody string) error { +func (v *ValidationService) Validate(ctx context.Context, msgBody string) error { if !v.initialized { v.logger.Fatalf("validation service not initialized") } @@ -127,7 +127,7 @@ func (v ValidationService) Validate(ctx context.Context, msgBody string) error { } } -func (v ValidationService) sendStatusMsg(ctx context.Context, id uuid.UUID, status models.RequestStatus) error { +func (v *ValidationService) sendStatusMsg(ctx context.Context, id uuid.UUID, status models.RequestStatus) error { statusMsg := &models.RequestStatusMessage{Id: id, Status: status} if _, err := v.statusPublisher.SendMessage(ctx, statusMsg); err != nil { v.logger.Errorf("error sending status message: %v, %v", statusMsg, err) From 0269554dd2d653d9236ce234fa450b2fbe9755b2 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:45:42 -0400 Subject: [PATCH 04/10] add check for unspecified queue consumer --- common/aws/queue/queue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/aws/queue/queue.go b/common/aws/queue/queue.go index c84176a..b92cdc5 100644 --- a/common/aws/queue/queue.go +++ b/common/aws/queue/queue.go @@ -110,8 +110,10 @@ func (p queue) SendMessage(ctx context.Context, event any) (string, error) { } } -// TODO: Check for nil consumer func (p queue) Start() { + if p.consumer == nil { + p.logger.Fatalf("%s: consumer not configured", p.queueType) + } p.consumer.Start() p.logger.Infof("%s: started", p.queueType) } From 1f0629cc8e075c252a84cb05611a673d18ad18a3 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Wed, 27 Mar 2024 16:36:26 -0400 Subject: [PATCH 05/10] match dev/prod batching settings --- env/.env.dev | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/env/.env.dev b/env/.env.dev index e63ecf4..b363c85 100644 --- a/env/.env.dev +++ b/env/.env.dev @@ -1,6 +1,6 @@ ANCHOR_AUDIT_ENABLED=true -ANCHOR_BATCH_LINGER=10s -ANCHOR_BATCH_SIZE=20 +ANCHOR_BATCH_LINGER=1h +ANCHOR_BATCH_SIZE=1048576 ANCHOR_BATCH_MONITOR_TICK=10s MAX_ANCHOR_WORKERS=2 LONG_QUEUE_MAX_RECEIVE_COUNT=2 From 34e6acb90f56351ca706229adcf368f6f8ae33ac Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Wed, 27 Mar 2024 19:47:07 -0400 Subject: [PATCH 06/10] increase number of ready queue consumer workers --- cmd/cas/main.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/cmd/cas/main.go b/cmd/cas/main.go index 70625a5..04cf1a7 100644 --- a/cmd/cas/main.go +++ b/cmd/cas/main.go @@ -198,6 +198,29 @@ func main() { // Create a minimum of 10 Ready queue publishers, or as many needed to process an anchor batch while keeping each // queue below the maximum number of inflight SQS messages (120,000). numReadyPublishers := int(math.Max(10, float64(anchorBatchSize/120_000))) + // The Ready queue is a "multi-queue", which is necessary because of the unconventional way we use it. + // + // The BatchingService contains logic to "hold" anchor request SQS messages until one of two conditions is + // satisfied: either the batch linger duration has run out, or the batch is full. While requests are held this way, + // they are considered "in-flight" w.r.t. SQS. Once a batch is formed and recorded, all the messages that comprise + // the batch are ACK'd and thus deleted from SQS. If a batch fails to get created, all messages will be NACK'd (if + // there is a handled error) or simply not ACK'd (if there is an unhandled error / crash), which will make them + // visible to the BatchingService once it recovers/retries. + // + // This allows SQS to be the persistence for messages flowing through the system versus needing to store batches in + // a DB to be able to recover messages in case of a failure. While the latter is doable, it pushes the (non-trivial) + // complexity for maintaining batches down to the DB. + // + // Given this context and the fact that the maximum number of in-flight message a SQS queue can have is 120,000, we + // can only have batches of up to 120,000 requests with a single queue. Using a multi-queue allows the + // BatchingService to hold a much larger number of anchor requests in-flight, up to 120,000 per sub-queue. This + // allows batches to be constructed with virtually any size that we want. + // + // Because each message is held in-flight till its batch is formed, we need a number of consumer workers greater + // than the batch size. This prevents a smaller number of workers from waiting on an incomplete batch to fill up + // because there aren't any workers available to add to the batch even when messages are available in the queue. The + // 2 multiplier is arbitrary but will allow two batches worth of requests to be read and processed in parallel. + maxReadyQueueWorkers := anchorBatchSize * 2 readyQueue, err := queue.NewMultiQueue( serverCtx, metricService, @@ -207,6 +230,7 @@ func main() { QueueType: queue.Type_Ready, VisibilityTimeout: &longQueueVisibilityTimeout, RedriveOpts: longQueueRedriveOpts, + NumWorkers: &maxReadyQueueWorkers, }, batchingService.Batch, numReadyPublishers, @@ -215,12 +239,6 @@ func main() { logger.Fatalf("error creating ready queue: %v", err) } // Batch queue - // - // Launch a number of workers greater than the batch size. This prevents a small number of workers from waiting on - // an incomplete batch to fill up because there aren't any workers available to add to the batch even when messages - // are available in the queue. The 2 multiplier is arbitrary but will allow two batches worth of requests to be read - // and processed in parallel. - maxBatchQueueWorkers := anchorBatchSize * 2 batchQueue, _, err := queue.NewQueue( serverCtx, metricService, @@ -230,7 +248,6 @@ func main() { QueueType: queue.Type_Batch, VisibilityTimeout: &longQueueVisibilityTimeout, RedriveOpts: longQueueRedriveOpts, - NumWorkers: &maxBatchQueueWorkers, }, nil, ) From 7f8e690cd4fea05acbed729abb7f4f550f38487e Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Fri, 29 Mar 2024 14:56:08 -0400 Subject: [PATCH 07/10] use continual anchoring mode for all envs --- common/aws/ddb/job.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/common/aws/ddb/job.go b/common/aws/ddb/job.go index b52b34e..ca57078 100644 --- a/common/aws/ddb/job.go +++ b/common/aws/ddb/job.go @@ -45,13 +45,11 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error { func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) { jobParams := map[string]interface{}{ - job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker - job.AnchorJobParam_Overrides: map[string]string{}, - } - // Only enable continuous anchoring on Clay and Prod - if (jdb.env == cas.EnvTag_Tnet) || (jdb.env == cas.EnvTag_Prod) { - jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_AppMode] = models.AnchorAppMode_ContinualAnchoring - jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_SchedulerStopAfterNoOp] = "true" + job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker + job.AnchorJobParam_Overrides: map[string]string{ + models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring, + models.AnchorOverrides_SchedulerStopAfterNoOp: "true", + }, } // If an override anchor contract address is available, pass it through to the job. if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found { From f3ba3e15ac57bb4e31b5fd506f21a73ca0d631a6 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:09:19 -0400 Subject: [PATCH 08/10] add location constraint to s3 create + continue if bucket already exists --- common/aws/storage/s3.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/common/aws/storage/s3.go b/common/aws/storage/s3.go index 09fcfe7..09e1b83 100644 --- a/common/aws/storage/s3.go +++ b/common/aws/storage/s3.go @@ -4,8 +4,12 @@ import ( "bytes" "context" "encoding/json" + "errors" + "os" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/ceramicnetwork/go-cas/common" "github.com/ceramicnetwork/go-cas/models" @@ -55,7 +59,15 @@ func createBucket(ctx context.Context, client *s3.Client, bucket string) error { if _, err := client.CreateBucket(httpCtx, &s3.CreateBucketInput{ Bucket: aws.String(bucket), + CreateBucketConfiguration: &types.CreateBucketConfiguration{ + LocationConstraint: types.BucketLocationConstraint(os.Getenv("AWS_REGION")), + }, }); err != nil { + var ownedByYouErr *types.BucketAlreadyOwnedByYou + if errors.As(err, &ownedByYouErr) { + // This means that the bucket already exists and is owned by us, which is fine. + return nil + } return err } return nil From ca983cc48b0e5038ecec615d3ab1efc83dc8f81b Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 2 Apr 2024 17:55:28 -0400 Subject: [PATCH 09/10] set batch size to 10K to avoid sql max parameter count issues --- env/.env.dev | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/env/.env.dev b/env/.env.dev index b363c85..5a2065a 100644 --- a/env/.env.dev +++ b/env/.env.dev @@ -1,8 +1,8 @@ ANCHOR_AUDIT_ENABLED=true ANCHOR_BATCH_LINGER=1h -ANCHOR_BATCH_SIZE=1048576 +ANCHOR_BATCH_SIZE=10240 ANCHOR_BATCH_MONITOR_TICK=10s -MAX_ANCHOR_WORKERS=2 +MAX_ANCHOR_WORKERS=4 LONG_QUEUE_MAX_RECEIVE_COUNT=2 DB_NAME=ceramicanchorservicedev DB_PORT=5432 From b45628bebad704107ae9f4a710c3899dc75d1933 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 16 Apr 2024 14:58:38 -0400 Subject: [PATCH 10/10] make polling db load limit configurable --- services/poll.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/services/poll.go b/services/poll.go index 9a47a5a..e2d57ba 100644 --- a/services/poll.go +++ b/services/poll.go @@ -4,13 +4,14 @@ import ( "context" "fmt" "os" + "strconv" "time" "github.com/ceramicnetwork/go-cas/models" ) const defaultPollTick = time.Hour -const dbLoadLimit = 1000 +const defaultDbLoadLimit = 1000 // Only look for unprocessed requests as far back as 2 days const startCheckpointDelta = 48 * time.Hour @@ -25,6 +26,7 @@ type RequestPoller struct { logger models.Logger notif models.Notifier tick time.Duration + loadLimit int endCheckpointDelta time.Duration } @@ -47,6 +49,12 @@ func NewRequestPoller( pollTick = parsedPollTick } } + dbLoadLimit := defaultDbLoadLimit + if configDbLoadLimit, found := os.LookupEnv("DB_LOAD_LIMIT"); found { + if parsedDbLoadLimit, err := strconv.Atoi(configDbLoadLimit); err == nil { + dbLoadLimit = parsedDbLoadLimit + } + } return &RequestPoller{ anchorDb: anchorDb, stateDb: stateDb, @@ -54,6 +62,7 @@ func NewRequestPoller( logger: logger, notif: notif, tick: pollTick, + loadLimit: dbLoadLimit, endCheckpointDelta: endCheckpointDelta, } } @@ -82,7 +91,7 @@ func (p RequestPoller) Run(ctx context.Context) { models.RequestStatus_Pending, startCheckpoint, endCheckpoint, - dbLoadLimit, + defaultDbLoadLimit, ) if err != nil { p.logger.Errorf("error loading requests: %v", err)