将批量加载与 AWS CLI - Amazon Timestream

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将批量加载与 AWS CLI

设置

要开始使用批量加载,请执行以下步骤。

  1. 按照中的说明 AWS CLI 进行安装访问 Amazon Timestream 以使用 LiveAnalytics AWS CLI

  2. 运行以下命令以验证 Timestream CLI 命令是否已更新。验证 create-batch-load-task是否在列表中。

    aws timestream-write help

  3. 按照 准备批量加载数据文件 的说明准备数据来源。

  4. 按照 访问 Amazon Timestream 以使用 LiveAnalytics AWS CLI 的说明创建数据库和表。

  5. 为报告输出创建 S3 存储桶。存储桶必须位于同一区域。有关存储桶的更多信息,请参阅创建、配置和使用 Amazon S3 存储桶

  6. 创建批量加载任务。要查看步骤,请参阅 创建批量加载任务

  7. 确认任务的状态。要查看步骤,请参阅 描述批量加载任务

创建批量加载任务

您可以使用 create-batch-load-task 命令创建批量加载任务。使用 CLI 创建批量加载任务时,可使用 JSON 参数 cli-input-json,该参数允许您将参数聚合到单个 JSON 片段中。您还可以使用其他多个参数(包括 data-model-configurationdata-source-configurationreport-configurationtarget-database-nametarget-table-name)将这些细节拆分开。

有关示例,请参阅 创建批量加载任务示例

描述批量加载任务

您可以按以下方式检索批量加载任务描述。

aws timestream-write describe-batch-load-task --task-id <value>

以下是示例响应。

{ "BatchLoadTaskDescription": { "TaskId": "<TaskId>", "DataSourceConfiguration": { "DataSourceS3Configuration": { "BucketName": "test-batch-load-west-2", "ObjectKeyPrefix": "sample.csv" }, "CsvConfiguration": {}, "DataFormat": "CSV" }, "ProgressReport": { "RecordsProcessed": 2, "RecordsIngested": 0, "FileParseFailures": 0, "RecordIngestionFailures": 2, "FileFailures": 0, "BytesIngested": 119 }, "ReportConfiguration": { "ReportS3Configuration": { "BucketName": "test-batch-load-west-2", "ObjectKeyPrefix": "<ObjectKeyPrefix>", "EncryptionOption": "SSE_S3" } }, "DataModelConfiguration": { "DataModel": { "TimeColumn": "timestamp", "TimeUnit": "SECONDS", "DimensionMappings": [ { "SourceColumn": "vehicle", "DestinationColumn": "vehicle" }, { "SourceColumn": "registration", "DestinationColumn": "license" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "test", "MultiMeasureAttributeMappings": [ { "SourceColumn": "wgt", "TargetMultiMeasureAttributeName": "weight", "MeasureValueType": "DOUBLE" }, { "SourceColumn": "spd", "TargetMultiMeasureAttributeName": "speed", "MeasureValueType": "DOUBLE" }, { "SourceColumn": "fuel", "TargetMultiMeasureAttributeName": "fuel", "MeasureValueType": "DOUBLE" }, { "SourceColumn": "miles", "TargetMultiMeasureAttributeName": "miles", "MeasureValueType": "DOUBLE" } ] } } }, "TargetDatabaseName": "BatchLoadExampleDatabase", "TargetTableName": "BatchLoadExampleTable", "TaskStatus": "FAILED", "RecordVersion": 1, "CreationTime": 1677167593.266, "LastUpdatedTime": 1677167602.38 } }

列出批量加载任务

您可以按以下方式列出批量加载任务。

aws timestream-write list-batch-load-tasks

输出如下所示。

{ "BatchLoadTasks": [ { "TaskId": "<TaskId>", "TaskStatus": "FAILED", "DatabaseName": "BatchLoadExampleDatabase", "TableName": "BatchLoadExampleTable", "CreationTime": 1677167593.266, "LastUpdatedTime": 1677167602.38 } ] }

恢复批量加载任务

您可以按以下方式恢复批量加载任务。

aws timestream-write resume-batch-load-task --task-id <value>

响应可能表示成功,也可能包含错误信息。

创建批量加载任务示例

  1. 为名为的 LiveAnalytics 数据库BatchLoad和名为的表创建时间流。BatchLoadTest验证并根据需要调整 MemoryStoreRetentionPeriodInHoursMagneticStoreRetentionPeriodInDays 的值。

    aws timestream-write create-database --database-name BatchLoad \ aws timestream-write create-table --database-name BatchLoad \ --table-name BatchLoadTest \ --retention-properties "{\"MemoryStoreRetentionPeriodInHours\": 12, \"MagneticStoreRetentionPeriodInDays\": 100}"
  2. 使用控制台创建 S3 存储桶,并将 sample.csv 文件复制到该位置。您可以通过示例 CSV 下载示例 CSV。

  3. 使用控制台为 Timestream 创建 S3 存储桶 LiveAnalytics ,以便在批量加载任务完成时出现错误时编写报告。

  4. 创建批量加载任务。请务必$REPORT_BUCKET使用您在前面步骤中创建的存储桶替换$INPUT_BUCKET和。

    aws timestream-write create-batch-load-task \ --data-model-configuration "{\ \"DataModel\": {\ \"TimeColumn\": \"timestamp\",\ \"TimeUnit\": \"SECONDS\",\ \"DimensionMappings\": [\ {\ \"SourceColumn\": \"vehicle\"\ },\ {\ \"SourceColumn\": \"registration\",\ \"DestinationColumn\": \"license\"\ }\ ], \"MultiMeasureMappings\": {\ \"TargetMultiMeasureName\": \"mva_measure_name\",\ \"MultiMeasureAttributeMappings\": [\ {\ \"SourceColumn\": \"wgt\",\ \"TargetMultiMeasureAttributeName\": \"weight\",\ \"MeasureValueType\": \"DOUBLE\"\ },\ {\ \"SourceColumn\": \"spd\",\ \"TargetMultiMeasureAttributeName\": \"speed\",\ \"MeasureValueType\": \"DOUBLE\"\ },\ {\ \"SourceColumn\": \"fuel_consumption\",\ \"TargetMultiMeasureAttributeName\": \"fuel\",\ \"MeasureValueType\": \"DOUBLE\"\ },\ {\ \"SourceColumn\": \"miles\",\ \"MeasureValueType\": \"BIGINT\"\ }\ ]\ }\ }\ }" \ --data-source-configuration "{ \"DataSourceS3Configuration\": {\ \"BucketName\": \"$INPUT_BUCKET\",\ \"ObjectKeyPrefix\": \"$INPUT_OBJECT_KEY_PREFIX\" },\ \"DataFormat\": \"CSV\"\ }" \ --report-configuration "{\ \"ReportS3Configuration\": {\ \"BucketName\": \"$REPORT_BUCKET\",\ \"EncryptionOption\": \"SSE_S3\"\ }\ }" \ --target-database-name BatchLoad \ --target-table-name BatchLoadTest

    上述命令返回以下输出。

    { "TaskId": "TaskId " }
  5. 检查任务的进度。请务必$TASK_ID使用在上一步中返回的任务 ID 进行替换。

    aws timestream-write describe-batch-load-task --task-id $TASK_ID

输出示例

{ "BatchLoadTaskDescription": { "ProgressReport": { "BytesIngested": 1024, "RecordsIngested": 2, "FileFailures": 0, "RecordIngestionFailures": 0, "RecordsProcessed": 2, "FileParseFailures": 0 }, "DataModelConfiguration": { "DataModel": { "DimensionMappings": [ { "SourceColumn": "vehicle", "DestinationColumn": "vehicle" }, { "SourceColumn": "registration", "DestinationColumn": "license" } ], "TimeUnit": "SECONDS", "TimeColumn": "timestamp", "MultiMeasureMappings": { "MultiMeasureAttributeMappings": [ { "TargetMultiMeasureAttributeName": "weight", "SourceColumn": "wgt", "MeasureValueType": "DOUBLE" }, { "TargetMultiMeasureAttributeName": "speed", "SourceColumn": "spd", "MeasureValueType": "DOUBLE" }, { "TargetMultiMeasureAttributeName": "fuel", "SourceColumn": "fuel_consumption", "MeasureValueType": "DOUBLE" }, { "TargetMultiMeasureAttributeName": "miles", "SourceColumn": "miles", "MeasureValueType": "DOUBLE" } ], "TargetMultiMeasureName": "mva_measure_name" } } }, "TargetDatabaseName": "BatchLoad", "CreationTime": 1672960381.735, "TaskStatus": "SUCCEEDED", "RecordVersion": 1, "TaskId": "TaskId ", "TargetTableName": "BatchLoadTest", "ReportConfiguration": { "ReportS3Configuration": { "EncryptionOption": "SSE_S3", "ObjectKeyPrefix": "ObjectKeyPrefix ", "BucketName": "amzn-s3-demo-bucket" } }, "DataSourceConfiguration": { "DataSourceS3Configuration": { "ObjectKeyPrefix": "sample.csv", "BucketName": "amzn-s3-demo-source-bucket" }, "DataFormat": "CSV", "CsvConfiguration": {} }, "LastUpdatedTime": 1672960387.334 } }