Make throughputStress resumable by stephanos · Pull Request #143 · temporalio/omes · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Make throughputStress resumable #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion loadgen/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,6 @@ func VisibilityCountIsEventually(
return fmt.Errorf("expected %d workflows in visibility, got %d after waiting %v",
expectedCount, visibilityCount.Count, waitAtMost)
}
time.Sleep(5 * time.Second)
time.Sleep(1 * time.Second)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1s seems like a good compromise between too slow and too fast.

}
}
3 changes: 3 additions & 0 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ScenarioInfo struct {
Namespace string
// Path to the root of the omes dir
RootPath string
// Optional callback to receive status updates from the scenario. Whether and when it is invoked,
// and with what data (type), is specific to the scenario. See `throughputStress` for an example.
StatusCallback func(any)
}

func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int {
Expand Down
138 changes: 95 additions & 43 deletions scenarios/throughput_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,29 @@ import (
"sync/atomic"
"time"

"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/throughputstress"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"

"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/throughputstress"
"go.temporal.io/sdk/client"
)

// --option arguments
const (
IterFlag = "internal-iterations"
IterTimeout = "internal-iterations-timeout"
IterFlag = "internal-iterations"
IterTimeout = "internal-iterations-timeout"
// IterResumeAt is the iteration at which to resume a previous run. If set, the run initialization
// is skipped and iteration numbering continues from the given iteration.
IterResumeAt = "internal-iterations-resume-at"
SkipSleepFlag = "skip-sleep"
CANEventFlag = "continue-as-new-after-event-count"
NexusEndpointFlag = "nexus-endpoint"
// WorkflowIDPrefix is the prefix for each run's workflow ID. Use it to ensure that the workflow IDs are unique.
WorkflowIDPrefix = "workflow-id-prefix"
// WorkflowCountResumeAt is the number of completed workflows from a previous run that is being resumed.
WorkflowCountResumeAt = "workflow-count-resume-at"
// VisibilityVerificationTimeout is the timeout for verifying the total visibility count at the end of the scenario.
// It needs to account for a backlog of tasks and, if used, ElasticSearch's eventual consistency.
VisibilityVerificationTimeout = "visibility-count-timeout"
Expand All @@ -41,18 +45,42 @@ const (
defaultWorkflowIDPrefix = "throughputStress"
)

// ThroughputStressScenarioStatusUpdate is the value handed to ScenarioInfo.StatusCallback by the
// throughput stress scenario each time one of its workflow executions finishes without error.
type ThroughputStressScenarioStatusUpdate struct {
	// CompletedIteration is the iteration that has been completed. When resuming a previous run,
	// it includes the resume offset (the resume-at iteration plus the current run's iteration).
	CompletedIteration int
	// CompletedWorkflows is the total number of workflows that have been completed so far.
	CompletedWorkflows uint64
}

// tpsExecutor executes the throughput stress scenario.
type tpsExecutor struct {
	// workflowCount is the running total of workflows executed by the scenario. It is updated
	// atomically because iterations run concurrently, seeded with the previous run's count when
	// resuming, and compared against the Visibility count once the scenario has finished.
	workflowCount atomic.Uint64
}

// Run executes the throughput stress scenario.
//
// It executes `throughputStress` workflows in parallel - up to the configured maximum concurrency limit - and
// waits for the results. At the end, it verifies that the total number of executed workflows matches Visibility's count.
//
// To resume a previous run, set the following options:
//
// --option internal-iterations-resume-at=<value>
// --option workflow-count-resume-at=<value>
//
// Note that the caller is responsible for adjusting the scenario's iterations/timeout accordingly.
func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error {
// Parse scenario options
internalIterations := info.ScenarioOptionInt(IterFlag, 5)
internalIterTimeout := info.ScenarioOptionDuration(IterTimeout, time.Minute)
internalIterResumeFrom := info.ScenarioOptionInt(IterResumeAt, 0)
_, resumingFromPreviousRun := info.ScenarioOptions[IterResumeAt]
continueAsNewCount := info.ScenarioOptionInt(CANEventFlag, 120)
workflowIDPrefix := cmp.Or(info.ScenarioOptions[WorkflowIDPrefix], defaultWorkflowIDPrefix)
workflowCountStartAt := info.ScenarioOptionInt(WorkflowCountResumeAt, 0)
nexusEndpoint := info.ScenarioOptions[NexusEndpointFlag] // disabled by default
skipSleep := info.ScenarioOptionBool(SkipSleepFlag, false)
if info.StatusCallback == nil {
info.StatusCallback = func(data any) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered doing this further up the callstack, but didn't find a good place to set the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move it into RunConfiguration (which has a "set defaults" step)?

}

var sleepActivityPerPriority *throughputstress.SleepActivity
if sleepActivitiesWithPriorityStr, ok := info.ScenarioOptions[SleepActivityPerPriorityJsonFlag]; ok {
Expand All @@ -69,58 +97,34 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
}
timeout := time.Duration(1*internalIterations) * internalIterTimeout

// Make sure the search attribute is registered
attribMap := map[string]enums.IndexedValueType{
ThroughputStressScenarioIdSearchAttribute: enums.INDEXED_VALUE_TYPE_KEYWORD,
}

_, err = info.Client.OperatorService().AddSearchAttributes(ctx,
&operatorservice.AddSearchAttributesRequest{
Namespace: info.Namespace,
SearchAttributes: attribMap,
})
var deniedErr *serviceerror.PermissionDenied
var alreadyErr *serviceerror.AlreadyExists

if errors.As(err, &alreadyErr) {
info.Logger.Infof("Search Attribute %s already exists", ThroughputStressScenarioIdSearchAttribute)
} else if err != nil {
info.Logger.Warnf("Failed to add Search Attribute %s: %v", ThroughputStressScenarioIdSearchAttribute, err)

if !errors.As(err, &deniedErr) {
// Initialize the scenario run.
if !resumingFromPreviousRun {
err = t.initFirstRun(ctx, info)
if err != nil {
return err
}
} else {
info.Logger.Infof("Search Attribute %s added", ThroughputStressScenarioIdSearchAttribute)
}

// Complain if there are already existing workflows with the provided run id
visibilityCount, err := info.Client.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
Namespace: info.Namespace,
Query: fmt.Sprintf("%s='%s'", ThroughputStressScenarioIdSearchAttribute, info.RunID),
})
if err != nil {
return err
}
if visibilityCount.Count > 0 {
return fmt.Errorf("there are already %d workflows with scenario Run ID '%s'",
visibilityCount.Count, info.RunID)
info.Logger.Info("Resuming from previous run")
t.workflowCount.Add(uint64(workflowCountStartAt))
}

// Start the scenario run.
genericExec := &loadgen.GenericExecutor{
DefaultConfiguration: loadgen.RunConfiguration{
Iterations: 20,
MaxConcurrent: 5,
},
Execute: func(ctx context.Context, run *loadgen.Run) error {
wfID := fmt.Sprintf("%s-%s-%d", workflowIDPrefix, run.RunID, run.Iteration)
curIteration := internalIterResumeFrom + run.Iteration
wfID := fmt.Sprintf("%s/%s/iter-%d", workflowIDPrefix, run.RunID, curIteration)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the workflow ID a bit more expressive.


var result throughputstress.WorkflowOutput
err = run.ExecuteAnyWorkflow(ctx,
client.StartWorkflowOptions{
ID: wfID,
TaskQueue: run.TaskQueue(),
WorkflowExecutionTimeout: timeout,
WorkflowExecutionErrorWhenAlreadyStarted: true,
WorkflowExecutionErrorWhenAlreadyStarted: !resumingFromPreviousRun, // don't fail when resuming
SearchAttributes: map[string]interface{}{
ThroughputStressScenarioIdSearchAttribute: run.ScenarioInfo.RunID,
},
Expand All @@ -134,8 +138,17 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
NexusEndpoint: nexusEndpoint,
SleepActivityPerPriority: sleepActivityPerPriority,
})
// The 1 is for the final workflow run
t.workflowCount.Add(uint64(result.TimesContinued + result.ChildrenSpawned + 1))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed when testing that this is actually wrong as it ignores the error.


if err == nil {
// The 1 is for the final workflow run.
curTotal := t.workflowCount.Add(uint64(result.TimesContinued + result.ChildrenSpawned + 1))

info.StatusCallback(ThroughputStressScenarioStatusUpdate{
CompletedIteration: curIteration,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller will need to make sure to store the "low water mark", not just the latest iteration.

CompletedWorkflows: curTotal,
})
}

return err
},
}
Expand All @@ -144,7 +157,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
return err
}

// Post-scenario, verify visibility counts
// Post-scenario, verify reported count from Visibility matches the expected count.
totalWorkflowCount := t.workflowCount.Load()
info.Logger.Info("Total workflows executed: ", totalWorkflowCount)
return loadgen.VisibilityCountIsEventually(
Expand All @@ -160,6 +173,45 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
)
}

// initFirstRun performs the one-time setup for a scenario run that is not being resumed.
//
// It registers the search attribute used to find workflows by scenario run ID and then checks
// that no workflows carrying info.RunID exist yet. An AlreadyExists response from the server is
// treated as success and a PermissionDenied response is logged and tolerated; any other
// registration error is returned. An error is also returned when the workflow count cannot be
// fetched or when it is greater than zero.
func (t *tpsExecutor) initFirstRun(ctx context.Context, info loadgen.ScenarioInfo) error {
	// Register the search attribute that lets us query for workflows by run ID.
	addReq := &operatorservice.AddSearchAttributesRequest{
		Namespace: info.Namespace,
		SearchAttributes: map[string]enums.IndexedValueType{
			ThroughputStressScenarioIdSearchAttribute: enums.INDEXED_VALUE_TYPE_KEYWORD,
		},
	}
	_, addErr := info.Client.OperatorService().AddSearchAttributes(ctx, addReq)

	var (
		alreadyExists    *serviceerror.AlreadyExists
		permissionDenied *serviceerror.PermissionDenied
	)
	switch {
	case addErr == nil:
		info.Logger.Infof("Search Attribute %s added", ThroughputStressScenarioIdSearchAttribute)
	case errors.As(addErr, &alreadyExists):
		info.Logger.Infof("Search Attribute %s already exists", ThroughputStressScenarioIdSearchAttribute)
	default:
		info.Logger.Warnf("Failed to add Search Attribute %s: %v", ThroughputStressScenarioIdSearchAttribute, addErr)
		// A permission error is tolerated: the run continues after the warning above.
		if !errors.As(addErr, &permissionDenied) {
			return addErr
		}
	}

	// A first run must start from a clean slate: refuse to proceed if workflows tagged with
	// this run ID are already present.
	countResp, countErr := info.Client.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
		Namespace: info.Namespace,
		Query:     fmt.Sprintf("%s='%s'", ThroughputStressScenarioIdSearchAttribute, info.RunID),
	})
	if countErr != nil {
		return countErr
	}
	if existing := countResp.Count; existing > 0 {
		return fmt.Errorf("there are already %d workflows with scenario Run ID '%s'",
			existing, info.RunID)
	}

	return nil
}

func init() {
loadgen.MustRegisterScenario(loadgen.Scenario{
Description: fmt.Sprintf(
Expand Down
1 change: 1 addition & 0 deletions workers/go/throughputstress/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor
// run to the child.
attrs := workflow.GetInfo(ctx).SearchAttributes.IndexedFields
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("%s/child-%d", workflow.GetInfo(ctx).WorkflowExecution.ID, output.ChildrenSpawned),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helps correlate the child workflows with the parent.

SearchAttributes: map[string]interface{}{
scenarios.ThroughputStressScenarioIdSearchAttribute: attrs[scenarios.ThroughputStressScenarioIdSearchAttribute],
},
Expand Down
Loading
0