使用 SDK for Python (Boto3) 的 Lambda 範例 - AWS SDK 程式碼範例

文件 AWS 開發套件範例 GitHub 儲存庫中有更多可用的 AWS SDK 範例

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 SDK for Python (Boto3) 的 Lambda 範例

下列程式碼範例示範如何使用 適用於 Python (Boto3) 的 AWS SDK 搭配 Lambda 執行動作和實作常見案例。

基本概念是程式碼範例,這些範例說明如何在服務內執行基本操作。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會告訴您如何呼叫個別服務函數,但您可以在其相關情境中查看內容中的動作。

案例是向您展示如何呼叫服務中的多個函數或與其他 AWS 服務組合來完成特定任務的程式碼範例。

每個範例均包含完整原始碼的連結,您可在連結中找到如何設定和執行內容中程式碼的相關指示。

開始使用

下列程式碼範例示範如何開始使用 Lambda。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

import boto3 def main(): """ List the Lambda functions in your AWS account. """ # Create the Lambda client lambda_client = boto3.client("lambda") # Use the paginator to list the functions paginator = lambda_client.get_paginator("list_functions") response_iterator = paginator.paginate() print("Here are the Lambda functions in your account:") for page in response_iterator: for function in page["Functions"]: print(f" {function['FunctionName']}") if __name__ == "__main__": main()
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「ListFunctions」。

基本概念

以下程式碼範例顯示做法:

  • 建立 IAM 角色和 Lambda 函數,然後上傳處理常式程式碼。

  • 調用具有單一參數的函數並取得結果。

  • 更新函數程式碼並使用環境變數進行設定。

  • 調用具有新參數的函數並取得結果。顯示傳回的執行日誌。

  • 列出您帳戶的函數,然後清理相關資源。

如需詳細資訊,請參閱使用主控台建立 Lambda 函數

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

定義增量一個數字的 Lambda 處理常式。

import logging logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): """ Accepts an action and a single number, performs the specified action on the number, and returns the result. The only allowable action is 'increment'. :param event: The event dict that contains the parameters sent when the function is invoked. :param context: The context in which the function is called. :return: The result of the action. """ result = None action = event.get("action") if action == "increment": result = event.get("number", 0) + 1 logger.info("Calculated result of %s", result) else: logger.error("%s is not a valid action.", action) response = {"result": result} return response

定義可執行算術運算的第二個 Lambda 處理常式。

import logging import os logger = logging.getLogger() # Define a list of Python lambda functions that are called by this AWS Lambda function. ACTIONS = { "plus": lambda x, y: x + y, "minus": lambda x, y: x - y, "times": lambda x, y: x * y, "divided-by": lambda x, y: x / y, } def lambda_handler(event, context): """ Accepts an action and two numbers, performs the specified action on the numbers, and returns the result. :param event: The event dict that contains the parameters sent when the function is invoked. :param context: The context in which the function is called. :return: The result of the specified action. """ # Set the log level based on a variable configured in the Lambda environment. logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO)) logger.debug("Event: %s", event) action = event.get("action") func = ACTIONS.get(action) x = event.get("x") y = event.get("y") result = None try: if func is not None and x is not None and y is not None: result = func(x, y) logger.info("%s %s %s is %s", x, action, y, result) else: logger.error("I can't calculate %s %s %s.", x, action, y) except ZeroDivisionError: logger.warning("I can't divide %s by 0!", x) response = {"result": result} return response

建立可包裝 Lambda 動作的函數。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource @staticmethod def create_deployment_package(source_file, destination_file): """ Creates a Lambda deployment package in .zip format in an in-memory buffer. This buffer can be passed directly to Lambda when creating the function. :param source_file: The name of the file that contains the Lambda handler function. :param destination_file: The name to give the file when it's deployed to Lambda. :return: The deployment package. """ buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w") as zipped: zipped.write(source_file, destination_file) buffer.seek(0) return buffer.read() def get_iam_role(self, iam_role_name): """ Get an AWS Identity and Access Management (IAM) role. :param iam_role_name: The name of the role to retrieve. :return: The IAM role. """ role = None try: temp_role = self.iam_resource.Role(iam_role_name) temp_role.load() role = temp_role logger.info("Got IAM role %s", role.name) except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": logger.info("IAM role %s does not exist.", iam_role_name) else: logger.error( "Couldn't get IAM role %s. Here's why: %s: %s", iam_role_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return role def create_iam_role_for_lambda(self, iam_role_name): """ Creates an IAM role that grants the Lambda function basic permissions. If a role with the specified name already exists, it is used for the demo. :param iam_role_name: The name of the role to create. :return: The role and a value that indicates whether the role is newly created. """ role = self.get_iam_role(iam_role_name) if role is not None: return role, False lambda_assume_role_policy = { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" try: role = self.iam_resource.create_role( RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(lambda_assume_role_policy), ) logger.info("Created role %s.", role.name) role.attach_policy(PolicyArn=policy_arn) logger.info("Attached basic execution policy to role %s.", role.name) except ClientError as error: if error.response["Error"]["Code"] == "EntityAlreadyExists": role = self.iam_resource.Role(iam_role_name) logger.warning("The role %s already exists. Using it.", iam_role_name) else: logger.exception( "Couldn't create role %s or attach policy %s.", iam_role_name, policy_arn, ) raise return role, True def get_function(self, function_name): """ Gets data about a Lambda function. :param function_name: The name of the function. :return: The function data. """ response = None try: response = self.lambda_client.get_function(FunctionName=function_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Function %s does not exist.", function_name) else: logger.error( "Couldn't get function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response def create_function( self, function_name, handler_name, iam_role, deployment_package ): """ Deploys a Lambda function. :param function_name: The name of the Lambda function. :param handler_name: The fully qualified name of the handler function. This must include the file name and the function name. :param iam_role: The IAM role to use for the function. :param deployment_package: The deployment package that contains the function code in .zip format. :return: The Amazon Resource Name (ARN) of the newly created function. """ try: response = self.lambda_client.create_function( FunctionName=function_name, Description="AWS Lambda doc example", Runtime="python3.9", Role=iam_role.arn, Handler=handler_name, Code={"ZipFile": deployment_package}, Publish=True, ) function_arn = response["FunctionArn"] waiter = self.lambda_client.get_waiter("function_active_v2") waiter.wait(FunctionName=function_name) logger.info( "Created function '%s' with ARN: '%s'.", function_name, response["FunctionArn"], ) except ClientError: logger.error("Couldn't create function %s.", function_name) raise else: return function_arn def delete_function(self, function_name): """ Deletes a Lambda function. :param function_name: The name of the function to delete. """ try: self.lambda_client.delete_function(FunctionName=function_name) except ClientError: logger.exception("Couldn't delete function %s.", function_name) raise def invoke_function(self, function_name, function_params, get_log=False): """ Invokes a Lambda function. :param function_name: The name of the function to invoke. :param function_params: The parameters of the function as a dict. This dict is serialized to JSON before it is sent to Lambda. :param get_log: When true, the last 4 KB of the execution log are included in the response. :return: The response from the function invocation. """ try: response = self.lambda_client.invoke( FunctionName=function_name, Payload=json.dumps(function_params), LogType="Tail" if get_log else "None", ) logger.info("Invoked function %s.", function_name) except ClientError: logger.exception("Couldn't invoke function %s.", function_name) raise return response def update_function_code(self, function_name, deployment_package): """ Updates the code for a Lambda function by submitting a .zip archive that contains the code for the function. :param function_name: The name of the function to update. :param deployment_package: The function code to update, packaged as bytes in .zip format. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_code( FunctionName=function_name, ZipFile=deployment_package ) except ClientError as err: logger.error( "Couldn't update function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def update_function_configuration(self, function_name, env_vars): """ Updates the environment variables for a Lambda function. :param function_name: The name of the function to update. :param env_vars: A dict of environment variables to update. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_configuration( FunctionName=function_name, Environment={"Variables": env_vars} ) except ClientError as err: logger.error( "Couldn't update function configuration %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def list_functions(self): """ Lists the Lambda functions for the current account. """ try: func_paginator = self.lambda_client.get_paginator("list_functions") for func_page in func_paginator.paginate(): for func in func_page["Functions"]: print(func["FunctionName"]) desc = func.get("Description") if desc: print(f"\t{desc}") print(f"\t{func['Runtime']}: {func['Handler']}") except ClientError as err: logger.error( "Couldn't list functions. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise

建立可執行該案例的函數。

class UpdateFunctionWaiter(CustomWaiter): """A custom waiter that waits until a function is successfully updated.""" def __init__(self, client): super().__init__( "UpdateSuccess", "GetFunction", "Configuration.LastUpdateStatus", {"Successful": WaitState.SUCCESS, "Failed": WaitState.FAILURE}, client, ) def wait(self, function_name): self._wait(FunctionName=function_name) def run_scenario(lambda_client, iam_resource, basic_file, calculator_file, lambda_name): """ Runs the scenario. :param lambda_client: A Boto3 Lambda client. :param iam_resource: A Boto3 IAM resource. :param basic_file: The name of the file that contains the basic Lambda handler. :param calculator_file: The name of the file that contains the calculator Lambda handler. :param lambda_name: The name to give resources created for the scenario, such as the IAM role and the Lambda function. """ logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Lambda getting started with functions demo.") print("-" * 88) wrapper = LambdaWrapper(lambda_client, iam_resource) print("Checking for IAM role for Lambda...") iam_role, should_wait = wrapper.create_iam_role_for_lambda(lambda_name) if should_wait: logger.info("Giving AWS time to create resources...") wait(10) print(f"Looking for function {lambda_name}...") function = wrapper.get_function(lambda_name) if function is None: print("Zipping the Python script into a deployment package...") deployment_package = wrapper.create_deployment_package( basic_file, f"{lambda_name}.py" ) print(f"...and creating the {lambda_name} Lambda function.") wrapper.create_function( lambda_name, f"{lambda_name}.lambda_handler", iam_role, deployment_package ) else: print(f"Function {lambda_name} already exists.") print("-" * 88) print(f"Let's invoke {lambda_name}. This function increments a number.") action_params = { "action": "increment", "number": q.ask("Give me a number to increment: ", q.is_int), } print(f"Invoking {lambda_name}...") response = wrapper.invoke_function(lambda_name, action_params) print( f"Incrementing {action_params['number']} resulted in " f"{json.load(response['Payload'])}" ) print("-" * 88) print(f"Let's update the function to an arithmetic calculator.") q.ask("Press Enter when you're ready.") print("Creating a new deployment package...") deployment_package = wrapper.create_deployment_package( calculator_file, f"{lambda_name}.py" ) print(f"...and updating the {lambda_name} Lambda function.") update_waiter = UpdateFunctionWaiter(lambda_client) wrapper.update_function_code(lambda_name, deployment_package) update_waiter.wait(lambda_name) print(f"This function uses an environment variable to control logging level.") print(f"Let's set it to DEBUG to get the most logging.") wrapper.update_function_configuration( lambda_name, {"LOG_LEVEL": logging.getLevelName(logging.DEBUG)} ) actions = ["plus", "minus", "times", "divided-by"] want_invoke = True while want_invoke: print(f"Let's invoke {lambda_name}. You can invoke these actions:") for index, action in enumerate(actions): print(f"{index + 1}: {action}") action_params = {} action_index = q.ask( "Enter the number of the action you want to take: ", q.is_int, q.in_range(1, len(actions)), ) action_params["action"] = actions[action_index - 1] print(f"You've chosen to invoke 'x {action_params['action']} y'.") action_params["x"] = q.ask("Enter a value for x: ", q.is_int) action_params["y"] = q.ask("Enter a value for y: ", q.is_int) print(f"Invoking {lambda_name}...") response = wrapper.invoke_function(lambda_name, action_params, True) print( f"Calculating {action_params['x']} {action_params['action']} {action_params['y']} " f"resulted in {json.load(response['Payload'])}" ) q.ask("Press Enter to see the logs from the call.") print(base64.b64decode(response["LogResult"]).decode()) want_invoke = q.ask("That was fun. Shall we do it again? (y/n) ", q.is_yesno) print("-" * 88) if q.ask( "Do you want to list all of the functions in your account? (y/n) ", q.is_yesno ): wrapper.list_functions() print("-" * 88) if q.ask("Ready to delete the function and role? (y/n) ", q.is_yesno): for policy in iam_role.attached_policies.all(): policy.detach_role(RoleName=iam_role.name) iam_role.delete() print(f"Deleted role {lambda_name}.") wrapper.delete_function(lambda_name) print(f"Deleted function {lambda_name}.") print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": try: run_scenario( boto3.client("lambda"), boto3.resource("iam"), "lambda_handler_basic.py", "lambda_handler_calculator.py", "doc_example_lambda_calculator", ) except Exception: logging.exception("Something went wrong with the demo!")

動作

以下程式碼範例顯示如何使用 CreateFunction

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def create_function( self, function_name, handler_name, iam_role, deployment_package ): """ Deploys a Lambda function. :param function_name: The name of the Lambda function. :param handler_name: The fully qualified name of the handler function. This must include the file name and the function name. :param iam_role: The IAM role to use for the function. :param deployment_package: The deployment package that contains the function code in .zip format. :return: The Amazon Resource Name (ARN) of the newly created function. """ try: response = self.lambda_client.create_function( FunctionName=function_name, Description="AWS Lambda doc example", Runtime="python3.9", Role=iam_role.arn, Handler=handler_name, Code={"ZipFile": deployment_package}, Publish=True, ) function_arn = response["FunctionArn"] waiter = self.lambda_client.get_waiter("function_active_v2") waiter.wait(FunctionName=function_name) logger.info( "Created function '%s' with ARN: '%s'.", function_name, response["FunctionArn"], ) except ClientError: logger.error("Couldn't create function %s.", function_name) raise else: return function_arn
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「CreateFunction」。

以下程式碼範例顯示如何使用 DeleteFunction

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def delete_function(self, function_name): """ Deletes a Lambda function. :param function_name: The name of the function to delete. """ try: self.lambda_client.delete_function(FunctionName=function_name) except ClientError: logger.exception("Couldn't delete function %s.", function_name) raise
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「DeleteFunction」。

以下程式碼範例顯示如何使用 GetFunction

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def get_function(self, function_name): """ Gets data about a Lambda function. :param function_name: The name of the function. :return: The function data. """ response = None try: response = self.lambda_client.get_function(FunctionName=function_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Function %s does not exist.", function_name) else: logger.error( "Couldn't get function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「GetFunction」。

以下程式碼範例顯示如何使用 Invoke

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def invoke_function(self, function_name, function_params, get_log=False): """ Invokes a Lambda function. :param function_name: The name of the function to invoke. :param function_params: The parameters of the function as a dict. This dict is serialized to JSON before it is sent to Lambda. :param get_log: When true, the last 4 KB of the execution log are included in the response. :return: The response from the function invocation. """ try: response = self.lambda_client.invoke( FunctionName=function_name, Payload=json.dumps(function_params), LogType="Tail" if get_log else "None", ) logger.info("Invoked function %s.", function_name) except ClientError: logger.exception("Couldn't invoke function %s.", function_name) raise return response
  • 如需 API 的詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的 Invoke

以下程式碼範例顯示如何使用 ListFunctions

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def list_functions(self): """ Lists the Lambda functions for the current account. """ try: func_paginator = self.lambda_client.get_paginator("list_functions") for func_page in func_paginator.paginate(): for func in func_page["Functions"]: print(func["FunctionName"]) desc = func.get("Description") if desc: print(f"\t{desc}") print(f"\t{func['Runtime']}: {func['Handler']}") except ClientError as err: logger.error( "Couldn't list functions. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「ListFunctions」。

以下程式碼範例顯示如何使用 UpdateFunctionCode

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def update_function_code(self, function_name, deployment_package): """ Updates the code for a Lambda function by submitting a .zip archive that contains the code for the function. :param function_name: The name of the function to update. :param deployment_package: The function code to update, packaged as bytes in .zip format. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_code( FunctionName=function_name, ZipFile=deployment_package ) except ClientError as err: logger.error( "Couldn't update function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「UpdateFunctionCode」。

以下程式碼範例顯示如何使用 UpdateFunctionConfiguration

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def update_function_configuration(self, function_name, env_vars): """ Updates the environment variables for a Lambda function. :param function_name: The name of the function to update. :param env_vars: A dict of environment variables to update. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_configuration( FunctionName=function_name, Environment={"Variables": env_vars} ) except ClientError as err: logger.error( "Couldn't update function configuration %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response
  • 如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的「UpdateFunctionConfiguration」。

案例

以下程式碼範例示範如何建立 REST API,此 API 使用虛構資料模擬追蹤美國 COVID-19 每日病例的系統。

適用於 Python 的 SDK (Boto3)

示範如何使用 AWS Chalice 搭配 適用於 Python (Boto3) 的 AWS SDK 來建立使用 Amazon API Gateway AWS Lambda和 Amazon DynamoDB 的無伺服器 REST API。REST API 使用虛構資料模擬追蹤美國 COVID-19 每日病例的系統。了解如何:

  • 使用 AWS Chalice 定義 Lambda 函數中的路由,這些函數稱為 來處理透過 API Gateway 發出的 REST 請求。

  • 使用 Lambda 函式在 DynamoDB 資料表中擷取和存放資料,以便為 REST 請求提供服務。

  • 在 AWS CloudFormation 範本中定義資料表結構和安全角色資源。

  • 使用 AWS Chalice 和 CloudFormation 封裝和部署所有必要的資源。

  • 使用 CloudFormation 清理所有已建立的資源。

如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • API Gateway

  • CloudFormation

  • DynamoDB

  • Lambda

下列程式碼範例顯示如何使用 Amazon Aurora 資料庫支援的 REST API 來建立出借圖書館,讓贊助人可以借書與還書。

適用於 Python 的 SDK (Boto3)

示範如何使用 適用於 Python (Boto3) 的 AWS SDK 搭配 Amazon Relational Database Service (Amazon RDS) API 和 AWS Chalice 來建立由 Amazon Aurora 資料庫支援的 REST API。Web 服務是完全無伺服器的,表示這是一種贊助人可以借書與還書的簡單出借圖書館。了解如何:

  • 建立與管理無伺服器的 Aurora 資料庫叢集。

  • 使用 AWS Secrets Manager 管理資料庫登入資料。

  • 實作資料儲存層,該層使用 Amazon RDS 將資料移入和移出資料庫。

  • 使用 AWS Chalice 將無伺服器 REST API 部署至 Amazon API Gateway 和 AWS Lambda。

  • 使用 Request 套件來將請求傳送到 Web 服務。

如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • API Gateway

  • Aurora

  • Lambda

  • Secrets Manager

下列程式碼範例示範如何建立 AWS Step Functions 訊息應用程式,從資料庫資料表擷取訊息記錄。

適用於 Python 的 SDK (Boto3)

示範如何使用 適用於 Python (Boto3) 的 AWS SDK 搭配 AWS Step Functions 來建立訊息應用程式,從 Amazon DynamoDB 資料表擷取訊息記錄,並透過 Amazon Simple Queue Service (Amazon SQS) 傳送它們。狀態機器會與 AWS Lambda 函數整合,以掃描資料庫是否有未傳送的訊息。

  • 建立從 Amazon DynamoDB 資料表擷取和更新訊息記錄的狀態機器。

  • 更新狀態機器定義,以便也向 Amazon Simple Queue Service (Amazon SQS) 傳送訊息。

  • 開始和停用狀態機器執行。

  • 使用服務整合從狀態機器連接至 Lambda、DynamoDB 和 Amazon SQS。

如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • DynamoDB

  • Lambda

  • Amazon SQS

  • 步驟函數

下列程式碼範例示範如何建立由建置於 Amazon API Gateway 上的 websocket API 提供服務的聊天應用程式。

適用於 Python 的 SDK (Boto3)

示範如何使用 適用於 Python (Boto3) 的 AWS SDK 搭配 Amazon API Gateway V2 來建立與 AWS Lambda 和 Amazon DynamoDB 整合的 Websocket API。

  • 建立由 API Gateway 提供服務的 websocket API。

  • 定義 Lambda 處理常式,該常式將連接存放在 DynamoDB 中,並將訊息傳送給其他聊天參與者。

  • 連接至 websocket 聊天應用程式,並使用 Websockets 套件傳送訊息。

如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • API Gateway

  • DynamoDB

  • Lambda

下列程式碼範例示範如何建立 Amazon API Gateway 調用的 AWS Lambda 函數。

適用於 Python 的 SDK (Boto3)

此範例顯示如何建立和使用目標為 AWS Lambda 函數的 Amazon API Gateway REST API。Lambda 處理常式會展示如何根據 HTTP 方法來路由;如何從查詢字串、標頭和本文中取得資料;以及如何傳回 JSON 回應。

  • 部署 Lambda 函式。

  • 建立 API Gateway REST API。

  • 建立目標為 Lambda 函式的 REST 資源。

  • 授與許可讓 API Gateway 調用 Lambda 函式。

  • 使用 Request 套件來將請求傳送到 REST API。

  • 清理示範期間建立的所有資源。

這個範例在 GitHub 上的檢視效果最佳。如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • API Gateway

  • DynamoDB

  • Lambda

  • Amazon SNS

下列程式碼範例示範如何建立由 Amazon EventBridge 排程事件調用的 AWS Lambda 函數。

適用於 Python 的 SDK (Boto3)

此範例示範如何將 AWS Lambda 函數註冊為排程 Amazon EventBridge 事件的目標。Lambda 處理常式會將合適的訊息和完整的事件資料寫入 Amazon CloudWatch Logs 中以供日後擷取。

  • 部署 Lambda 函式。

  • 建立一個 EventBridge 排程事件,並將 Lambda 函式做為目標。

  • 授予許可讓 EventBridge 調用 Lambda 函式。

  • 列印 CloudWatch Logs 中的最新資料,以顯示排程調用的結果。

  • 清理示範期間建立的所有資源。

這個範例在 GitHub 上的檢視效果最佳。如需完整的原始碼和如何設定及執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • CloudWatch Logs

  • DynamoDB

  • EventBridge

  • Lambda

  • Amazon SNS

無伺服器範例

以下程式碼範例示範如何實作連線至 RDS 資料庫的 Lambda 函式。該函數會提出簡單的資料庫請求並傳回結果。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 連線至 Lambda 函數中的 Amazon RDS 資料庫。

import json import os import boto3 import pymysql # RDS settings proxy_host_name = os.environ['PROXY_HOST_NAME'] port = int(os.environ['PORT']) db_name = os.environ['DB_NAME'] db_user_name = os.environ['DB_USER_NAME'] aws_region = os.environ['AWS_REGION'] # Fetch RDS Auth Token def get_auth_token(): client = boto3.client('rds') token = client.generate_db_auth_token( DBHostname=proxy_host_name, Port=port DBUsername=db_user_name Region=aws_region ) return token def lambda_handler(event, context): token = get_auth_token() try: connection = pymysql.connect( host=proxy_host_name, user=db_user_name, password=token, db=db_name, port=port, ssl={'ca': 'Amazon RDS'} # Ensure you have the CA bundle for SSL connection ) with connection.cursor() as cursor: cursor.execute('SELECT %s + %s AS sum', (3, 2)) result = cursor.fetchone() return result except Exception as e: return (f"Error: {str(e)}") # Return an error message if an exception occurs

以下程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 Kinesis 串流的記錄來接收所觸發的事件。此函數會擷取 Kinesis 承載、從 Base64 解碼,並記錄記錄內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 Kinesis 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")

以下程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 DynamoDB 串流的記錄來接收所觸發的事件。函數會擷取 DynamoDB 承載並記下記錄內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 DynamoDB 事件。

import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")

以下程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 DocumentDB 變更串流的記錄來接收所觸發的事件。函數會擷取 DocumentDB 承載並記下記錄內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 使用 Amazon DocumentDB 事件。

import json def lambda_handler(event, context): for record in event.get('events', []): log_document_db_event(record) return 'OK' def log_document_db_event(record): event_data = record.get('event', {}) operation_type = event_data.get('operationType', 'Unknown') db = event_data.get('ns', {}).get('db', 'Unknown') collection = event_data.get('ns', {}).get('coll', 'Unknown') full_document = event_data.get('fullDocument', {}) print(f"Operation type: {operation_type}") print(f"db: {db}") print(f"collection: {collection}") print("Full document:", json.dumps(full_document, indent=2))

以下程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 Amazon MSK 叢集的記錄來接收所觸發的事件。函數會擷取 MSK 承載並記下記錄內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來取用 Amazon MSK 事件。

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)

下列程式碼範例示範如何實作 Lambda 函式,該函式接收透過上傳物件至 S3 儲存貯體時所觸發的事件。此函數會從事件參數擷取 S3 儲存貯體名稱和物件金鑰,並呼叫 Amazon S3 API 以擷取和記錄物件的內容類型。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 S3 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json import urllib.parse import boto3 print('Loading function') s3 = boto3.client('s3') def lambda_handler(event, context): #print("Received event: " + json.dumps(event, indent=2)) # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') try: response = s3.get_object(Bucket=bucket, Key=key) print("CONTENT TYPE: " + response['ContentType']) return response['ContentType'] except Exception as e: print(e) print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) raise e

下列程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 SNS 主題的訊息來接收所觸發的事件。函數會從事件參數擷取訊息,並記錄每一則訊息的內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 SNS 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for record in event['Records']: process_message(record) print("done") def process_message(record): try: message = record['Sns']['Message'] print(f"Processed message {message}") # TODO; Process your record here except Exception as e: print("An error occurred") raise e

下列程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 SQS 佇列的訊息來接收所觸發的事件。函數會從事件參數擷取訊息,並記錄每一則訊息的內容。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 來使用 SQS 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for message in event['Records']: process_message(message) print("done") def process_message(message): try: print(f"Processed message {message['body']}") # TODO: Do interesting work based on the new message except Exception as err: print("An error occurred") raise err

下列程式碼範例示範如何針對接收來自 Kinesis 串流之事件的 Lambda 函式,實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

透過使用 Python 的 Lambda 報告 Kinesis 批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

下列程式碼範例示範如何針對接收來自 DynamoDB 串流之事件的 Lambda 函式,實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 報告 DynamoDB 批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

下列程式碼範例示範如何為接收從 SQS 佇列接收事件的 Lambda 函式,實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

適用於 Python 的 SDK (Boto3)
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 報告 SQS 批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: print(f"Processed message: {record['body']}") except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response