

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 适用于 Python 的 Amazon SNS 扩展型客户端库
<a name="extended-client-library-python"></a>

## 先决条件
<a name="prereqs-sns-extended-client-library-python"></a>

以下是使用[适用于 Python 的 Amazon SNS 扩展型客户端库](https://github.com/awslabs/amazon-sns-python-extended-client-lib)的先决条件：
+ 一个 AWS 软件开发工具包。本页上的示例使用 AWS Python SDK Boto3。要安装和设置 SDK，请参阅 [https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html) 文档。
+ 并 AWS 账户 具有正确的凭据。要创建 AWS 账户，请导航到[AWS 主页](https://aws.amazon.com/)，然后选择**创建 AWS 帐户**。按照说明进行操作。

  有关凭证的信息，请参阅《AWS SDK for Python 开发人员指南》**中的[凭证](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)。
+ Python 3.x（或更高版本）和 pip。
+ 适用于 Python 的 Amazon SNS 扩展型客户端库（也可从 [PyPI](https://pypi.org/project/amazon-sns-extended-client/) 中获得）。

## 配置消息存储
<a name="large-message-configure-storage-python"></a>

Boto3 Amazon [SN](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#client) S 客户端[、主题[PlatformEndpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/platformendpoint/index.html)和对象上提供了以下属性，](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/topic/index.html)用于配置 Amazon S3 消息存储选项。
+ **`large_payload_support`** – 用于存储大型消息的 Amazon S3 存储桶名称。
+ **`use_legacy_attribute`** – 如果设置为 `True`，则所有已发布的消息都使用旧版保留消息属性（`SQSLargePayloadSize`），而不是当前的保留消息属性（`ExtendedPayloadSize`）。
+ **`message_size_threshold`** – 在大型消息桶中存储消息的阈值。不能低于 `0` 或超过 `262144`。默认值为 `262144`。
+ **`always_through_s3`** – 如果为 `True`，则所有消息都存储在 Amazon S3 中。默认值为 `False`。
+ **`s3_client`** – 用于在 Amazon S3 中存储对象的 Boto3 Amazon S3 `client` 对象。如果您想要控制 Amazon S3 客户端（例如自定义 Amazon S3 配置或凭证），请使用此选项。首次使用时，若未预先设置，默认值为 `boto3.client("s3")`。

## 示例：使用存储在 Amazon S3 中的有效负载将消息发布到 Amazon SNS
<a name="example-s3-large-payloads-python"></a>

以下代码示例展示了如何：
+ 创建示例 Amazon SNS 主题和 Amazon SQS 队列。
+ 为 Amazon SQS 队列附加策略以接收来自 Amazon SNS 主题的消息。
+ 订阅队列以接收来自主题的消息。
+ 使用 Amazon SNS 扩展客户端、主题资源和 PlatformEndpoint 资源发布测试消息。
+ 消息有效负载存储在 Amazon S3 中，并发布对它的引用。
+ 打印队列中已发布的消息以及从 Amazon S3 检索到的原始消息。

要发布大型消息，请使用适用于 Python 的 Amazon SNS 扩展型客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

```
import boto3
from sns_extended_client import SNSExtendedClientSession
from json import loads

s3_extended_payload_bucket = "extended-client-bucket-store"  # S3 bucket with the given bucket name is a resource which is created and accessible with the given AWS credentials
TOPIC_NAME = "---TOPIC-NAME---"
QUEUE_NAME = "---QUEUE-NAME---"

def allow_sns_to_write_to_sqs(topicarn, queuearn):
    policy_document = """{{
        "Version": "2012-10-17",		 	 	 
        "Statement":[
            {{
            "Sid":"MyPolicy",
            "Effect":"Allow",
            "Principal" : {{"AWS" : "*"}},
            "Action":"SQS:SendMessage",
            "Resource": "{}",
            "Condition":{{
                "ArnEquals":{{
                "aws:SourceArn": "{}"
                }}
            }}
            }}
        ]
        }}""".format(queuearn, topicarn)

    return policy_document

def get_msg_from_s3(body,sns_extended_client):
    """Handy Helper to fetch message from S3"""
    json_msg = loads(body)
    s3_object = sns_extended_client.s3_client.get_object(
        Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
    )
    msg = s3_object.get("Body").read().decode()
    return msg


def fetch_and_print_from_sqs(sqs, queue_url,sns_extended_client):
    sqs_msg = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=['All'],
        MessageAttributeNames=['All'],
        VisibilityTimeout=0,
        WaitTimeSeconds=0,
        MaxNumberOfMessages=1
    ).get("Messages")[0]
    
    message_body = sqs_msg.get("Body")
    print("Published Message: {}".format(message_body))
    print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body,sns_extended_client)))

    # Delete the Processed Message
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=sqs_msg['ReceiptHandle']
    )


sns_extended_client = boto3.client("sns", region_name="us-east-1")
create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
sns_topic_arn = create_topic_response.get("TopicArn")

# create and subscribe an sqs queue to the sns client
sqs = boto3.client("sqs",region_name="us-east-1")
demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
sqs_queue_arn = sqs.get_queue_attributes(
    QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
)["Attributes"].get("QueueArn")

# Adding policy to SQS queue such that SNS topic can send msg to SQS queue
policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
response = sqs.set_queue_attributes(
    QueueUrl = demo_queue_url,
    Attributes = {
        'Policy' : policy_json
    }
)

# Set the RawMessageDelivery subscription attribute to TRUE if you want to use
# SQSExtendedClient to help with retrieving msg from S3
sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs", 
Endpoint=sqs_queue_arn
, Attributes={"RawMessageDelivery":"true"}
)

sns_extended_client.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of sns_extended_client to use 'us-east-1' region
sns_extended_client.s3_client = boto3.client("s3", region_name="us-east-1")


# Below is the example that all the messages will be sent to the S3 bucket
sns_extended_client.always_through_s3 = True
sns_extended_client.publish(
    TopicArn=sns_topic_arn, Message="This message should be published to S3"
)
print("\n\nPublished using SNS extended client:")
fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client)  # Prints message stored in s3

# Below is the example that all the messages larger than 32 bytes will be sent to the S3 bucket
print("\nUsing decreased message size threshold:")

sns_extended_client.always_through_s3 = False
sns_extended_client.message_size_threshold = 32
sns_extended_client.publish(
    TopicArn=sns_topic_arn,
    Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
)

fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client)  # Prints message stored in s3


# Below is the example to publish message using the SNS.Topic resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
    "sns", region_name="us-east-1"
)

topic = sns_extended_client_resource.Topic(sns_topic_arn)
topic.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of topic to use 'us-east-1' region
topic.s3_client = boto3.client("s3", region_name="us-east-1")

topic.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
topic.publish(
    Message="This message should be published to S3 using the topic resource",
    MessageAttributes={
        "S3Key": {
            "DataType": "String",
            "StringValue": "347c11c4-a22c-42e4-a6a2-9b5af5b76587",
        }
    },
)
print("\nPublished using Topic Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,topic)

# Below is the example to publish message using the SNS.PlatformEndpoint resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
    "sns", region_name="us-east-1"
)

platform_endpoint = sns_extended_client_resource.PlatformEndpoint(sns_topic_arn)
platform_endpoint.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of platform_endpoint to use 'us-east-1' region
platform_endpoint.s3_client = boto3.client("s3", region_name="us-east-1")

platform_endpoint.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
platform_endpoint.publish(
    Message="This message should be published to S3 using the PlatformEndpoint resource",
    MessageAttributes={
        "S3Key": {
            "DataType": "String",
            "StringValue": "247c11c4-a22c-42e4-a6a2-9b5af5b76587",
        }
    },
)
print("\nPublished using PlatformEndpoint Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,platform_endpoint)
```

**输出**

```
Published using SNS extended client:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3

Using decreased message size threshold:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the limit of the 32 bytes

Published using Topic Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the topic resource

Published using PlatformEndpoint Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the PlatformEndpoint resource
```