Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erste Schritte mit Kinesis Data Streams für Amazon DynamoDB
In diesem Abschnitt wird beschrieben, wie Kinesis Data Streams für Amazon DynamoDB-Tabellen mit der Amazon DynamoDB DynamoDB-Konsole, der AWS Command Line Interface (AWS CLI) und der API verwendet wird.
Erstellen eines aktiven Amazon-Kinesis-Daten-Streams
Alle diese Beispiele verwenden die Music-DynamoDB-Tabelle, die als Teil des Erste Schritte mit DynamoDB-Tutorials erstellt wurde.
Weitere Informationen zum Erstellen von Konsumenten und zum Verbinden Ihres Kinesis Data Streams mit anderen AWS -Services finden Sie unter Lesen von Daten aus Amazon Kinesis Data Streams im Amazon-Kinesis-Data-Streams-Entwicklerhandbuch.
Wenn Sie zum ersten Mal KDS-Shards verwenden, empfehlen wir, Ihre Shards so einzustellen, dass sie den Nutzungsmustern entsprechend hoch- und herunterskaliert werden. Nachdem Sie mehr Daten zu den Nutzungsmustern gesammelt haben, können Sie die Shards in Ihrem Datenstrom entsprechend anpassen.
- Console
-
-
Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis/.
-
Klicken Sie auf Create data stream (Datenstrom erstellen) und befolgen Sie die Anweisungen, um einen Stream mit dem Namen samplestream zu erstellen.
-
Öffnen Sie die DynamoDB-Konsole unter. https://console.aws.amazon.com/dynamodb/
-
Klicken Sie im Navigationsbereich auf der linken Seite der Konsole auf Tables (Tabellen).
-
Wählen Sie die Tabelle Music (Musik).
-
Wählen Sie die Registerkarte Exports and streams (Exporte und Streams).
-
(Optional) Unter Details zu Amazon-Kinesis-Daten-Streams können Sie die Zeitstempelgenauigkeit des Datensatzes von Mikrosekunde (Standard) auf Millisekunde ändern.
-
Klicken Sie auf samplestream aus der Dropdown-Liste.
-
Klicken Sie auf die Schaltfläche Einschalten.
- AWS CLI
-
-
Erstellen Sie einen Kinesis Data Stream mit dem Namen samplestream, indem Sie den Befehl create-stream wählen.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Sehen Sie sich Überlegungen zur Shard-Verwaltung für Kinesis Data Streams an, bevor Sie die Anzahl der Shards für den Kinesis Data Stream festlegen.
-
Überprüfen Sie, ob der Kinesis Stream aktiv und einsatzbereit ist, indem Sie den Befehl Stream beschreiben auswählen.
aws kinesis describe-stream --stream-name samplestream
-
Aktivieren Sie Kinesis-Streaming für die DynamoDB-Tabelle mithilfe des DynamoDB-Befehls enable-kinesis-streaming-destination. Ersetzen Sie den stream-arn-Wert durch den Wert, der im vorherigen Schritt von describe-stream zurückgegeben wurde. Aktivieren Sie optional das Streaming mit einer höheren Genauigkeit (Mikrosekunden) der für jeden Datensatz zurückgegebenen Zeitstempelwerte.
So aktivieren Sie das Streaming mit einer Zeitstempelgenauigkeit im Mikrosekundenbereich:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
So aktivieren Sie das Streaming mit der standardmäßigen Zeitstempelgenauigkeit (Millisekunde):
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
Überprüfen Sie, ob Kinesis-Streaming in der Tabelle aktiv ist, mithilfe des DynamoDB-describe-kinesis-streaming-destination-Befehls.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
Schreiben Sie Daten in die DynamoDB-Tabelle mithilfe der put-item, wie in dem Entwicklerhandbuch von DynamoDB angegeben ist.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Verwenden Sie den Kinesis-CLI-Befehl get-records, um den Inhalt des Kinesis Streams abzurufen. Verwenden Sie dann den folgenden Codeausschnitt, um den Streaminhalt zu deserialisieren.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Befolgen Sie die Anweisungen im Kinesis-Data-Streams-Entwicklerhandbuch, um einen Kinesis Data Stream mit samplestream Java-Namen zu erstellen.
Sehen Sie sich Überlegungen zur Shard-Verwaltung für Kinesis Data Streams an, bevor Sie die Anzahl der Shards für den Kinesis Data Stream festlegen.
-
Verwenden Sie den folgenden Codeausschnitt, um Kinesis-Streaming für die DynamoDB-Tabelle zu aktivieren. Aktivieren Sie optional das Streaming mit einer höheren Genauigkeit (Mikrosekunden) der für jeden Datensatz zurückgegebenen Zeitstempelwerte.
So aktivieren Sie das Streaming mit einer Zeitstempelgenauigkeit im Mikrosekundenbereich:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
So aktivieren Sie das Streaming mit der standardmäßigen Zeitstempelgenauigkeit (Millisekunde):
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Befolgen Sie die Anweisungen im Kinesis-Data-Streams-Entwicklerhandbuch, um aus dem erstellten Datenstrom zu lesen.
-
Verwenden Sie dann den folgenden Codeausschnitt, um den Streaminhalt zu deserialisieren
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
Vornehmen von Änderungen an einem aktiven Amazon-Kinesis-Daten-Stream
In diesem Abschnitt wird beschrieben, wie Sie mithilfe der Konsole und der API Änderungen an einem aktiven Kinesis Data Streams for DynamoDB-Setup vornehmen. AWS CLI
AWS-Managementkonsole
AWS CLI
-
Rufen Sie describe-kinesis-streaming-destination auf, um zu bestätigen, dass der Stream ACTIVE ist.
-
Rufen Sie UpdateKinesisStreamingDestination auf, wie in diesem Beispiel:
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
Rufen Sie describe-kinesis-streaming-destination auf, um zu bestätigen, dass der Stream UPDATING ist.
-
Rufen Sie describe-kinesis-streaming-destination regelmäßig auf, bis der Streaming-Status wieder ACTIVE ist. Normalerweise dauert es bis zu 5 Minuten, bis die Aktualisierungen der Zeitstempelgenauigkeit wirksam werden. Sobald der Status aktualisiert wurde, ist die Änderung abgeschlossen, und der neue Genauigkeitswert wird auf künftige Datensätze angewendet.
-
Schreiben Sie mit putItem in die Tabelle.
-
Verwenden Sie den Kinesis-Befehl get-records, um den Inhalt des Streams abzurufen.
-
Vergewissern Sie sich, dass die ApproximateCreationDateTime der Schreibvorgänge die gewünschte Genauigkeit haben.
Java-API
-
Stellen Sie einen Codeausschnitt bereit, der eine UpdateKinesisStreamingDestination-Anfrage und eine UpdateKinesisStreamingDestination-Antwort erstellt.
-
Stellen Sie einen Codeausschnitt bereit, der eine DescribeKinesisStreamingDestination-Anfrage und eine DescribeKinesisStreamingDestination response erstellt.
-
Rufen Sie describe-kinesis-streaming-destination regelmäßig auf, bis der Streaming-Status wieder ACTIVE ist. Dies zeigt an, dass die Änderung abgeschlossen ist und der neue Genauigkeitswert auf künftige Datensätze angewendet wird.
-
Führen Sie Schreibvorgänge in der Tabelle durch.
-
Lesen Sie Daten aus dem Stream und deserialisieren Sie den Streaminhalt.
-
Vergewissern Sie sich, dass die ApproximateCreationDateTime der Schreibvorgänge die gewünschte Genauigkeit haben.