Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Prise en main de Kinesis Data Streams pour Amazon DynamoDB
Cette section explique comment utiliser les tables Kinesis Data Streams pour Amazon DynamoDB avec la console Amazon DynamoDB, le () et l'API. AWS Command Line Interface AWS CLI
Création d’un flux de données Amazon Kinesis actif
Tous ces exemples utilisent la table DynamoDB Music créée dans le cadre du tutoriel Mise en route avec DynamoDB.
Pour en savoir plus sur la façon de créer des consommateurs et de connecter votre flux de données Kinesis à d’autres services AWS , consultez Lecture de données à partir de Kinesis Data Streams dans le Guide du développeur Amazon Kinesis Data Streams.
Lorsque vous utilisez les partitions KDS pour la première fois, nous vous recommandons de les configurer de manière à ce que leur capacité s’adapte à la hausse ou à la baisse en fonction de vos modèles d’utilisation. Une fois que vous aurez accumulé davantage de données sur les modèles d’utilisation, vous pourrez ajuster les partitions de votre flux en conséquence.
- Console
-
-
Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à l'adresse. https://console.aws.amazon.com/kinesis/
-
Choisissez Create data stream (Créer un flux de données), puis suivez les instructions pour créer un flux appelé samplestream.
-
Ouvrez la console DynamoDB à l'adresse. https://console.aws.amazon.com/dynamodb/
-
Dans le volet de navigation sur le côté gauche de la console, choisissez Tables.
-
Choisissez la table Music.
-
Choisissez l’onglet Exportations et flux.
-
(Facultatif) Sous Informations sur le flux de données Amazon Kinesis, vous pouvez modifier la précision de l’horodatage des enregistrements de microseconde (par défaut) à milliseconde.
-
Choisissez samplestream dans la liste déroulante.
-
Cliquez sur le bouton Activer.
- AWS CLI
-
-
Créez un flux de données Kinesis nommé samplestream à l’aide de la commande create-stream.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Consultez Considérations relatives à la gestion des partitions pour Kinesis Data Streams avant de définir le nombre de partitions du flux de données Kinesis.
-
Vérifiez que le flux Kinesis est actif et prêt pour utilisation à l’aide de la commande describe-stream.
aws kinesis describe-stream --stream-name samplestream
-
Activez le streaming Kinesis sur la table DynamoDB à l’aide de la commande DynamoDB enable-kinesis-streaming-destination. Remplacez la valeur stream-arn par celle que la commande describe-stream a renvoyée à l’étape précédente. Activez éventuellement le streaming avec une précision plus détaillée (microseconde) des valeurs d’horodatage renvoyées sur chaque enregistrement.
Activez le streaming avec une précision d’horodatage de l’ordre de la microseconde :
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
Ou activez le streaming avec la précision d’horodatage par défaut (milliseconde) :
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
Vérifiez si le streaming Kinesis est actif sur la table à l’aide de la commande DynamoDB describe-kinesis-streaming-destination.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
Écrivez des données dans la table DynamoDB à l’aide de la commande put-item, comme décrit dans le Guide du développeur DynamoDB.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Utilisez la commande CLI get-records de Kinesis pour extraire le contenu du flux Kinesis. Ensuite, utilisez l’extrait de code suivant pour désérialiser le contenu du flux.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Suivez les instructions du Guide du développeur Kinesis Data Streams pour créer un flux de données Kinesis nommé samplestream à l’aide de Java.
Consultez Considérations relatives à la gestion des partitions pour Kinesis Data Streams avant de définir le nombre de partitions pour le flux de données Kinesis.
-
Utilisez l’extrait de code suivant pour activer le streaming Kinesis sur la table DynamoDB. Activez éventuellement le streaming avec une précision plus détaillée (microseconde) des valeurs d’horodatage renvoyées sur chaque enregistrement.
Activez le streaming avec une précision d’horodatage de l’ordre de la microseconde :
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
Ou activez le streaming avec la précision d’horodatage par défaut (milliseconde) :
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Suivez les instructions du Guide du développeur Kinesis Data Streams pour lire le flux de données créé.
-
Utilisez l’extrait de code suivant pour désérialiser le contenu du flux
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
Modification d’un flux de données Amazon Kinesis actif
Cette section décrit comment apporter des modifications à une configuration Kinesis Data Streams pour DynamoDB active à l'aide de la console et de l'API. AWS CLI
AWS Management Console
AWS CLI
-
Appelez describe-kinesis-streaming-destination pour vérifier que le stream est ACTIVE.
-
Appelez UpdateKinesisStreamingDestination, comme dans cet exemple :
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
Appelez describe-kinesis-streaming-destination pour vérifier que le stream est UPDATING.
-
Appelez describe-kinesis-streaming-destination régulièrement jusqu’à ce que le statut de streaming redevienne ACTIVE. Les mises à jour de précision de l’horodatage prennent effet généralement sous 5 minutes. Une fois ce statut mis à jour, cela indique que la mise à jour est terminée et que la nouvelle valeur de précision sera appliquée aux futurs enregistrements.
-
Écrivez dans la table à l’aide de putItem.
-
Utilisez la commande get-records de Kinesis pour récupérer le contenu du flux.
-
Vérifiez que la valeur ApproximateCreationDateTime des écritures possède la précision souhaitée.
API Java
-
Fournissez un extrait de code qui construit une demande UpdateKinesisStreamingDestination et une réponse UpdateKinesisStreamingDestination.
-
Fournissez un extrait de code qui construit une demande DescribeKinesisStreamingDestination et une réponse DescribeKinesisStreamingDestination response.
-
Appelez describe-kinesis-streaming-destination régulièrement jusqu’à ce que le statut de streaming redevienne ACTIVE, indiquant que la mise à jour est terminée et que la nouvelle valeur de précision sera appliquée aux futurs enregistrements.
-
Effectuez des écritures sur la table.
-
Lisez le flux et désérialisez son contenu.
-
Vérifiez que la valeur ApproximateCreationDateTime des écritures possède la précision souhaitée.