feat: cas scaling ph 2 (s3 batching) by smrz2001 · Pull Request #44 · ceramicnetwork/go-cas · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: cas scaling ph 2 (s3 batching) #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/cas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/joho/godotenv"

"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"

"github.com/ceramicnetwork/go-cas"
"github.com/ceramicnetwork/go-cas/common/aws/config"
"github.com/ceramicnetwork/go-cas/common/aws/ddb"
"github.com/ceramicnetwork/go-cas/common/aws/queue"
"github.com/ceramicnetwork/go-cas/common/aws/storage"
"github.com/ceramicnetwork/go-cas/common/db"
"github.com/ceramicnetwork/go-cas/common/loggers"
"github.com/ceramicnetwork/go-cas/common/metrics"
Expand Down Expand Up @@ -53,10 +55,16 @@ func main() {
// HTTP clients
dynamoDbClient := dynamodb.NewFromConfig(awsCfg)
sqsClient := sqs.NewFromConfig(awsCfg)
s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
o.UsePathStyle = true
})

stateDb := ddb.NewStateDb(serverCtx, logger, dynamoDbClient)
jobDb := ddb.NewJobDb(serverCtx, logger, dynamoDbClient)

casBucket := "ceramic-" + os.Getenv(cas.Env_Env) + "-cas"
batchStore := storage.NewS3Store(serverCtx, logger, s3Client, casBucket)

discordHandler, err := notifs.NewDiscordHandler(logger)
if err != nil {
logger.Fatalf("error creating discord handler: %v", err)
Expand Down Expand Up @@ -127,6 +135,7 @@ func main() {
// The Ready and Batch queues will need larger visibility timeouts than the other queues. Requests pulled from the
// Ready queue will remain in flight for the batch linger duration. Batches from the Batch queue will remain in
// flight as long as it takes for them to get anchored.
//
// These queues will thus allow a smaller maximum receive count before messages fall through to the DLQ. Detecting
// failures is harder given the longer visibility timeouts, so it's important that they be detected as soon as
// possible.
Expand Down Expand Up @@ -250,7 +259,7 @@ func main() {
// are available in the queue. The 2 multiplier is arbitrary but will allow two batches worth of requests to be read
// and processed in parallel.
maxBatchQueueWorkers := anchorBatchSize * 2
batchingService := services.NewBatchingService(serverCtx, logger, batchQueue, metricService)
batchingService := services.NewBatchingService(serverCtx, batchQueue, batchStore, metricService, logger)
batchingConsumer := queue.NewConsumer(logger, readyQueue, batchingService.Batch, &maxBatchQueueWorkers)

// The Validation service reads from the Validate queue and posts to the Ready and Status queues
Expand Down
12 changes: 7 additions & 5 deletions common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.JobRepository = &JobDatabase{}

type JobDatabase struct {
ddbClient *dynamodb.Client
table string
Expand All @@ -43,15 +45,15 @@ func (jdb *JobDatabase) createJobTable(ctx context.Context) error {

func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
jobParams := map[string]interface{}{
job.JobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.JobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
job.AnchorJobParam_Version: models.WorkerVersion, // this will launch a CASv5 Worker
job.AnchorJobParam_Overrides: map[string]string{
models.AnchorOverrides_AppMode: models.AnchorAppMode_ContinualAnchoring,
models.AnchorOverrides_SchedulerStopAfterNoOp: "true",
},
}
// If an override anchor contract address is available, pass it through to the job.
if contractAddress, found := os.LookupEnv(models.Env_AnchorContractAddress); found {
jobParams[job.JobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
jobParams[job.AnchorJobParam_Overrides].(map[string]string)[models.AnchorOverrides_ContractAddress] = contractAddress
}
newJob := models.NewJob(job.JobType_Anchor, jobParams)
attributeValues, err := attributevalue.MarshalMapWithOptions(newJob, func(options *attributevalue.EncoderOptions) {
Expand All @@ -72,7 +74,7 @@ func (jdb *JobDatabase) CreateJob(ctx context.Context) (string, error) {
if err != nil {
return "", err
} else {
return newJob.Job, nil
return newJob.JobId, nil
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.StateRepository = &StateDatabase{}

type StateDatabase struct {
client *dynamodb.Client
checkpointTable string
Expand Down
2 changes: 2 additions & 0 deletions common/aws/queue/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.QueueMonitor = &Monitor{}

type Monitor struct {
queueUrl string
client *sqs.Client
Expand Down
74 changes: 74 additions & 0 deletions common/aws/storage/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package storage

import (
"bytes"
"context"
"encoding/json"
"errors"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"

"github.com/ceramicnetwork/go-cas/common"
"github.com/ceramicnetwork/go-cas/models"
)

// Compile-time assertion that *S3Store satisfies models.KeyValueRepository.
var _ models.KeyValueRepository = &S3Store{}

// S3Store persists values as JSON objects in a single S3 bucket.
type S3Store struct {
// client issues the S3 requests.
client *s3.Client
// logger receives a debug message after each successful store.
logger models.Logger
// bucket is the name of the bucket that objects are written to.
bucket string
}

// NewS3Store returns an S3Store that writes to the given bucket.
//
// The bucket is created up front if it does not already exist. A failure to create it is reported through
// logger.Fatalf.
func NewS3Store(ctx context.Context, logger models.Logger, s3Client *s3.Client, bucket string) *S3Store {
	// Make sure the backing bucket is in place before handing out the store.
	err := createBucket(ctx, s3Client, bucket)
	if err != nil {
		logger.Fatalf("failed to create bucket %s: %v", bucket, err)
	}

	return &S3Store{
		client: s3Client,
		logger: logger,
		bucket: bucket,
	}
}

// Store serializes value as JSON and uploads it to the store's bucket under key with an "application/json"
// content type.
//
// The upload runs under a context derived from ctx with a common.DefaultRpcWaitTime timeout. Marshaling and upload
// errors are returned unchanged. A debug message is logged after a successful upload.
func (s *S3Store) Store(ctx context.Context, key string, value interface{}) error {
	payload, err := json.Marshal(value)
	if err != nil {
		return err
	}

	// Bound the upload so that a stalled request cannot block the caller indefinitely.
	putCtx, cancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime)
	defer cancel()

	_, err = s.client.PutObject(putCtx, &s3.PutObjectInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(key),
		Body:        bytes.NewReader(payload),
		ContentType: aws.String("application/json"),
	})
	if err != nil {
		return err
	}

	s.logger.Debugf("stored key: %s", key)
	return nil
}

// createBucket creates the named bucket, treating "already exists and owned by us" as success.
//
// The request runs under a context derived from ctx with a common.DefaultRpcWaitTime timeout. The target region is
// read from the AWS_REGION environment variable.
//
// S3 does not accept "us-east-1" as an explicit location constraint: buckets in that region must be created without
// a CreateBucketConfiguration. The configuration is therefore only attached for a non-empty region other than
// "us-east-1". When AWS_REGION is unset, no constraint is sent either.
// NOTE(review): this assumes the client is then pointed at the default region - confirm against the AWS config used
// to build the client.
//
// Returns nil if the bucket was created or is already owned by the caller, and the underlying error otherwise.
func createBucket(ctx context.Context, client *s3.Client, bucket string) error {
	httpCtx, httpCancel := context.WithTimeout(ctx, common.DefaultRpcWaitTime)
	defer httpCancel()

	createBucketIn := s3.CreateBucketInput{
		Bucket: aws.String(bucket),
	}
	// Only send a location constraint when one is both known and valid for the CreateBucket API.
	if region := os.Getenv("AWS_REGION"); region != "" && region != "us-east-1" {
		createBucketIn.CreateBucketConfiguration = &types.CreateBucketConfiguration{
			LocationConstraint: types.BucketLocationConstraint(region),
		}
	}

	_, err := client.CreateBucket(httpCtx, &createBucketIn)
	if err == nil {
		return nil
	}

	var ownedByYouErr *types.BucketAlreadyOwnedByYou
	if errors.As(err, &ownedByYouErr) {
		// This means that the bucket already exists and is owned by us, which is fine.
		return nil
	}
	return err
}
2 changes: 2 additions & 0 deletions common/db/anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.AnchorRepository = &AnchorDatabase{}

type AnchorDatabase struct {
opts anchorDbOpts
logger models.Logger
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/otl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.MetricService = &OtlMetricService{}

type OtlMetricService struct {
caller string
meterProvider *sdk.MeterProvider
Expand Down
2 changes: 2 additions & 0 deletions common/notifs/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/ceramicnetwork/go-cas/models"
)

var _ models.Notifier = &DiscordHandler{}

type DiscordColor int

const (
Expand Down
2 changes: 1 addition & 1 deletion env/.env.dev
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
2 changes: 1 addition & 1 deletion env/.env.prod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
2 changes: 1 addition & 1 deletion env/.env.tnet
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ANCHOR_AUDIT_ENABLED=true
ANCHOR_BATCH_LINGER=1h
ANCHOR_BATCH_SIZE=6144
ANCHOR_BATCH_SIZE=16383
ANCHOR_BATCH_MONITOR_TICK=10s
MAX_ANCHOR_WORKERS=5
LONG_QUEUE_VISIBILITY_TIMEOUT=2h
Expand Down
47 changes: 26 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ replace github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3 => github.c

require (
dagger.io/dagger v0.7.1
github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20231026142209-de0959fd5908
github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20231026142209-de0959fd5908
github.com/3box/pipeline-tools/cd/manager/common/aws/utils v0.0.0-20240320183457-7556cceed3e2
github.com/3box/pipeline-tools/cd/manager/common/job v0.0.0-20240320183457-7556cceed3e2
github.com/abevier/go-sqs v0.0.0-20230602180220-c0264a2611a3
github.com/abevier/tsk v0.0.0-20230712145722-249b1e98b01c
github.com/alexflint/go-arg v1.4.2
github.com/aws/aws-sdk-go-v2 v1.21.2
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.7
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.23.0
github.com/aws/aws-sdk-go-v2/service/ecr v1.18.11
github.com/aws/aws-sdk-go-v2/service/sqs v1.22.0
github.com/aws/aws-sdk-go-v2 v1.27.0
github.com/aws/aws-sdk-go-v2/config v1.27.15
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.17
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.32.3
github.com/aws/aws-sdk-go-v2/service/ecr v1.28.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.54.2
github.com/aws/aws-sdk-go-v2/service/sqs v1.32.2
github.com/disgoorg/disgo v0.16.4
github.com/disgoorg/snowflake/v2 v2.0.1
github.com/go-playground/validator v9.31.0+incompatible
Expand Down Expand Up @@ -46,19 +47,23 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/alexflint/go-scalar v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect
github.com/aws/smithy-go v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.8 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.9 // indirect
github.com/aws/smithy-go v1.20.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
Expand Down
Loading
Loading
0