Amazon DynamoDB 專用 Kinesis Data Streams 入門 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon DynamoDB 專用 Kinesis Data Streams 入門

本節說明如何將 Amazon DynamoDB 資料表的 Kinesis Data Streams 與 Amazon DynamoDB 主控台、 AWS Command Line Interface (AWS CLI) 和 API 搭配使用。

建立作用中的 Amazon Kinesis 資料串流

所有這些範例都使用 Music DynamoDB 資料表,該資料表是在 DynamoDB 入門教學課程中建立的。

若要進一步了解如何建置取用者並將 Kinesis 資料串流連線至其他 AWS 服務,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的從 Kinesis Data Streams 讀取資料

注意

第一次使用 KDS 碎片時,建議您將碎片設定為隨著使用模式向上擴展和縮減。累積更多使用模式的相關資料後,您可以調整串流中的碎片以進行配對。

Console
  1. 登入 AWS 管理主控台 並開啟位於 https://https://console.aws.amazon.com/kinesis/ 的 Kinesis 主控台。

  2. 選擇 Create data stream (建立資料串流),並依照說明來建立名為 samplestream 的串流。

  3. 請在 https://console.aws.amazon.com/dynamodb/ 開啟 DynamoDB 主控台。

  4. 在主控台左側的導覽窗格中,選擇 Tables (資料表)。

  5. 選擇 Music (音樂) 資料表。

  6. 選擇 Exports and streams (匯出與串流) 索引標籤。

  7. (選用) 您可以在 Amazon Kinesis 資料串流詳細資訊下,將記錄時間戳記精確度從微秒 (預設) 變更為毫秒。

  8. 從下拉式清單選擇 samplestream (範例串流)。

  9. 選擇 Turn On (開啟) 按鈕。

AWS CLI
  1. 使用 create-stream 命令建立名為 samplestream 的 Kinesis 資料串流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用 describe-stream 命令確認 Kinesis 串流是否處於作用中狀態且可供使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDB enable-kinesis-streaming-destination 命令在 DynamoDB 資料表上啟用 Kinesis 串流功能。將 stream-arn 值替換為上一個步驟中傳回的值 describe-stream。可選擇性啟用時間戳記值精確度更精細 (微秒級) 的串流,傳回至每筆記錄上。

    啟用微秒級時間戳記精確度串流:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    或啟用預設時間戳記精確度串流 (毫秒級):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令確認資料表上的 Kinesis 串流是否處於作用中狀態。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 put-item 命令將資料寫入 DynamoDB 資料表,如《DynamoDB 開發人員指南》中所述。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI 命令來擷取 Kinesis 串流內容。然後使用下面的程式碼片段來將串流內容還原序列化。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 遵循《Kinesis Data Streams 開發人員指南》中的說明,使用 Java 建立名為 samplestream 的 Kinesis 資料串流。

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用以下程式碼片段來啟用 DynamoDB 資料表上的 Kinesis 串流。可選擇性啟用時間戳記值精確度更精細 (微秒級) 的串流,傳回至每筆記錄上。

    啟用微秒級時間戳記精確度串流:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    或啟用預設時間戳記精確度串流 (毫秒級):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 遵循《Kinesis Data Streams 開發人員指南》中的說明,從建立的資料串流中進行讀取

  4. 然後使用下面的程式碼片段將串流內容還原序列化

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }

變更作用中的 Amazon Kinesis 資料串流

本節說明如何使用 主控台 AWS CLI 和 API 來變更作用中的 Kinesis Data Streams for DynamoDB 設定。

AWS 管理主控台

  1. 請在 https://console.aws.amazon.com/dynamodb/ 開啟 DynamoDB 主控台。

  2. 前往您的資料表。

  3. 選擇匯出與串流

AWS CLI

  1. 呼叫 describe-kinesis-streaming-destination 以確認串流為 ACTIVE

  2. 呼叫 UpdateKinesisStreamingDestination,例如在此範例中:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. 呼叫 describe-kinesis-streaming-destination 以確認串流為 UPDATING

  4. 定期呼叫 describe-kinesis-streaming-destination,直到串流狀態再次變成 ACTIVE 為止。時間戳記精確度更新通常需要 5 分鐘才會生效。一旦此狀態更新,表示更新已完成,且新的精確度值將套用至未來的記錄。

  5. 使用 putItem 寫入資料表。

  6. 使用 Kinesis get-records 命令來取得串流內容。

  7. 確認寫入的 ApproximateCreationDateTime 具有所需的精確度。

Java API

  1. 提供可建構 UpdateKinesisStreamingDestination 請求和 UpdateKinesisStreamingDestination 回應的程式碼片段。

  2. 提供可建構 DescribeKinesisStreamingDestination 請求和 DescribeKinesisStreamingDestination response 回應的程式碼片段。

  3. 定期呼叫 describe-kinesis-streaming-destination,直到串流狀態再次變成 ACTIVE,表示更新已完成,且新的精確度值將套用至未來的記錄。

  4. 執行對資料表的寫入。

  5. 從串流讀取並還原序列化串流內容。

  6. 確認寫入的 ApproximateCreationDateTime 具有所需的精確度。