Event Targets for Amazon EventBridge
This library contains integration classes to send Amazon EventBridge events to any
number of supported AWS services. Instances of these classes should be passed
to the rule.addTarget() method.
The currently supported targets are described in the sections below.
See the README of the aws-cdk-lib/aws-events library for more information on
EventBridge.
Event retry policy and using dead-letter queues
The CodeBuild, CodePipeline, Lambda, Kinesis Data Streams, StepFunctions, LogGroup, SQS queue, SNS topic and ECS task targets support attaching a dead-letter queue and setting retry policies. See the Lambda example below. For the other target types, use escape hatches, as sketched next.
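For a target type that doesn't expose these properties, you can reach into the underlying AWS::Events::Rule resource with an escape hatch. The snippet below is a minimal sketch, assuming a rule with a single target already attached and an existing SQS queue to serve as the dead-letter queue:

# rule: events.Rule
# queue: sqs.Queue

# Escape hatch: override the first target's retry policy and dead-letter config
# on the underlying AWS::Events::Rule resource
cfn_rule = rule.node.default_child
cfn_rule.add_property_override("Targets.0.RetryPolicy", {
    "MaximumEventAgeInSeconds": 7200,
    "MaximumRetryAttempts": 2
})
cfn_rule.add_property_override("Targets.0.DeadLetterConfig", {
    "Arn": queue.queue_arn
})

Note that EventBridge must be allowed to send messages to the dead-letter queue. The supported targets add the required resource policy for you; with an escape hatch you have to grant it yourself.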
Invoke a Lambda function
Use the LambdaFunction target to invoke a Lambda function.
The code snippet below creates an event rule with a Lambda function as a target,
triggered for every event from the aws.ec2 source. You can optionally attach a
dead-letter queue.
import aws_cdk.aws_lambda as lambda_
fn = lambda_.Function(self, "MyFunc",
    runtime=lambda_.Runtime.NODEJS_LATEST,
    handler="index.handler",
    code=lambda_.Code.from_inline("exports.handler = async () => {}")
)

rule = events.Rule(self, "rule",
    event_pattern=events.EventPattern(
        source=["aws.ec2"]
    )
)

queue = sqs.Queue(self, "Queue")

rule.add_target(targets.LambdaFunction(fn,
    dead_letter_queue=queue,  # Optional: add a dead letter queue
    max_event_age=Duration.hours(2),  # Optional: set the maxEventAge retry policy
    retry_attempts=2
))
Log an event into a LogGroup
Use the CloudWatchLogGroup target to log your events in a CloudWatch LogGroup.
For example, the following code snippet creates an event rule with a CloudWatch LogGroup as a target.
Every event sent from the aws.ec2 source will be sent to the CloudWatch LogGroup.
import aws_cdk.aws_logs as logs
log_group = logs.LogGroup(self, "MyLogGroup",
    log_group_name="MyLogGroup"
)

rule = events.Rule(self, "rule",
    event_pattern=events.EventPattern(
        source=["aws.ec2"]
    )
)
rule.add_target(targets.CloudWatchLogGroup(log_group))
A rule target input can also be specified to modify the event that is sent to the log group. Unlike other event targets, CloudWatchLogs requires a specific input template format.
import aws_cdk.aws_logs as logs
# log_group: logs.LogGroup
# rule: events.Rule
rule.add_target(targets.CloudWatchLogGroup(log_group,
    log_event=targets.LogGroupTargetInput.from_object_v2(
        timestamp=events.EventField.from_path("$.time"),
        message=events.EventField.from_path("$.detail-type")
    )
))
If you want to use static values to overwrite the message, make sure that you provide a
string value.
import json
import aws_cdk.aws_logs as logs
# log_group: logs.LogGroup
# rule: events.Rule

rule.add_target(targets.CloudWatchLogGroup(log_group,
    log_event=targets.LogGroupTargetInput.from_object_v2(
        message=json.dumps({
            "CustomField": "CustomValue"
        })
    )
))
The CloudWatch log group target creates an AWS custom resource internally, which defaults
installLatestAwsSdk to true. This may be problematic for deployments in the CN partition. To
work around this issue, set installLatestAwsSdk to false.
import aws_cdk.aws_logs as logs
# log_group: logs.LogGroup
# rule: events.Rule
rule.add_target(targets.CloudWatchLogGroup(log_group,
    install_latest_aws_sdk=False
))
Start a CodeBuild build
Use the CodeBuildProject target to trigger a CodeBuild project.
The code snippet below creates a CodeCommit repository that triggers a CodeBuild project on commit to the master branch. You can optionally attach a dead-letter queue.
import aws_cdk.aws_codebuild as codebuild
import aws_cdk.aws_codecommit as codecommit
repo = codecommit.Repository(self, "MyRepo",
    repository_name="aws-cdk-codebuild-events"
)

project = codebuild.Project(self, "MyProject",
    source=codebuild.Source.code_commit(repository=repo)
)

dead_letter_queue = sqs.Queue(self, "DeadLetterQueue")

# trigger a build when a commit is pushed to the repo
on_commit_rule = repo.on_commit("OnCommit",
    target=targets.CodeBuildProject(project,
        dead_letter_queue=dead_letter_queue
    ),
    branches=["master"]
)
Start a CodePipeline pipeline
Use the CodePipeline target to trigger a CodePipeline pipeline.
The code snippet below creates a CodePipeline pipeline that is triggered every hour.
import aws_cdk.aws_codepipeline as codepipeline
pipeline = codepipeline.Pipeline(self, "Pipeline")
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.expression("rate(1 hour)")
)
rule.add_target(targets.CodePipeline(pipeline))
Start a StepFunctions state machine
Use the SfnStateMachine target to trigger a State Machine.
The code snippet below creates a simple state machine that is triggered every minute with a dummy object as input. You can optionally attach a dead-letter queue to the target.
import aws_cdk.aws_iam as iam
import aws_cdk.aws_stepfunctions as sfn
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(Duration.minutes(1))
)

dlq = sqs.Queue(self, "DeadLetterQueue")

role = iam.Role(self, "Role",
    assumed_by=iam.ServicePrincipal("events.amazonaws.com")
)

state_machine = sfn.StateMachine(self, "SM",
    definition=sfn.Wait(self, "Hello", time=sfn.WaitTime.duration(Duration.seconds(10)))
)

rule.add_target(targets.SfnStateMachine(state_machine,
    input=events.RuleTargetInput.from_object({"SomeParam": "SomeValue"}),
    dead_letter_queue=dlq,
    role=role
))
Queue a Batch job
Use the BatchJob target to queue a Batch job.
The code snippet below creates a simple job queue that is triggered every hour with a dummy object as input. You can optionally attach a dead-letter queue to the target.
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_ecs as ecs
import aws_cdk.aws_batch as batch
from aws_cdk.aws_ecs import ContainerImage
# vpc: ec2.Vpc
compute_environment = batch.FargateComputeEnvironment(self, "ComputeEnv",
    vpc=vpc
)

job_queue = batch.JobQueue(self, "JobQueue",
    priority=1,
    compute_environments=[batch.OrderedComputeEnvironment(
        compute_environment=compute_environment,
        order=1
    )]
)

job_definition = batch.EcsJobDefinition(self, "MyJob",
    container=batch.EcsEc2ContainerDefinition(self, "Container",
        image=ecs.ContainerImage.from_registry("test-repo"),
        memory=cdk.Size.mebibytes(2048),
        cpu=256
    )
)

queue = sqs.Queue(self, "Queue")

rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(Duration.hours(1))
)

rule.add_target(targets.BatchJob(job_queue.job_queue_arn, job_queue, job_definition.job_definition_arn, job_definition,
    dead_letter_queue=queue,
    event=events.RuleTargetInput.from_object({"SomeParam": "SomeValue"}),
    retry_attempts=2,
    max_event_age=Duration.hours(2)
))
Invoke an API Gateway REST API
Use the ApiGateway target to trigger a REST API.
The code snippet below creates an API Gateway REST API that is invoked every minute.
import aws_cdk.aws_apigateway as api
import aws_cdk.aws_lambda as lambda_
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(Duration.minutes(1))
)

fn = lambda_.Function(self, "MyFunc",
    handler="index.handler",
    runtime=lambda_.Runtime.NODEJS_LATEST,
    code=lambda_.Code.from_inline("exports.handler = e => {}")
)

rest_api = api.LambdaRestApi(self, "MyRestAPI", handler=fn)

dlq = sqs.Queue(self, "DeadLetterQueue")

rule.add_target(
    targets.ApiGateway(rest_api,
        path="/*/test",
        method="GET",
        stage="prod",
        path_parameter_values=["path-value"],
        header_parameters={
            "Header1": "header1"
        },
        query_string_parameters={
            "QueryParam1": "query-param-1"
        },
        dead_letter_queue=dlq
    ))
Invoke an API Gateway V2 HTTP API
Use the ApiGatewayV2 target to trigger an HTTP API.
import aws_cdk.aws_apigatewayv2 as apigwv2
# http_api: apigwv2.HttpApi
# rule: events.Rule
rule.add_target(targets.ApiGatewayV2(http_api))
Invoke an AWS API
Use the AwsApi target to make direct AWS API calls from EventBridge rules. This is useful for invoking AWS services that don’t have a dedicated EventBridge target.
Basic Usage
The following example shows how to update an ECS service when a rule is triggered:
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(Duration.hours(1))
)

rule.add_target(targets.AwsApi(
    service="ECS",
    action="updateService",
    parameters={
        "service": "my-service",
        "forceNewDeployment": True
    }
))
IAM Permissions
By default, the AwsApi target automatically creates the necessary IAM permissions based on the service and action you specify. The permission format follows the pattern: service:Action.
For example:
- ECS service with updateService action → ecs:UpdateService permission
- RDS service with createDBSnapshot action → rds:CreateDBSnapshot permission
Custom IAM Policy
In some cases, you may need to provide a custom IAM policy statement, especially when:
- You need to restrict permissions to specific resources (instead of *)
- The service requires additional permissions beyond the main action
- You want more granular control over the permissions
import aws_cdk.aws_iam as iam
import aws_cdk.aws_s3 as s3
# rule: events.Rule
# bucket: s3.Bucket
rule.add_target(targets.AwsApi(
    service="s3",
    action="GetBucketEncryption",
    parameters={
        "Bucket": bucket.bucket_name
    },
    policy_statement=iam.PolicyStatement(
        effect=iam.Effect.ALLOW,
        actions=["s3:GetEncryptionConfiguration"],
        resources=[bucket.bucket_arn]
    )
))
Invoke an API Destination
Use the targets.ApiDestination target to trigger an external API. You need to
create an events.Connection and events.ApiDestination as well.
The code snippet below creates an external destination that is invoked every minute.
connection = events.Connection(self, "Connection",
    authorization=events.Authorization.api_key("x-api-key", SecretValue.secrets_manager("ApiSecretName")),
    description="Connection with API Key x-api-key"
)

destination = events.ApiDestination(self, "Destination",
    connection=connection,
    endpoint="https://example.com",
    description="Calling example.com with API key x-api-key"
)

rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(Duration.minutes(1)),
    targets=[targets.ApiDestination(destination)]
)
You can also import an existing connection and destination to create additional rules:
connection = events.Connection.from_event_bus_arn(self, "Connection", "arn:aws:events:us-east-1:123456789012:event-bus/EventBusName", "arn:aws:secretsmanager:us-east-1:123456789012:secret:SecretName-f3gDy9")
api_destination_arn = "arn:aws:events:us-east-1:123456789012:api-destination/DestinationName/11111111-1111-1111-1111-111111111111"
api_destination_arn_for_policy = "arn:aws:events:us-east-1:123456789012:api-destination/DestinationName"
destination = events.ApiDestination.from_api_destination_attributes(self, "Destination",
    api_destination_arn=api_destination_arn,
    connection=connection,
    api_destination_arn_for_policy=api_destination_arn_for_policy
)

rule = events.Rule(self, "OtherRule",
    schedule=events.Schedule.rate(Duration.minutes(10)),
    targets=[targets.ApiDestination(destination)]
)
Invoke an AppSync GraphQL API
Use the AppSync target to trigger an AppSync GraphQL API. You need to
create an appsync.GraphqlApi configured with AWS_IAM authorization mode.
The code snippet below creates an AppSync GraphQL API target that is invoked every hour, calling the publish mutation.
import aws_cdk.aws_appsync as appsync
api = appsync.GraphqlApi(self, "api",
    name="api",
    definition=appsync.Definition.from_file("schema.graphql"),
    authorization_config=appsync.AuthorizationConfig(
        default_authorization=appsync.AuthorizationMode(authorization_type=appsync.AuthorizationType.IAM)
    )
)

rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.AppSync(api,
    graph_ql_operation="mutation Publish($message: String!){ publish(message: $message) { message } }",
    variables=events.RuleTargetInput.from_object({
        "message": "hello world"
    })
))
You can pass an existing role with the proper permissions to be used for the target when the rule is triggered. The code snippet below uses an existing role and grants permissions to use the publish mutation on the GraphQL API.
import aws_cdk.aws_iam as iam
import aws_cdk.aws_appsync as appsync
api = appsync.GraphqlApi.from_graphql_api_attributes(self, "ImportedAPI",
    graphql_api_id="<api-id>",
    graphql_api_arn="<api-arn>",
    graph_ql_endpoint_arn="<api-endpoint-arn>",
    visibility=appsync.Visibility.GLOBAL,
    modes=[appsync.AuthorizationType.IAM]
)

rule = events.Rule(self, "Rule", schedule=events.Schedule.rate(cdk.Duration.minutes(1)))
role = iam.Role(self, "Role", assumed_by=iam.ServicePrincipal("events.amazonaws.com"))

# allow EventBridge to use the `publish` mutation
api.grant_mutation(role, "publish")

rule.add_target(targets.AppSync(api,
    graph_ql_operation="mutation Publish($message: String!){ publish(message: $message) { message } }",
    variables=events.RuleTargetInput.from_object({
        "message": "hello world"
    }),
    event_role=role
))
Put an event on an EventBridge bus
Use the EventBus target to route events to a different event bus.
The code snippet below creates a scheduled event rule that routes events to an imported event bus.
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.expression("rate(1 minute)")
)

rule.add_target(targets.EventBus(
    events.EventBus.from_event_bus_arn(self, "External", "arn:aws:events:eu-west-1:999999999999:event-bus/test-bus")))
Put an event on a Firehose delivery stream
Use the FirehoseDeliveryStream target to put events into an Amazon Data Firehose delivery stream.
The code snippet below creates a scheduled event rule that puts events into an Amazon Data Firehose delivery stream.
import aws_cdk.aws_kinesisfirehose as firehose
import aws_cdk.aws_s3 as s3
# bucket: s3.Bucket
stream = firehose.DeliveryStream(self, "DeliveryStream",
    destination=firehose.S3Bucket(bucket)
)

rule = events.Rule(self, "Rule",
    schedule=events.Schedule.expression("rate(1 minute)")
)

rule.add_target(targets.FirehoseDeliveryStream(stream))
Run an ECS Task
Use the EcsTask target to run an ECS Task.
The code snippet below creates a scheduled event rule that will run the task described in taskDefinition every hour.
Tagging Tasks
By default, ECS tasks run from EventBridge targets will not have tags applied to
them. You can set the propagateTags field to propagate the tags set on the task
definition to the task initialized by the event trigger.
If you want to set tags independent of those applied to the TaskDefinition, you
can use the tags array. Both of these fields can be used together or separately
to set tags on the triggered task.
import aws_cdk.aws_ecs as ecs
# cluster: ecs.ICluster
# task_definition: ecs.TaskDefinition
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(
    targets.EcsTask(
        cluster=cluster,
        task_definition=task_definition,
        propagate_tags=ecs.PropagatedTagSource.TASK_DEFINITION,
        tags=[targets.Tag(
            key="my-tag",
            value="my-tag-value"
        )]
    ))
Launch type for ECS Task
By default, if isEc2Compatible for the taskDefinition is true, the EC2 launch type is used
for the task; otherwise, the FARGATE launch type is used.
If you want to override the default launch type, you can set the launchType property.
import aws_cdk.aws_ecs as ecs
# cluster: ecs.ICluster
# task_definition: ecs.TaskDefinition
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.EcsTask(
    cluster=cluster,
    task_definition=task_definition,
    launch_type=ecs.LaunchType.FARGATE
))
Assign public IP addresses to tasks
You can set the assignPublicIp flag to assign public IP addresses to tasks.
If you do not want a public IP address assigned to the task, set the flag to false.
The flag can be set to true only when the launch type is FARGATE.
import aws_cdk.aws_ecs as ecs
import aws_cdk.aws_ec2 as ec2
# cluster: ecs.ICluster
# task_definition: ecs.TaskDefinition
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(
    targets.EcsTask(
        cluster=cluster,
        task_definition=task_definition,
        assign_public_ip=True,
        subnet_selection=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC)
    ))
Enable Amazon ECS Exec for ECS Task
If you use Amazon ECS Exec, you can run commands in or get a shell to a container running on an Amazon EC2 instance or on AWS Fargate.
import aws_cdk.aws_ecs as ecs
# cluster: ecs.ICluster
# task_definition: ecs.TaskDefinition
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.EcsTask(
    cluster=cluster,
    task_definition=task_definition,
    task_count=1,
    container_overrides=[targets.ContainerOverride(
        container_name="TheContainer",
        command=["echo", events.EventField.from_path("$.detail.event")]
    )],
    enable_execute_command=True
))
Overriding Values in the Task Definition
You can override values in the task definition by setting the corresponding properties in the EcsTaskProps. All
values in the TaskOverrides API are
supported.
import aws_cdk.aws_ecs as ecs
# cluster: ecs.ICluster
# task_definition: ecs.TaskDefinition
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.EcsTask(
    cluster=cluster,
    task_definition=task_definition,
    task_count=1,
    # Overrides the cpu and memory values in the task definition
    cpu="512",
    memory="512"
))
Schedule a Redshift query (serverless or cluster)
Use the RedshiftQuery target to schedule an Amazon Redshift query.
The code snippet below creates a scheduled event rule that routes events to an Amazon Redshift query.
import aws_cdk.aws_redshiftserverless as redshiftserverless
# workgroup: redshiftserverless.CfnWorkgroup
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

dlq = sqs.Queue(self, "DeadLetterQueue")

rule.add_target(targets.RedshiftQuery(workgroup.attr_workgroup_workgroup_arn,
    database="dev",
    dead_letter_queue=dlq,
    sql=["SELECT * FROM foo", "SELECT * FROM baz"]
))
Publish to an SNS Topic
Use the SnsTopic target to publish to an SNS Topic.
The code snippet below creates a scheduled event rule that publishes to an SNS topic using a resource policy.
import aws_cdk.aws_sns as sns
# topic: sns.ITopic
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.SnsTopic(topic))
Alternatively, a role can be attached to the target when the rule is triggered.
import aws_cdk.aws_sns as sns
# topic: sns.ITopic
rule = events.Rule(self, "Rule",
    schedule=events.Schedule.rate(cdk.Duration.hours(1))
)

rule.add_target(targets.SnsTopic(topic, authorize_using_role=True))