终止支持通知:2026 年 10 月 7 日, AWS 将停止对的支持。 AWS IoT Greengrass Version 1 2026 年 10 月 7 日之后,您将无法再访问这些 AWS IoT Greengrass V1 资源。如需了解更多信息,请访问迁移自 AWS IoT Greengrass Version 1。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Kinesis Firehose
Kinesis Firehose 连接器通过亚马逊数据 Firehose 传输流将数据发布到亚马逊 S3、亚马逊 Redshift 或亚马逊服务等目的地。 OpenSearch
此连接器是 Kinesis 传输流的数据创建器。它接收 MQTT 主题上的输入数据,并将这些数据发送到指定的传输流。然后,该传输流将数据记录发送到已配置的目标(例如,S3 存储桶)。
此连接器具有以下版本。
版本 |
进行筛选 |
5 |
arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5
|
4 |
arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4
|
3 |
arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3
|
2 |
arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2
|
1 |
arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1
|
有关版本更改的信息,请参阅更改日志。
要求
此连接器具有以下要求:
- Version 4 - 5
-
-
AWS IoT Greengrass 核心软件 v1.9.3 或更高版本。
-
Python 版本 3.7 或 3.8 已安装在核心设备上,并已添加到 PATH 环境变量中。
要使用 Python 3.8,请运行以下命令来创建从默认 Python 3.7 安装文件夹到已安装的 Python 3.8 二进制文件的符号链接。
sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7
这会将设备配置为满足 AWS IoT Greengrass的 Python 要求。
-
一个已配置的 Kinesis 传输流。有关更多信息,请参阅《Amazon Kinesis Firehose 开发人员指南》中的创建 Amazon Data Firehose 传输流。
-
Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecord 和 firehose:PutRecordBatch 操作,如以下示例 IAM policy 中所示。
JSON
- JSON
-
{
"Version":"2012-10-17",
"Statement":[
{
"Sid":"Stmt1528133056761",
"Action":[
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Effect":"Allow",
"Resource":[
"arn:aws:firehose:us-east-1:123456789012:deliverystream/stream-name"
]
}
]
}
利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。
对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)或管理 Greengrass 组角色 (CLI)。
- Versions 2 - 3
-
-
AWS IoT Greengrass 核心软件 v1.7 或更高版本。
-
Python 版本 2.7 已安装在核心设备上,并已添加到 PATH 环境变量中。
-
一个已配置的 Kinesis 传输流。有关更多信息,请参阅《Amazon Kinesis Firehose 开发人员指南》中的创建 Amazon Data Firehose 传输流。
-
Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecord 和 firehose:PutRecordBatch 操作,如以下示例 IAM policy 中所示。
JSON
- JSON
-
{
"Version":"2012-10-17",
"Statement":[
{
"Sid":"Stmt1528133056761",
"Action":[
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Effect":"Allow",
"Resource":[
"arn:aws:firehose:us-east-1:123456789012:deliverystream/stream-name"
]
}
]
}
利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。
对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)或管理 Greengrass 组角色 (CLI)。
- Version 1
-
-
AWS IoT Greengrass 核心软件 v1.7 或更高版本。
-
Python 版本 2.7 已安装在核心设备上,并已添加到 PATH 环境变量中。
-
一个已配置的 Kinesis 传输流。有关更多信息,请参阅《Amazon Kinesis Firehose 开发人员指南》中的创建 Amazon Data Firehose 传输流。
-
Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecord 操作,如以下示例 IAM policy 中所示。
JSON
- JSON
-
{
"Version":"2012-10-17",
"Statement":[
{
"Sid":"Stmt1528133056761",
"Action":[
"firehose:PutRecord"
],
"Effect":"Allow",
"Resource":[
"arn:aws:firehose:us-east-1:123456789012:deliverystream/stream-name"
]
}
]
}
利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM policy 应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。
对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)或管理 Greengrass 组角色 (CLI)。
连接器参数
该连接器提供以下参数:
- Versions 5
-
DefaultDeliveryStreamArn
-
要将数据发送到的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。
组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求。
AWS IoT 控制台中的显示名称:默认传送流 ARN
必需:true
类型:string
有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$
DeliveryStreamQueueSize
-
在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。
AWS IoT 控制台中的显示名称:要缓冲的最大记录数(每个流)
必需:true
类型:string
有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$
MemorySize
-
要分配给此连接器的内存量(以 KB 为单位)。
AWS IoT 控制台中的显示名称:内存大小
必需:true
类型:string
有效模式:^[0-9]+$
PublishInterval
-
将记录发布到 Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。
AWS IoT 控制台中的显示名称:发布间隔
必需:true
类型:string
有效值:0 - 900
有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900
IsolationMode
-
此连接器的容器化模式。默认值为GreengrassContainer,这意味着连接器在 AWS IoT Greengrass 容器内的隔离运行时环境中运行。
AWS IoT 控制台中的显示名称:容器隔离模式
必需:false
类型:string
有效值:GreengrassContainer 或 NoContainer
有效模式:^NoContainer$|^GreengrassContainer$
- Versions 2 - 4
-
DefaultDeliveryStreamArn
-
要将数据发送到的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。
组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求。
AWS IoT 控制台中的显示名称:默认传送流 ARN
必需:true
类型:string
有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$
DeliveryStreamQueueSize
-
在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。
AWS IoT 控制台中的显示名称:要缓冲的最大记录数(每个流)
必需:true
类型:string
有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$
MemorySize
-
要分配给此连接器的内存量(以 KB 为单位)。
AWS IoT 控制台中的显示名称:内存大小
必需:true
类型:string
有效模式:^[0-9]+$
PublishInterval
-
将记录发布到 Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。
AWS IoT 控制台中的显示名称:发布间隔
必需:true
类型:string
有效值:0 - 900
有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900
- Version 1
-
DefaultDeliveryStreamArn
-
要将数据发送到的默认 Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。
组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求。
AWS IoT 控制台中的显示名称:默认传送流 ARN
必需:true
类型:string
有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$
创建连接器示例 (AWS CLI)
以下 CLI 命令将创建一个 ConnectorDefinition,其初始版本包含该连接器。
aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{
"Connectors": [
{
"Id": "MyKinesisFirehoseConnector",
"ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5",
"Parameters": {
"DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name",
"DeliveryStreamQueueSize": "5000",
"MemorySize": "65535",
"PublishInterval": "10",
"IsolationMode" : "GreengrassContainer"
}
}
]
}'
在 AWS IoT Greengrass 控制台中,您可以从群组的 “连接器” 页面添加连接器。有关更多信息,请参阅 Greengrass 连接器入门(控制台)。
此连接器接受 MQTT 主题上的流内容,然后将这些内容发送到目标传输流。它接受两种类型的输入数据:
- Versions 2 - 5
-
- 主题筛选条件:
kinesisfirehose/message
-
使用此主题发送包含 JSON 数据的消息。
- 消息属性
-
request
-
要发送到传输流和目标传输流(如果不同于默认流)的数据。
必需:true
类型:包含以下属性的 object:
data
-
要发送到传输流的数据。
必需:true
类型:string
delivery_stream_arn
-
目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。
必需:false
类型:string
有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$
id
-
请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。
必需:false
类型:string
有效模式:.*
- 示例输入
-
{
"request": {
"delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name",
"data": "Data to send to the delivery stream."
},
"id": "request123"
}
- 主题筛选条件:
kinesisfirehose/message/binary/#
-
使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。
要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123。
如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。
- Version 1
-
- 主题筛选条件:
kinesisfirehose/message
-
使用此主题发送包含 JSON 数据的消息。
- 消息属性
-
request
-
要发送到传输流和目标传输流(如果不同于默认流)的数据。
必需:true
类型:包含以下属性的 object:
data
-
要发送到传输流的数据。
必需:true
类型:string
delivery_stream_arn
-
目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。
必需:false
类型:string
有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$
id
-
请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。
必需:false
类型:string
有效模式:.*
- 示例输入
-
{
"request": {
"delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name",
"data": "Data to send to the delivery stream."
},
"id": "request123"
}
- 主题筛选条件:
kinesisfirehose/message/binary/#
-
使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。
要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123。
如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。
输出数据
此连接器将状态信息发布为 MQTT 主题的输出数据。
- Versions 2 - 5
-
- 订阅中的主题筛选条件
-
kinesisfirehose/message/status
- 示例输出
-
响应将包含批次中发送的每条数据记录的状态。
{
"response": [
{
"ErrorCode": "error",
"ErrorMessage": "test error",
"id": "request123",
"status": "fail"
},
{
"firehose_record_id": "xyz2",
"id": "request456",
"status": "success"
},
{
"firehose_record_id": "xyz3",
"id": "request890",
"status": "success"
}
]
}
如果连接器检测到可重试的错误(如连接错误),它会在下一批次中重试发布。指数退避由 SDK 处理。 AWS 失败并带可重试的错误的请求将添加回队列的结尾以供进一步发布。
- Version 1
-
- 订阅中的主题筛选条件
-
kinesisfirehose/message/status
- 示例输出:成功
-
{
"response": {
"firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm",
"status": "success"
},
"id": "request123"
}
- 示例输出:失败
-
{
"response" : {
"error": "ResourceNotFoundException",
"error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.",
"status": "fail"
},
"id": "request123"
}
用法示例
使用以下概括步骤设置可用于尝试连接器的示例 Python 3.7 Lambda 函数。
确保满足连接器的要求。
对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)或管理 Greengrass 组角色 (CLI)。
-
创建并发布将输入数据发送到连接器的 Lambda 函数。
将示例代码保存为 PY 文件。下载并解压适用于 Python 的AWS IoT Greengrass Core 软件开发工具包。然后,创建一个 zip 包,其中在根级别包含 PY 文件和 greengrasssdk 文件夹。此 zip 包是您上传到 AWS Lambda的部署包。
创建 Python 3.7 Lambda 函数后,请发布函数版本并创建别名。
-
配置 Greengrass 组。
-
通过别名来添加 Lambda 函数(推荐)。将 Lambda 生命周期配置为长时间生存(或在 CLI 中设置为 "Pinned": true)。
-
添加连接器并配置其参数。
-
添加允许连接器接收 JSON 输入数据并针对支持的主题筛选条件发送输出数据的订阅。
-
部署组。
-
在 AWS IoT 控制台的 “测试” 页面上,订阅输出数据主题以查看来自连接器的状态消息。示例 Lambda 函数是长时间生存的,并且在部署组后立即开始发送消息。
完成测试后,您可以将 Lambda 生命周期设置为按需(或在 CLI 中设置为 "Pinned": false)并部署组。这会阻止函数发送消息。
示例
以下示例 Lambda 函数向连接器发送一条输入消息。此消息包含 JSON 数据。
import greengrasssdk
import time
import json
iot_client = greengrasssdk.client('iot-data')
send_topic = 'kinesisfirehose/message'
def create_request_with_all_fields():
return {
"request": {
"data": "Message from Firehose Connector Test"
},
"id" : "req_123"
}
def publish_basic_message():
messageToPublish = create_request_with_all_fields()
print("Message To Publish: ", messageToPublish)
iot_client.publish(topic=send_topic,
payload=json.dumps(messageToPublish))
publish_basic_message()
def lambda_handler(event, context):
return
许可证
Kinesis Firehose 连接器包含以下第三方软件/许可:
该连接器在 Greengrass Core 软件许可协议下发布。
更改日志
下表介绍每个版本连接器的更改。
版本 |
更改 |
5 |
增加了用于配置连接器容器化模式的 IsolationMode 参数。 |
4 |
已将 Lambda 运行时升级到 Python 3.7,这会更改运行时要求。 |
3 |
修复以减少过多的日志记录,并修复了其他较小的错误。
|
2 |
添加了对按指定间隔向 Firehose 发送批处理数据记录的支持。
|
1 |
初始版本。
|
Greengrass 组在一个时间上只能包含一个版本的连接器。有关升级连接器版本的信息,请参阅升级连接器版本。
另请参阅