This Lambda function is intended to configure an OpenSearch cluster beyond what is natively exposed through cloud provisioning frameworks:
- configure alerting: using an SNS topic
- create mapping: define field types, such as `double`, `date`, and more
- initialize dashboard: initializes an empty OpenSearch Dashboard by creating the required Index Pattern if it does not exist
- document deletion: delete documents within a date/time range using a `match` query condition
- helper functions: the codebase defines numerous get/set/delete functions that can be invoked as desired to satisfy requirements beyond configuring alerting or creating a mapping
In general, this function can be executed ad hoc, or as a custom resource invoked by CloudFormation or Terraform. The examples below focus on CloudFormation; Terraform variations, via `aws_cloudformation_stack` or custom providers, are left as an exercise.
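When invoked as a custom resource, CloudFormation expects the function to report its outcome by sending a JSON document to the pre-signed `ResponseURL` contained in the event. The following is a minimal sketch of that reply, not the actual contents of `lambda.py`; the helper names `build_response` and `send_response` are hypothetical.

```python
import json
import urllib.request


def build_response(event, status, data=None, reason=''):
    """Assemble the reply body CloudFormation expects from a custom resource."""
    return {
        'Status': status,  # 'SUCCESS' or 'FAILED'
        'Reason': reason or 'see CloudWatch logs',
        # assumption: reuse an existing id on Update/Delete, else the logical id
        'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': data or {},
    }


def send_response(event, body):
    """PUT the reply to the pre-signed URL supplied by CloudFormation."""
    request = urllib.request.Request(
        event['ResponseURL'],
        data=json.dumps(body).encode('utf-8'),
        method='PUT',
        headers={'Content-Type': ''},
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

If no reply reaches the `ResponseURL`, the stack operation waits until it times out, so a handler would typically send `FAILED` from an exception handler as well.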
The following CloudFormation segment will configure alerting using an existing SNS topic:
OpenSearchConfigurationFunction:
  Type: AWS::Lambda::Function
  Properties:
    Description: custom lambda resource configure opensearch cluster
    Code:
      S3Bucket: !Ref DeployBucket
      S3Key: !Ref S3KeyOpenSearchConfiguration
    Layers: !If
      - UseTracing
      - - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/aws-xray-sdk}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests_aws4auth}}'
      - - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests_aws4auth}}'
    FunctionName: !Ref FunctionNameOpenSearchConfiguration
    Handler: !Ref Handler
    MemorySize: !Ref MemorySize
    Role: !GetAtt OpenSearchConfigurationExecutionRole.Arn
    Runtime: !Ref Runtime
    Timeout: !Ref Timeout
OpenSearchConfiguration:
  Type: Custom::OpenSearchConfigure
  Properties:
    ServiceToken: !GetAtt OpenSearchConfigurationFunction.Arn
    Region: !Ref AWS::Region
    OpenSearchDomain: !Sub https://${OpenSearch.Outputs.NestedOpenSearchDomainEndpoint}
    OpenSearchIndex: !Ref OpenSearchIndex
    SnsAlertName: !Ref OpenSearchIndex
    SnsTopicArn: !GetAtt OpenSearchConfigurationNotification.Outputs.NestedSnsTopicArn
    SnsRoleArn: !GetAtt OpenSearchConfigurationRole.Arn
    MonitorName: OverallFailures
    MonitorInterval: !Ref OpenSearchMonitorInterval
    MonitorUnit: !Ref OpenSearchMonitorUnit
    MonitorCondition: !Ref OpenSearchMonitorCondition
    MonitorRangeField: !Ref OpenSearchMonitorRangeField
    MonitorRangeFrom: now-1h
    MonitorRangeTo: now
    MonitorQueryTerms: !Sub
      - '{
          "${OpenSearchMonitorTerm}": ["${OpenSearchMonitorTermValue}"],
          "boost": 1.0
        }'
      - OpenSearchMonitorTerm: !Ref OpenSearchMonitorTerm
        OpenSearchMonitorTermValue: !Ref OpenSearchMonitorTermValue
    MonitorTriggerSubject: !Sub ${OpenSearchIndex} detected ${OpenSearchMonitorTermValue}
    MonitorTriggerMessage: !Sub
      - '
        specified ${OpenSearchMonitorTerm} detected ${OpenSearchMonitorTermValue}
        satisfying ${OpenSearchMonitorCondition} within ${OpenSearchMonitorInterval}
        ${OpenSearchMonitorUnit}
        '
      - OpenSearchMonitorTerm: !Ref OpenSearchMonitorTerm
        OpenSearchMonitorTermValue: !Ref OpenSearchMonitorTermValue
        OpenSearchMonitorCondition: !Ref OpenSearchMonitorCondition
        OpenSearchMonitorInterval: !Ref OpenSearchMonitorInterval
        OpenSearchMonitorUnit: !Ref OpenSearchMonitorUnit
  DependsOn: [OpenSearch, OpenSearchConfigurationFunction]
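For orientation, the `Monitor*` properties above roughly correspond to a monitor definition accepted by the OpenSearch Alerting plugin (`POST _plugins/_alerting/monitors`). The sketch below only assembles such a body. `build_monitor`, its parameters, the sample values, and the trigger condition are all hypothetical, and how `lambda.py` actually maps the properties may differ.

```python
import json


def build_monitor(name, index, interval, unit, range_field, range_from, range_to,
                  query_terms, condition, destination_id, subject, message):
    """Assemble a query-level monitor body (illustrative, not lambda.py's code)."""
    query = {
        'size': 0,
        'query': {
            'bool': {
                'filter': [
                    # only consider documents inside the configured time window
                    {'range': {range_field: {'gte': range_from, 'lte': range_to}}},
                    # MonitorQueryTerms is passed through as a terms clause
                    {'terms': query_terms},
                ]
            }
        },
    }
    return {
        'type': 'monitor',
        'name': name,
        'enabled': True,
        'schedule': {'period': {'interval': interval, 'unit': unit}},
        'inputs': [{'search': {'indices': [index], 'query': query}}],
        'triggers': [{
            'name': f'{name}-trigger',
            'severity': '1',
            'condition': {'script': {'source': condition, 'lang': 'painless'}},
            'actions': [{
                'name': f'{name}-action',
                'destination_id': destination_id,
                'subject_template': {'source': subject},
                'message_template': {'source': message},
            }],
        }],
    }


# all values below are placeholders chosen for illustration
monitor = build_monitor(
    name='OverallFailures', index='my-index', interval=5, unit='MINUTES',
    range_field='timestamp', range_from='now-1h', range_to='now',
    query_terms={'status': ['FAILED'], 'boost': 1.0},
    condition='ctx.results[0].hits.total.value > 0',
    destination_id='hypothetical-destination-id',
    subject='my-index detected FAILED',
    message='status detected FAILED',
)
print(json.dumps(monitor['schedule']))
```

The `destination_id` would refer to an alerting destination created beforehand from the SNS topic and role ARNs.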
An OpenSearch cluster can be defined via CloudFormation using the `AWS::OpenSearchService::Domain` resource. However, it has no attributes that allow index field types to be specified. This can be problematic, since fields without an explicit mapping may default to a string type, which prevents time-based visualizations from being created within OpenSearch Dashboards.
The following example shows how to define a `date` type field, which makes it possible to configure time-based visualizations.
OpenSearchConfigurationFunction:
  Type: AWS::Lambda::Function
  Properties:
    Description: custom lambda resource configure opensearch cluster
    Code:
      S3Bucket: !Ref DeployBucket
      S3Key: !Ref S3KeyOpenSearchConfiguration
    Layers: !If
      - UseTracing
      - - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/aws-xray-sdk}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests_aws4auth}}'
      - - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests}}'
        - !Sub '{{resolve:ssm:/${StackSuffixName}/lambda_layer/requests_aws4auth}}'
    FunctionName: !Ref FunctionNameOpenSearchConfiguration
    Handler: !Ref Handler
    MemorySize: !Ref MemorySize
    Role: !GetAtt OpenSearchConfigurationExecutionRole.Arn
    Runtime: !Ref Runtime
    Timeout: !Ref Timeout
OpenSearchConfiguration:
  Type: Custom::OpenSearchConfigure
  Properties:
    ServiceToken: !GetAtt OpenSearchConfigurationFunction.Arn
    Region: !Ref AWS::Region
    OpenSearchDomain: !Sub https://${OpenSearch.Outputs.NestedOpenSearchDomainEndpoint}
    OpenSearchIndex: !Ref OpenSearchIndex
    Mappings: !Sub
      - '{
          "properties": {
            "${OpenSearchTimeStampField}": {
              "type": "date",
              "format": "${OpenSearchTimeStampFieldFormat}"
            },
            "${OpenSearchPriceField}": {
              "type": "double"
            },
            "${OpenSearchDateField}" : {
              "type": "date",
              "format": "${OpenSearchDateFieldFormat}"
            }
          }
        }'
      - OpenSearchTimeStampField: !Ref OpenSearchTimeStampField
        OpenSearchTimeStampFieldFormat: !Ref OpenSearchTimeStampFieldFormat
        OpenSearchPriceField: !Ref OpenSearchPriceField
        OpenSearchDateField: !Ref OpenSearchDateField
        OpenSearchDateFieldFormat: !Ref OpenSearchDateFieldFormat
  DependsOn: [OpenSearch, OpenSearchConfigurationFunction]
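The `Mappings` property above resolves to a plain JSON mapping document. As a rough sketch of what that document looks like once the substitutions are applied, and how it could be wrapped for an index creation request (`PUT /<index>`), consider the following. The function names and the sample field names and formats are hypothetical, and `lambda.py` may apply the mapping differently.

```python
import json


def build_mappings(timestamp_field, timestamp_format, price_field, date_field, date_format):
    """Return a mapping with two date fields and one double field."""
    return {
        'properties': {
            timestamp_field: {'type': 'date', 'format': timestamp_format},
            price_field: {'type': 'double'},
            date_field: {'type': 'date', 'format': date_format},
        }
    }


def build_index_body(mappings):
    """Wrap mappings in the body used when creating an index (PUT /<index>)."""
    return json.dumps({'mappings': mappings})


# placeholder field names and formats, chosen for illustration only
mappings = build_mappings('timestamp', 'epoch_millis', 'price', 'trade_date', 'yyyy-MM-dd')
print(build_index_body(mappings))
```

Since the type of an already-mapped field generally cannot be changed in place, a mapping like this is best applied before any documents are indexed.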
While it's possible to fully automate the creation of visualizations, and likely their subsequent attachment to the desired dashboard(s), this codebase prefers a more minimalist approach. Specifically, even a small change in a visualization can make automation orders of magnitude more complicated. Instead, this codebase can set up a default Index Pattern if one does not exist for a specified index. Using the Index Pattern, an OpenSearch Dashboard is then created. The provided `lambda.py` creates an empty dashboard:
if initialize_dashboard:
    #
    # create index pattern: used by dashboard
    #
    index_id = index.replace('*', '').rstrip('-').rstrip('_')
    current_index = check_index_pattern(endpoint, awsauth, index_id=index_id, title=index)
    if current_index and current_index != index_id:
        r = set_index_pattern(endpoint, awsauth, index_id=index_id, title=index)
        executions.append({'set_index_pattern': True} if r else {'set_index_pattern': False})
    #
    # create dashboard: if index and index pattern exists
    #
    if (
        current_index and
        check_index(endpoint, awsauth, index) and
        not check_dashboard(endpoint, awsauth, index)
    ):
        r = set_dashboard(endpoint, awsauth, index)
        executions.append({'set_dashboard': True} if r else {'set_dashboard': False})
    else:
        executions.append({'set_dashboard': False})
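The `index_id` expression in the snippet above turns a wildcard index name into an identifier for the Index Pattern. The short example below isolates that one expression to show its effect; the wrapper name `derive_index_id` and the sample index names are hypothetical.

```python
def derive_index_id(index):
    """Same expression as above: drop wildcards, then trailing '-' and '_'."""
    return index.replace('*', '').rstrip('-').rstrip('_')


# sample index names chosen for illustration
for title in ('orders-*', 'orders_*', 'orders'):
    print(title, '->', derive_index_id(title))
```

All three sample names yield the same identifier, `orders`, while the original name (including any wildcard) is kept as the Index Pattern title.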
To turn on this functionality, provide the `InitializeDashboard` parameter:
OpenSearchConfiguration:
  Type: Custom::OpenSearchConfigure
  Properties:
    ServiceToken: !GetAtt OpenSearchConfigurationFunction.Arn
    Region: !Ref AWS::Region
    OpenSearchDomain: !Sub https://${OpenSearch.Outputs.NestedOpenSearchDomainEndpoint}
    OpenSearchIndex: !Ref OpenSearchIndex
    InitializeDashboard: true
  DependsOn: [OpenSearch, OpenSearchConfigurationFunction]
It's possible to perform index rotation for an OpenSearch index. However, this segment instead introduces the ability to delete documents within a specified index that satisfy a `match` query condition. This can be particularly useful when only the latest N days of documents are desired. Consider the case of a producer sending data to a Kinesis Stream. This data stream could be configured with a Kinesis Firehose to buffer data into a data lake for long-term storage. The same data stream could also feed an OpenSearch index through an event source mapping. This keeps the most recent data available for visualization in OpenSearch Dashboards, while retaining the ability to perform historical analysis from the data lake.
The following example deletes all documents from a specified `OpenSearchIndex` where the `message.utc` field is at least 30 days old:
OpenSearchConfiguration:
  Type: Custom::OpenSearchConfigure
  Properties:
    ServiceToken: !GetAtt OpenSearchConfigurationFunction.Arn
    Region: !Ref AWS::Region
    OpenSearchDomain: !Sub https://${OpenSearch.Outputs.NestedOpenSearchDomainEndpoint}
    OpenSearchIndex: !Ref OpenSearchIndex
    DocumentDeleteRange: '{"message.utc": { "lte": "now-30d" }}'
  DependsOn: [OpenSearch, OpenSearchConfigurationFunction]
Note: the above requires `message.utc` to be a `date` field.
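To make the shape of `DocumentDeleteRange` concrete, the sketch below wraps such a value in a `range` clause, as it could be sent to `POST /<index>/_delete_by_query`. This is an assumption about how the value is consumed rather than a copy of `lambda.py`, and `build_delete_query` is a hypothetical helper.

```python
import json


def build_delete_query(document_delete_range):
    """Wrap a DocumentDeleteRange value in a range query for _delete_by_query."""
    # the property may arrive as a JSON string or as an already-parsed object
    if isinstance(document_delete_range, str):
        document_delete_range = json.loads(document_delete_range)
    return {'query': {'range': document_delete_range}}


body = build_delete_query('{"message.utc": { "lte": "now-30d" }}')
print(json.dumps(body))
```

Because the string form is parsed as JSON, an unbalanced brace in the property value would raise an error before any request is made.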
The following CloudWatch event rule triggers `OpenSearchConfigurationFunction` using a cron expression, here daily at 04:00 UTC:
OpenSearchDeleteDocumentRule:
  Type: AWS::Events::Rule
  Properties:
    Name: !Sub ${FunctionNameOpenSearchConfiguration}DeleteIndexDocuments
    Description: !Sub |
      trigger ${FunctionNameOpenSearchConfiguration} to delete index
      documents older than or equal to 30 days from now
    ScheduleExpression: cron(0 4 * * ? *)
    State: !Ref EnableEventRules
    Targets:
      - Id: OpenSearchDeleteDocumentRule
        Arn: !GetAtt OpenSearchConfigurationFunction.Arn
        Input: !Sub
          - '{
              "RequestType": "Create",
              "ResourceProperties": {
                "OpenSearchDomain": "https://${OpenSearchDomain}",
                "OpenSearchIndex": "${OpenSearchIndex}",
                "DocumentDeleteRange": {
                  "message.utc": { "lte": "now-30d" }
                }
              }
            }'
          - OpenSearchDomain: !GetAtt OpenSearch.Outputs.NestedOpenSearchDomainEndpoint
            OpenSearchIndex: !Ref OpenSearchIndex
        RetryPolicy:
          MaximumEventAgeInSeconds: !Ref MaximumEventAgeInSeconds
          MaximumRetryAttempts: !Ref MaximumRetryAttempts
  DependsOn: [OpenSearchConfigurationFunction, OpenSearchConfiguration]
PermissionForEventsToInvokeLambda:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !Ref OpenSearchConfigurationFunction
    Action: lambda:InvokeFunction
    Principal: events.amazonaws.com
    SourceArn: !GetAtt OpenSearchDeleteDocumentRule.Arn
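The rule's `Input` mimics a custom resource `Create` event but carries no `ResponseURL`, so a handler serving both invocation paths should only reply to CloudFormation when that key is present. A minimal sketch of the distinction follows; the helper name, the sample events, and the placeholder domain are hypothetical, and `lambda.py` may handle this differently.

```python
def is_cloudformation_request(event):
    """CloudFormation events carry a ResponseURL; the scheduled input does not."""
    return bool(event.get('ResponseURL'))


# shaped like the Input of the event rule above (placeholder values)
scheduled_event = {
    'RequestType': 'Create',
    'ResourceProperties': {
        'OpenSearchDomain': 'https://search-example.invalid',
        'OpenSearchIndex': 'my-index',
        'DocumentDeleteRange': {'message.utc': {'lte': 'now-30d'}},
    },
}
# a custom resource event additionally includes a pre-signed reply URL
stack_event = dict(scheduled_event, ResponseURL='https://example.invalid/reply')

print(is_cloudformation_request(scheduled_event))
print(is_cloudformation_request(stack_event))
```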
Please review the functions defined in the following files, and invoke them as desired in `lambda.py`:
While other versions of Amazon OpenSearch are likely compatible, they have not been explicitly tested. Feel free to open an issue and adjust the `README.md` to denote which versions are compatible.
- OpenSearch 1.1
Please open an issue if a particular bug is found, or a feature is desired.