DynamoDB Streams 하위 수준 API: Java 예시 - Amazon DynamoDB

DynamoDB Streams 하위 수준 API: Java 예시

참고

이 페이지의 코드는 완전한 코드가 아니며, Amazon DynamoDB Streams 사용 시나리오 중 일부만 처리합니다. DynamoDB에서 스트림 레코드를 사용하기 위해 권장되는 방법은 DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리 섹션에 기술된 것처럼 Amazon Kinesis 어댑터를 통해 Kinesis Client Library(KCL)를 사용하는 것입니다.

이 섹션에는 실행 중인 DynamoDB Streams를 보여 주는 Java 프로그램이 나와 있습니다. 이 프로그램에서는 다음 작업을 수행합니다.

  1. 스트림을 활성화하여 DynamoDB 테이블을 생성합니다.

  2. 이 테이블의 스트림 설정을 설명합니다.

  3. 테이블의 데이터를 수정합니다.

  4. 스트림의 샤드를 설명합니다.

  5. 샤드의 스트림 레코드를 읽습니다.

  6. 하위 샤드를 가져오고 레코드를 계속 읽습니다.

  7. 그리고 정리합니다.

프로그램을 실행하면 다음과 비슷한 출력 화면이 나타납니다.

Testing Streams Demo Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id Waiting for TestTableForStreams to be created... Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208 Stream enabled: true Update view type: NEW_AND_OLD_IMAGES Performing write activities on TestTableForStreams Processing item 1 of 100 Processing item 2 of 100 Processing item 3 of 100 ... Processing item 100 of 100 Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},} Shard iterator: EjYFEkX2a26eVTWe... StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES) StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES) StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES) ... Deleting the table... Table StreamsDemoTable deleted. Demo complete
예제
import java.util.HashMap; import java.util.List; import java.util.Map; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeAction; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.model.ShardFilter; import software.amazon.awssdk.services.dynamodb.model.ShardFilterType; import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; import software.amazon.awssdk.services.dynamodb.model.StreamSpecification; import software.amazon.awssdk.services.dynamodb.model.TableDescription; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter; public class StreamsLowLevelDemo { public static void main(String[] args) { final String usage = "Testing Streams Demo"; try { System.out.println(usage); String tableName = "StreamsDemoTable"; String key = "Id"; System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key); Region region = Region.US_WEST_2; DynamoDbClient ddb = DynamoDbClient.builder() .region(region) .build(); DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder() .region(region) .build(); DescribeTableRequest describeTableRequest = DescribeTableRequest.builder() .tableName(tableName) .build(); TableDescription tableDescription = null; try{ tableDescription = ddb.describeTable(describeTableRequest).table(); }catch (Exception e){ System.out.println("Table " + tableName + " does not exist."); tableDescription = createTable(ddb, tableName, key); } // Print the stream settings for the table String streamArn = tableDescription.latestStreamArn(); StreamSpecification streamSpec = tableDescription.streamSpecification(); System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " + streamArn); System.out.println("Stream enabled: " + streamSpec.streamEnabled()); System.out.println("Update view type: " + streamSpec.streamViewType()); System.out.println(); // Generate write activity in the table System.out.println("Performing write activities on " + tableName); int maxItemCount = 100; for (Integer i = 1; i <= maxItemCount; i++) { System.out.println("Processing item " + i + " of " + maxItemCount); // Write a new item putItemInTable(key, i, tableName, ddb); // Update the item updateItemInTable(key, i, tableName, ddb); // Delete the item deleteDynamoDBItem(key, i, tableName, ddb); } // Process Stream processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName); // Delete the table System.out.println("Deleting the table..."); DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); ddb.deleteTable(deleteTableRequest); System.out.println("Table " + tableName + " deleted."); System.out.println("Demo complete"); ddb.close(); } catch (Exception e) { System.out.println("Error: " + e.getMessage()); } } private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) { // Get all the shard IDs from the stream. Note that DescribeStream returns // the shard IDs one page at a time. String lastEvaluatedShardId = null; do { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamArn(streamArn) .exclusiveStartShardId(lastEvaluatedShardId).build(); DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest); List<Shard> shards = describeStreamResponse.streamDescription().shards(); // Process each shard on this page fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards); // If LastEvaluatedShardId is set, then there is // at least one more page of shard IDs to retrieve lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId(); } while (lastEvaluatedShardId != null); } private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) { for (Shard shard : shards) { String shardId = shard.shardId(); System.out.println("Shard: " + shard); // Get an iterator for the current shard GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder() .streamArn(streamArn).shardId(shardId) .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build(); GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest); String currentShardIter = getShardIteratorResult.shardIterator(); // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY). // To prevent running the loop until the Shard is sealed, we process only the // items that were written into DynamoDB and then exit. int processedRecordCount = 0; while (currentShardIter != null && processedRecordCount < maxItemCount) { // Use the shard iterator to read the stream records GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder() .shardIterator(currentShardIter).build(); GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.records(); for (Record record : records) { System.out.println(" " + record.dynamodb()); } processedRecordCount += records.size(); currentShardIter = getRecordsResult.nextShardIterator(); } if (currentShardIter == null){ System.out.println("Shard has been fully processed. Shard iterator is null."); System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards"); DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder() .streamArn(streamArn) .shardFilter(ShardFilter.builder() .type(ShardFilterType.CHILD_SHARDS) .shardId(shardId).build()) .build(); DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards); fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards()); } } } private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) { Map<String, AttributeValue> item = new HashMap<>(); item.put(key, AttributeValue.builder() .s(i.toString()) .build()); item.put("Message", AttributeValue.builder() .s("New Item!") .build()); PutItemRequest request = PutItemRequest.builder() .tableName(tableName) .item(item) .build(); ddb.putItem(request); } private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) { HashMap<String, AttributeValue> itemKey = new HashMap<>(); itemKey.put(key, AttributeValue.builder() .s(i.toString()) .build()); HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>(); updatedValues.put("Message", AttributeValueUpdate.builder() .value(AttributeValue.builder().s("This is an updated item").build()) .action(AttributeAction.PUT) .build()); UpdateItemRequest request = UpdateItemRequest.builder() .tableName(tableName) .key(itemKey) .attributeUpdates(updatedValues) .build(); ddb.updateItem(request); } public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) { HashMap<String, AttributeValue> keyToGet = new HashMap<>(); keyToGet.put(key, AttributeValue.builder() .s(i.toString()) .build()); DeleteItemRequest deleteReq = DeleteItemRequest.builder() .tableName(tableName) .key(keyToGet) .build(); ddb.deleteItem(deleteReq); } public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) { DynamoDbWaiter dbWaiter = ddb.waiter(); StreamSpecification streamSpecification = StreamSpecification.builder() .streamEnabled(true) .streamViewType("NEW_AND_OLD_IMAGES") .build(); CreateTableRequest request = CreateTableRequest.builder() .attributeDefinitions(AttributeDefinition.builder() .attributeName(key) .attributeType(ScalarAttributeType.S) .build()) .keySchema(KeySchemaElement.builder() .attributeName(key) .keyType(KeyType.HASH) .build()) .billingMode(BillingMode.PAY_PER_REQUEST) // DynamoDB automatically scales based on traffic. .tableName(tableName) .streamSpecification(streamSpecification) .build(); TableDescription newTable; try { CreateTableResponse response = ddb.createTable(request); DescribeTableRequest tableRequest = DescribeTableRequest.builder() .tableName(tableName) .build(); System.out.println("Waiting for " + tableName + " to be created..."); // Wait until the Amazon DynamoDB table is created. WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest); waiterResponse.matched().response().ifPresent(System.out::println); newTable = response.tableDescription(); return newTable; } catch (DynamoDbException e) { System.err.println(e.getMessage()); System.exit(1); } return null; } }