使用 Amazon Data Firehose 将数据流式传输到表 - Amazon Simple Storage Service

使用 Amazon Data Firehose 将数据流式传输到表

Amazon Data Firehose 是一项完全托管式服务,用于实时将流数据传输到目标,如 Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk、Apache Iceberg 表和自定义 HTTP 端点或由受支持的第三方服务提供商拥有的 HTTP 端点。在使用 Amazon Data Firehose 时,您无需编写应用程序或管理资源。您可以配置数据生成工具向 Firehose 发送数据,然后 Firehose 会将数据自动传输到您指定的目标。还可以配置 Firehose 在传输数据之前转换数据。要了解有关 Amazon Data Firehose 的更多信息,请参阅 What is Amazon Data Firehose?

完成以下步骤以设置 Firehose 流式传输到 S3 表存储桶中的表:

  1. 将表存储桶与 AWS 分析服务集成

  2. 配置 Firehose 以将数据传输到 S3 表中。为此,您需要创建一个支持 Firehose 访问表的 AWS Identity and Access Management(IAM)服务角色

  3. 向 Firehose 服务角色授予对表或表命名空间的显式权限。有关更多信息,请参阅授予 Lake Formation 对表资源的权限

  4. 创建一个 Firehose 流来将数据路由到表。

为 Firehose 创建一个角色以使用 S3 表作为目标

Firehose 需要具有特定权限的 IAM 服务角色,才能访问 AWS Glue 表并将数据写入 S3 表。在创建 Firehose 流时,您需要提供此 IAM 角色。

  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 在左侧导航窗格中,选择策略

  3. 选择创建策略,并在策略编辑器中选择 JSON

  4. 添加以下内联策略,来授予对数据目录中所有数据库和表的权限。如果需要,则您可以仅向特定的表和数据库授予权限。要使用这一策略,请将 user input placeholders 替换为您自己的信息。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "S3TableAccessViaGlueFederation", "Effect": "Allow", "Action": [ "glue:GetTable", "glue:GetDatabase", "glue:UpdateTable" ], "Resource": [ "arn:aws:glue:region:account-id:catalog/s3tablescatalog/*", "arn:aws:glue:region:account-id:catalog/s3tablescatalog", "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/*", "arn:aws:glue:region:account-id:table/*/*" ] }, { "Sid": "S3DeliveryErrorBucketPermission", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::error delivery bucket", "arn:aws:s3:::error delivery bucket/*" ] }, { "Sid": "RequiredWhenUsingKinesisDataStreamsAsSource", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:account-id:stream/stream-name" }, { "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation", "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": "*" }, { "Sid": "RequiredWhenUsingKMSEncryptionForS3ErrorBucketDelivery", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": [ "arn:aws:kms:region:account-id:key/KMS-key-id" ], "Condition": { "StringEquals": { "kms:ViaService": "s3.region.amazonaws.com" }, "StringLike": { "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::error delivery bucket/prefix*" } } }, { "Sid": "LoggingInCloudWatch", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:log-group-name:log-stream:log-stream-name" ] }, { "Sid": "RequiredWhenAttachingLambdaToFirehose", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": [ "arn:aws:lambda:region:account-id:function:function-name:function-version" ] } ] }

    此策略有一些语句,支持访问 Kinesis Data Streams、调用 Lambda 函数和访问 AWS KMS 密钥。如果您不使用这些资源中的任何一个,则可以删除相应的语句。

    如果启用了错误日志记录,则 Firehose 还会将数据传输错误发送到您的 CloudWatch 日志组和流。为此,您必须配置日志组和日志流名称。对于日志组和日志流名称,请参阅 Monitor Amazon Data Firehose Using CloudWatch Logs

  5. 创建策略后,创建一个 IAM 角色,其中 AWS 服务可信实体类型

  6. 对于服务或使用案例,选择 Kinesis。对于使用案例,选择 Kinesis Firehose

  7. 选择下一步,然后选择之前创建的策略。

  8. 为您的角色命名。检查角色详细信息,然后选择创建角色。该角色将具有以下信任策略。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sts:AssumeRole" ], "Principal": { "Service": [ "firehose.amazonaws.com" ] } } ] }

创建到 S3 表的 Firehose 流

以下过程说明如何使用控制台创建 Firehose 流来向 S3 表传输数据。要设置到 S3 表的 Firehose 流,需要满足以下先决条件。

要在配置流时向 Firehose 提供路由信息,可以使用命名空间作为数据库名称和该命名空间中表的名称。可以在 Firehose 流配置的“唯一键”部分使用这些值,来将数据路由到单个表。也可以使用这些值通过 JSON 查询表达式路由到表。有关更多信息,请参阅 Route incoming records to a single Iceberg table

设置到 S3 表的 Firehose 流(控制台)
  1. https://console.aws.amazon.com/firehose/ 中打开 Firehose 控制台。

  2. 选择创建 Firehose 流

  3. 对于,选择下列源之一:

    • Amazon Kinesis Data Streams

    • Amazon MSK

    • 直接 PUT

  4. 对于目标,选择 Apache Iceberg 表

  5. 输入 Firehose 流名称

  6. 配置源设置

  7. 对于目标设置,选择当前账户以流式传输到您账户中的表,或者选择跨账户以流式传输到其它账户中的表。

    • 对于当前账户中的表,请从目录下拉列表中选择您的 S3 表类数据存储服务目录。

    • 对于跨账户中的表,请在另一个账户中输入您要流式传输到的目录的目录 ARN

  8. 使用唯一密钥配置、JSONQuery 表达式或在 Lambda 函数中配置数据库和表名称。有关更多信息,请参阅《Amazon Data Firehose Developer Guide》中的 Route incoming records to a single Iceberg tableRoute incoming records to different Iceberg tables

  9. 备份设置下,指定 S3 备份存储桶

  10. 对于高级设置下的现有 IAM 角色,选择您为 Firehose 创建的 IAM 角色。

  11. 选择创建 Firehose 流

有关您可以为流配置的其它设置的更多信息,请参阅《Amazon Data Firehose Developer Guide》中的 Set up the Firehose stream