EventBridge Scheduler examples using SDK for Python (Boto3)

More AWS SDK examples are available in the AWS Doc SDK Examples GitHub repository.

The following code examples show how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with EventBridge Scheduler.

Actions are code excerpts from larger programs and must be run in context. Actions show how to call individual service functions; the related scenarios show those same calls in context.

Scenarios are code examples that show how to accomplish specific tasks by calling multiple functions within a single service or in combination with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using EventBridge Scheduler.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set it up and run it in the AWS Code Examples Repository.

import boto3


def hello_scheduler(scheduler_client):
    """
    Use the AWS SDK for Python (Boto3) to create an Amazon EventBridge Scheduler
    client and list the schedules in your account.
    This example uses the default settings specified in your shared credentials
    and config files.

    :param scheduler_client: A Boto3 Amazon EventBridge Scheduler Client object. This object wraps
                             the low-level Amazon EventBridge Scheduler service API.
    """
    print("Hello, Amazon EventBridge Scheduler! Let's list some of your schedules:\n")
    paginator = scheduler_client.get_paginator("list_schedules")
    # Cap the total number of schedules returned across all pages.
    page_iterator = paginator.paginate(PaginationConfig={"MaxItems": 10})

    schedule_names: list[str] = []
    for page in page_iterator:
        for schedule in page["Schedules"]:
            schedule_names.append(schedule["Name"])

    print(f"{len(schedule_names)} schedule(s) retrieved.")
    for schedule_name in schedule_names:
        print(f"\t{schedule_name}")


if __name__ == "__main__":
    hello_scheduler(boto3.client("scheduler"))

  • For API details, see ListSchedules in the AWS SDK for Python (Boto3) API Reference.
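
The name-collection loop in the example above can be tried without an AWS account by feeding it page-shaped dictionaries. The following is a minimal sketch and not part of the AWS example: `collect_schedule_names` is a hypothetical helper, and the sample pages are made-up data that only mimic the `Schedules` list found on each page. With a real paginator the `MaxItems` setting already caps the results, so the `max_items` argument here exists only to make the sketch self-contained.

```python
from typing import Iterable


def collect_schedule_names(pages: Iterable[dict], max_items: int = 10) -> list[str]:
    """Gather schedule names from page-shaped dicts, stopping at max_items."""
    names: list[str] = []
    for page in pages:
        # Each page is expected to carry its results under the "Schedules" key.
        for schedule in page.get("Schedules", []):
            if len(names) >= max_items:
                return names
            names.append(schedule["Name"])
    return names


# Made-up pages standing in for what a paginator would yield.
sample_pages = [
    {"Schedules": [{"Name": "nightly-report"}, {"Name": "hourly-sync"}]},
    {"Schedules": [{"Name": "weekly-cleanup"}]},
]
print(collect_schedule_names(sample_pages, max_items=2))
```

Keeping the loop in a function that accepts any iterable of pages makes it easy to unit test separately from the Boto3 client.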

Actions

The following code example shows how to use CreateSchedule.

import logging
from datetime import datetime, timedelta, timezone

import boto3
from boto3 import client
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SchedulerWrapper:
    def __init__(self, eventbridge_scheduler_client: client):
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def create_schedule(
        self,
        name: str,
        schedule_expression: str,
        schedule_group_name: str,
        target_arn: str,
        role_arn: str,
        input: str,
        delete_after_completion: bool = False,
        use_flexible_time_window: bool = False,
    ) -> str:
        """
        Creates a new schedule with the specified parameters.

        :param name: The name of the schedule.
        :param schedule_expression: The expression that defines when the schedule runs.
        :param schedule_group_name: The name of the schedule group.
        :param target_arn: The Amazon Resource Name (ARN) of the target.
        :param role_arn: The Amazon Resource Name (ARN) of the execution IAM role.
        :param input: The input for the target.
        :param delete_after_completion: Whether to delete the schedule after it completes.
        :param use_flexible_time_window: Whether to use a flexible time window.

        :return: The ARN of the created schedule.
        """
        try:
            hours_to_run = 1
            flexible_time_window_minutes = 10
            # The schedule is active from now until one hour from now.
            parameters = {
                "Name": name,
                "ScheduleExpression": schedule_expression,
                "GroupName": schedule_group_name,
                "Target": {"Arn": target_arn, "RoleArn": role_arn, "Input": input},
                "StartDate": datetime.now(timezone.utc),
                "EndDate": datetime.now(timezone.utc) + timedelta(hours=hours_to_run),
            }

            if delete_after_completion:
                parameters["ActionAfterCompletion"] = "DELETE"

            if use_flexible_time_window:
                parameters["FlexibleTimeWindow"] = {
                    "Mode": "FLEXIBLE",
                    "MaximumWindowInMinutes": flexible_time_window_minutes,
                }
            else:
                parameters["FlexibleTimeWindow"] = {"Mode": "OFF"}

            response = self.scheduler_client.create_schedule(**parameters)
            return response["ScheduleArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule: %s", err.response["Error"]["Message"]
                )
            raise

  • For API details, see CreateSchedule in the AWS SDK for Python (Boto3) API Reference.
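
The `schedule_expression` argument above is a plain string; the scenario later on this page builds it as `at(...)` for a one-time schedule and `rate(... minutes)` for a recurring one. The sketch below shows one way to build those two strings. The helper names `at_expression` and `rate_expression` are hypothetical, and the sketch assumes the timestamp is expressed in UTC, which is how the scenario formats it.

```python
from datetime import datetime, timedelta, timezone


def at_expression(when: datetime) -> str:
    """Build a one-time expression, at(yyyy-mm-ddThh:mm:ss), from an aware datetime."""
    if when.tzinfo is None:
        raise ValueError("Pass a timezone-aware datetime so the UTC conversion is unambiguous.")
    # Convert to UTC first, then format without an offset suffix.
    stamp = when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
    return f"at({stamp})"


def rate_expression(minutes: int) -> str:
    """Build a recurring expression in minutes, for example rate(5 minutes)."""
    if minutes < 1:
        raise ValueError("The rate must be a positive whole number of minutes.")
    return f"rate({minutes} minutes)"


# A time given with a +01:00 offset is converted to UTC before formatting.
plus_one = timezone(timedelta(hours=1))
print(at_expression(datetime(2030, 1, 15, 10, 30, tzinfo=plus_one)))
print(rate_expression(5))
```

Rejecting naive datetimes is a design choice of this sketch: it avoids silently treating local time as UTC.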

The following code example shows how to use CreateScheduleGroup.

import logging

import boto3
from boto3 import client
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SchedulerWrapper:
    def __init__(self, eventbridge_scheduler_client: client):
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def create_schedule_group(self, name: str) -> str:
        """
        Creates a new schedule group with the specified name.

        :param name: The name of the schedule group.

        :return: The ARN of the created schedule group.
        """
        try:
            response = self.scheduler_client.create_schedule_group(Name=name)
            return response["ScheduleGroupArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule group '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise

  • For API details, see CreateScheduleGroup in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteSchedule.

import logging

import boto3
from boto3 import client
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SchedulerWrapper:
    def __init__(self, eventbridge_scheduler_client: client):
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def delete_schedule(self, name: str, schedule_group_name: str) -> None:
        """
        Deletes the schedule with the specified name and schedule group.

        :param name: The name of the schedule.
        :param schedule_group_name: The name of the schedule group.
        """
        try:
            self.scheduler_client.delete_schedule(
                Name=name, GroupName=schedule_group_name
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule: %s", err.response["Error"]["Message"]
                )
            raise

  • For API details, see DeleteSchedule in the AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteScheduleGroup.

import logging

import boto3
from boto3 import client
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SchedulerWrapper:
    def __init__(self, eventbridge_scheduler_client: client):
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def delete_schedule_group(self, name: str) -> None:
        """
        Deletes the schedule group with the specified name.

        :param name: The name of the schedule group.
        """
        try:
            self.scheduler_client.delete_schedule_group(Name=name)
            logger.info("Schedule group %s deleted successfully.", name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule group with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise

  • For API details, see DeleteScheduleGroup in the AWS SDK for Python (Boto3) API Reference.
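
Each action above branches on `err.response["Error"]["Code"]` before logging and re-raising. As a rough sketch of that branching in isolation, the hypothetical helper below takes a dictionary shaped like `ClientError.response` and returns the message to log. The helper name, its parameters, and the sample error text are assumptions made for illustration; they are not part of the AWS example or of Boto3.

```python
def describe_failure(error_response: dict, action: str, resource_name: str) -> str:
    """Turn a ClientError-style response dict into a log message."""
    error = error_response.get("Error", {})
    code = error.get("Code", "Unknown")
    message = error.get("Message", "")
    if code == "ConflictException":
        return f"Failed to {action} '{resource_name}' due to a conflict. {message}"
    if code == "ResourceNotFoundException":
        return (
            f"Failed to {action} '{resource_name}' "
            f"because the resource was not found: {message}"
        )
    # Any other error code falls through to a generic message.
    return f"Error during {action}: {message}"


# Made-up response dict that mimics the shape of ClientError.response.
sample = {"Error": {"Code": "ConflictException", "Message": "Name already in use."}}
print(describe_failure(sample, "create schedule", "my-schedule"))
```

Inside an `except ClientError as err:` block, the same helper could be called with `err.response` before re-raising.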

Scenarios

The following code example shows how to:

  • Deploy a CloudFormation stack with the required resources.

  • Create an EventBridge Scheduler schedule group.

  • Create a one-time EventBridge Scheduler schedule with a flexible time window.

  • Create a recurring EventBridge Scheduler schedule with a specified rate.

  • Delete the EventBridge Scheduler schedule and schedule group.

  • Clean up resources and delete the stack.
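
The order of the EventBridge Scheduler calls in these steps can be sketched as a dry run that never contacts AWS. In the sketch below, `RecordingScheduler` is a hypothetical stand-in that only records which method was called; its simplified method signatures, the placeholder ARN strings, and the schedule names are assumptions for illustration. The CloudFormation deployment and teardown steps are left out.

```python
class RecordingScheduler:
    """Hypothetical stand-in that records calls instead of calling AWS."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def create_schedule_group(self, name: str) -> str:
        self.calls.append(f"create_schedule_group:{name}")
        return f"placeholder-arn/schedule-group/{name}"  # not a real ARN

    def create_schedule(self, name: str, expression: str, group: str) -> str:
        self.calls.append(f"create_schedule:{group}/{name}:{expression}")
        return f"placeholder-arn/schedule/{group}/{name}"  # not a real ARN

    def delete_schedule(self, name: str, group: str) -> None:
        self.calls.append(f"delete_schedule:{group}/{name}")

    def delete_schedule_group(self, name: str) -> None:
        self.calls.append(f"delete_schedule_group:{name}")


def run_steps(scheduler: RecordingScheduler, group: str = "demo-group") -> list[str]:
    """Walk through the scheduler-related steps in the order listed above."""
    scheduler.create_schedule_group(group)
    scheduler.create_schedule("one-time", "at(2030-01-15T09:30:00)", group)
    scheduler.create_schedule("recurring", "rate(5 minutes)", group)
    scheduler.delete_schedule("recurring", group)
    scheduler.delete_schedule_group(group)
    return scheduler.calls


for call in run_steps(RecordingScheduler()):
    print(call)
```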

Run an interactive scenario at a command prompt.

import logging
import os
from datetime import datetime, timedelta, timezone

from boto3 import resource
from boto3.resources.base import ServiceResource

# SchedulerWrapper is the wrapper class shown after this listing.
# The prompt helper `q` (q.ask, q.is_yesno, q.is_int) is defined in the
# complete example and is not shown in this excerpt.

DASHES = "-" * 80  # Separator line; the exact width is an assumption.
logger = logging.getLogger(__name__)


class SchedulerScenario:
    """
    A scenario that demonstrates how to use Boto3 to schedule and receive events using
    the Amazon EventBridge Scheduler.
    """

    def __init__(
        self,
        scheduler_wrapper: SchedulerWrapper,
        cloud_formation_resource: ServiceResource,
    ):
        self.eventbridge_scheduler = scheduler_wrapper
        self.cloud_formation_resource = cloud_formation_resource
        self.stack: ServiceResource = None
        self.schedule_group_name = None
        self.sns_topic_arn = None
        self.role_arn = None

    def run(self) -> None:
        """
        Runs the scenario.
        """
        print(DASHES)
        print("Welcome to the Amazon EventBridge Scheduler Workflow.")
        print(DASHES)

        print(DASHES)
        self.prepare_application()
        print(DASHES)

        print(DASHES)
        self.create_one_time_schedule()
        print(DASHES)

        print(DASHES)
        self.create_recurring_schedule()
        print(DASHES)

        print(DASHES)
        if q.ask(
            "Do you want to delete all resources created by this workflow? (y/n) ",
            q.is_yesno,
        ):
            self.cleanup()
        print(DASHES)

        print("Amazon EventBridge Scheduler workflow completed.")

    def prepare_application(self) -> None:
        """
        Prepares the application by prompting the user for setup information, deploying
        a CloudFormation stack, and creating a schedule group.
        """
        print("Preparing the application...")
        print(
            "\nThis example creates resources in a CloudFormation stack, including an SNS topic"
            + "\nthat will be subscribed to the EventBridge Scheduler events. "
            + "\n\nYou will need to confirm the subscription in order to receive event emails."
        )

        email_address = q.ask("Enter an email address to use for event subscriptions: ")
        stack_name = q.ask("Enter a name for the AWS Cloud Formation Stack: ")

        template_file = SchedulerScenario.get_template_as_string()

        parameters = [{"ParameterKey": "email", "ParameterValue": email_address}]

        self.stack = self.deploy_cloudformation_stack(
            stack_name, template_file, parameters
        )
        # Read the role and topic ARNs from the stack outputs.
        outputs = self.stack.outputs
        for output in outputs:
            if output.get("OutputKey") == "RoleARN":
                self.role_arn = output.get("OutputValue")
            elif output.get("OutputKey") == "SNStopicARN":
                self.sns_topic_arn = output.get("OutputValue")

        if not self.sns_topic_arn or not self.role_arn:
            error_string = f"""
            Failed to retrieve required outputs from CloudFormation stack.
            'sns_topic_arn'={self.sns_topic_arn}, 'role_arn'={self.role_arn}
            """
            logger.error(error_string)
            raise ValueError(error_string)

        print(f"Stack output RoleARN: {self.role_arn}")
        print(f"Stack output SNStopicARN: {self.sns_topic_arn}")

        schedule_group_name = "scenario-schedules-group"
        schedule_group_arn = self.eventbridge_scheduler.create_schedule_group(
            schedule_group_name
        )
        # Store the group name only after the group has been created.
        self.schedule_group_name = schedule_group_name
        print(
            f"Successfully created schedule group '{schedule_group_name}': {schedule_group_arn}."
        )
        print("Application preparation complete.")

    def create_one_time_schedule(self) -> None:
        """
        Creates a one-time schedule to send an initial event.
        """
        schedule_name = q.ask("Enter a name for the one-time schedule:")

        scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=1)
        formatted_scheduled_time = scheduled_time.strftime("%Y-%m-%dT%H:%M:%S")

        print(
            f"Creating a one-time schedule named '{schedule_name}' "
            + "\nto send an initial event in 1 minute with a flexible time window..."
        )

        schedule_arn = self.eventbridge_scheduler.create_schedule(
            schedule_name,
            f"at({formatted_scheduled_time})",
            self.schedule_group_name,
            self.sns_topic_arn,
            self.role_arn,
            f"One time scheduled event test from schedule {schedule_name}.",
            delete_after_completion=True,
            use_flexible_time_window=True,
        )
        print(
            f"Successfully created schedule '{schedule_name}' in schedule group 'scenario-schedules-group': {schedule_arn}."
        )
        print("Subscription email will receive an email from this event.")
        print("You must confirm your subscription to receive event emails.")
        print(f"One-time schedule '{schedule_name}' created successfully.")

    def create_recurring_schedule(self) -> None:
        """
        Create a recurring schedule to send events at a specified rate in minutes.
        """
        print("Creating a recurring schedule to send events for one hour...")
        schedule_name = q.ask("Enter a name for the recurring schedule: ")
        schedule_rate_in_minutes = q.ask(
            "Enter the desired schedule rate (in minutes): ", q.is_int
        )

        schedule_arn = self.eventbridge_scheduler.create_schedule(
            schedule_name,
            f"rate({schedule_rate_in_minutes} minutes)",
            self.schedule_group_name,
            self.sns_topic_arn,
            self.role_arn,
            f"Recurrent event test from schedule {schedule_name}.",
        )

        print(
            f"Successfully created schedule '{schedule_name}' in schedule group 'scenario-schedules-group': {schedule_arn}."
        )
        print("Subscription email will receive an email from this event.")
        print("You must confirm your subscription to receive event emails.")

        if q.ask(
            f"Are you ready to delete the '{schedule_name}' schedule? (y/n)", q.is_yesno
        ):
            self.eventbridge_scheduler.delete_schedule(
                schedule_name, self.schedule_group_name
            )

    def deploy_cloudformation_stack(
        self, stack_name: str, cfn_template: str, parameters: list[dict[str, str]]
    ) -> ServiceResource:
        """
        Deploys prerequisite resources used by the scenario. The resources are
        defined in the associated `cfn_template.yaml` AWS CloudFormation script and are deployed
        as a CloudFormation stack, so they can be easily managed and destroyed.

        :param stack_name: The name of the CloudFormation stack.
        :param cfn_template: The CloudFormation template as a string.
        :param parameters: The parameters for the CloudFormation stack.
        :return: The CloudFormation stack resource.
        """
        print(f"Deploying CloudFormation stack: {stack_name}.")
        stack = self.cloud_formation_resource.create_stack(
            StackName=stack_name,
            TemplateBody=cfn_template,
            Capabilities=["CAPABILITY_NAMED_IAM"],
            Parameters=parameters,
        )
        print(f"CloudFormation stack creation started: {stack_name}")
        print("Waiting for CloudFormation stack creation to complete...")
        waiter = self.cloud_formation_resource.meta.client.get_waiter(
            "stack_create_complete"
        )
        waiter.wait(StackName=stack.name)
        stack.load()
        print("CloudFormation stack creation complete.")

        return stack

    def destroy_cloudformation_stack(self, stack: ServiceResource) -> None:
        """
        Destroys the resources managed by the CloudFormation stack, and the CloudFormation
        stack itself.

        :param stack: The CloudFormation stack that manages the example resources.
        """
        print(
            f"CloudFormation stack '{stack.name}' is being deleted. This may take a few minutes."
        )
        stack.delete()
        waiter = self.cloud_formation_resource.meta.client.get_waiter(
            "stack_delete_complete"
        )
        waiter.wait(StackName=stack.name)
        print(f"CloudFormation stack '{stack.name}' has been deleted.")

    def cleanup(self) -> None:
        """
        Deletes the CloudFormation stack and the resources created for the demo.
        """
        if self.schedule_group_name:
            schedule_group_name = self.schedule_group_name
            self.schedule_group_name = None
            self.eventbridge_scheduler.delete_schedule_group(schedule_group_name)
            print(f"Successfully deleted schedule group '{schedule_group_name}'.")

        if self.stack is not None:
            stack = self.stack
            self.stack = None
            self.destroy_cloudformation_stack(stack)
        print("Stack deleted, demo complete.")

    @staticmethod
    def get_template_as_string() -> str:
        """
        Returns a string containing this scenario's CloudFormation template.
        """
        script_directory = os.path.dirname(os.path.abspath(__file__))
        template_file_path = os.path.join(script_directory, "cfn_template.yaml")
        # Use a context manager so the file handle is always closed.
        with open(template_file_path, "r") as file:
            return file.read()


if __name__ == "__main__":
    demo: SchedulerScenario = None
    try:
        scheduler_wrapper = SchedulerWrapper.from_client()
        cloud_formation_resource = resource("cloudformation")
        demo = SchedulerScenario(scheduler_wrapper, cloud_formation_resource)
        demo.run()

    except Exception:
        logging.exception("Something went wrong with the demo!")
        if demo is not None:
            demo.cleanup()

SchedulerWrapper class that wraps Amazon EventBridge Scheduler actions.

import logging
from datetime import datetime, timedelta, timezone

import boto3
from boto3 import client
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SchedulerWrapper:
    def __init__(self, eventbridge_scheduler_client: client):
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def create_schedule(
        self,
        name: str,
        schedule_expression: str,
        schedule_group_name: str,
        target_arn: str,
        role_arn: str,
        input: str,
        delete_after_completion: bool = False,
        use_flexible_time_window: bool = False,
    ) -> str:
        """
        Creates a new schedule with the specified parameters.

        :param name: The name of the schedule.
        :param schedule_expression: The expression that defines when the schedule runs.
        :param schedule_group_name: The name of the schedule group.
        :param target_arn: The Amazon Resource Name (ARN) of the target.
        :param role_arn: The Amazon Resource Name (ARN) of the execution IAM role.
        :param input: The input for the target.
        :param delete_after_completion: Whether to delete the schedule after it completes.
        :param use_flexible_time_window: Whether to use a flexible time window.

        :return: The ARN of the created schedule.
        """
        try:
            hours_to_run = 1
            flexible_time_window_minutes = 10
            # The schedule is active from now until one hour from now.
            parameters = {
                "Name": name,
                "ScheduleExpression": schedule_expression,
                "GroupName": schedule_group_name,
                "Target": {"Arn": target_arn, "RoleArn": role_arn, "Input": input},
                "StartDate": datetime.now(timezone.utc),
                "EndDate": datetime.now(timezone.utc) + timedelta(hours=hours_to_run),
            }

            if delete_after_completion:
                parameters["ActionAfterCompletion"] = "DELETE"

            if use_flexible_time_window:
                parameters["FlexibleTimeWindow"] = {
                    "Mode": "FLEXIBLE",
                    "MaximumWindowInMinutes": flexible_time_window_minutes,
                }
            else:
                parameters["FlexibleTimeWindow"] = {"Mode": "OFF"}

            response = self.scheduler_client.create_schedule(**parameters)
            return response["ScheduleArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule: %s", err.response["Error"]["Message"]
                )
            raise

    def delete_schedule(self, name: str, schedule_group_name: str) -> None:
        """
        Deletes the schedule with the specified name and schedule group.

        :param name: The name of the schedule.
        :param schedule_group_name: The name of the schedule group.
        """
        try:
            self.scheduler_client.delete_schedule(
                Name=name, GroupName=schedule_group_name
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule: %s", err.response["Error"]["Message"]
                )
            raise

    def create_schedule_group(self, name: str) -> str:
        """
        Creates a new schedule group with the specified name.

        :param name: The name of the schedule group.

        :return: The ARN of the created schedule group.
        """
        try:
            response = self.scheduler_client.create_schedule_group(Name=name)
            return response["ScheduleGroupArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule group '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise

    def delete_schedule_group(self, name: str) -> None:
        """
        Deletes the schedule group with the specified name.

        :param name: The name of the schedule group.
        """
        try:
            self.scheduler_client.delete_schedule_group(Name=name)
            logger.info("Schedule group %s deleted successfully.", name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule group with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise