Package software.amazon.awscdk.services.kinesis
Amazon Kinesis Construct Library
Amazon Kinesis provides collection and processing of large streams of data records in real time. Kinesis data streams can be used for rapid and continuous data intake and aggregation.
Table Of Contents
Streams
Amazon Kinesis Data Streams ingests a large amount of data in real time, durably stores the data, and makes the data available for consumption.
Using the CDK, a new Kinesis stream can be created as part of the stack using the construct's constructor. You may specify the streamName to give
your own identifier to the stream. If not, CloudFormation will generate a name.
Stream.Builder.create(this, "MyFirstStream")
.streamName("my-awesome-stream")
.build();
You can also specify properties such as shardCount to indicate how many shards the stream should choose and a retentionPeriod
to specify how long the data in the shards should remain accessible.
Read more at Creating and Managing Streams
Stream.Builder.create(this, "MyFirstStream")
.streamName("my-awesome-stream")
.shardCount(3)
.retentionPeriod(Duration.hours(48))
.build();
Encryption
Stream encryption enables server-side encryption using an AWS KMS key for a specified stream.
Encryption is enabled by default on your stream with the master key owned by Kinesis Data Streams in regions where it is supported.
new Stream(this, "MyEncryptedStream");
You can enable encryption on your stream with a user-managed key by specifying the encryption property.
A KMS key will be created for you and associated with the stream.
Stream.Builder.create(this, "MyEncryptedStream")
.encryption(StreamEncryption.KMS)
.build();
You can also supply your own external KMS key to use for stream encryption by specifying the encryptionKey property.
Key key = new Key(this, "MyKey");
Stream.Builder.create(this, "MyEncryptedStream")
.encryption(StreamEncryption.KMS)
.encryptionKey(key)
.build();
Import
Any Kinesis stream that has been created outside the stack can be imported into your CDK app.
Streams can be imported by their ARN via the Stream.fromStreamArn() API
IStream importedStream = Stream.fromStreamArn(this, "ImportedStream", "arn:aws:kinesis:us-east-2:123456789012:stream/f3j09j2230j");
Encrypted Streams can also be imported by their attributes via the Stream.fromStreamAttributes() API
IStream importedStream = Stream.fromStreamAttributes(this, "ImportedEncryptedStream", StreamAttributes.builder()
.streamArn("arn:aws:kinesis:us-east-2:123456789012:stream/f3j09j2230j")
.encryptionKey(Key.fromKeyArn(this, "key", "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"))
.build());
Permission Grants
IAM roles, users or groups which need to be able to work with Amazon Kinesis streams at runtime should be granted IAM permissions.
Any object that implements the IGrantable interface (has an associated principal) can be granted permissions by calling:
grantRead(principal)- grants the principal read accessgrantWrite(principal)- grants the principal write permissions to a StreamgrantReadWrite(principal)- grants principal read and write permissions
Read Permissions
Grant read access to a stream by calling the grantRead() API.
If the stream has an encryption key, read permissions will also be granted to the key.
Role lambdaRole = Role.Builder.create(this, "Role")
.assumedBy(new ServicePrincipal("lambda.amazonaws.com"))
.description("Example role...")
.build();
Stream stream = Stream.Builder.create(this, "MyEncryptedStream")
.encryption(StreamEncryption.KMS)
.build();
// give lambda permissions to read stream
stream.grantRead(lambdaRole);
The following read permissions are provided to a service principal by the grantRead() API:
kinesis:DescribeStreamSummarykinesis:GetRecordskinesis:GetShardIteratorkinesis:ListShardskinesis:SubscribeToShard
Write Permissions
Grant write permissions to a stream is provided by calling the grantWrite() API.
If the stream has an encryption key, write permissions will also be granted to the key.
Role lambdaRole = Role.Builder.create(this, "Role")
.assumedBy(new ServicePrincipal("lambda.amazonaws.com"))
.description("Example role...")
.build();
Stream stream = Stream.Builder.create(this, "MyEncryptedStream")
.encryption(StreamEncryption.KMS)
.build();
// give lambda permissions to write to stream
stream.grantWrite(lambdaRole);
The following write permissions are provided to a service principal by the grantWrite() API:
kinesis:ListShardskinesis:PutRecordkinesis:PutRecords
Custom Permissions
You can add any set of permissions to a stream by calling the grant() API.
User user = new User(this, "MyUser"); Stream stream = new Stream(this, "MyStream"); // give my user permissions to list shards stream.grant(user, "kinesis:ListShards");
Metrics
You can use common metrics from your stream to create alarms and/or dashboards. The stream.metric('MetricName') method creates a metric with the stream namespace and dimension. You can also use pre-define methods like stream.metricGetRecordsSuccess(). To find out more about Kinesis metrics check Monitoring the Amazon Kinesis Data Streams Service with Amazon CloudWatch.
Stream stream = new Stream(this, "MyStream");
// Using base metric method passing the metric name
stream.metric("GetRecords.Success");
// using pre-defined metric method
stream.metricGetRecordsSuccess();
// using pre-defined and overriding the statistic
stream.metricGetRecordsSuccess(MetricOptions.builder().statistic("Maximum").build());
Shard-level Metrics
You can enable enhanced shard-level metrics for your Kinesis stream to get detailed monitoring of individual shards. Shard-level metrics provide more granular insights into the performance and health of your stream.
Stream stream = Stream.Builder.create(this, "MyStream")
.shardLevelMetrics(List.of(ShardLevelMetrics.ALL))
.build();
You can also specify individual metrics that you want to monitor:
Stream stream = Stream.Builder.create(this, "MyStream")
.shardLevelMetrics(List.of(ShardLevelMetrics.INCOMING_BYTES, ShardLevelMetrics.INCOMING_RECORDS, ShardLevelMetrics.ITERATOR_AGE_MILLISECONDS, ShardLevelMetrics.OUTGOING_BYTES, ShardLevelMetrics.OUTGOING_RECORDS, ShardLevelMetrics.READ_PROVISIONED_THROUGHPUT_EXCEEDED, ShardLevelMetrics.WRITE_PROVISIONED_THROUGHPUT_EXCEEDED))
.build();
Available shard-level metrics include:
INCOMING_BYTES- The number of bytes successfully put to the shardINCOMING_RECORDS- The number of records successfully put to the shardITERATOR_AGE_MILLISECONDS- The age of the last record in all GetRecords calls made against a shardOUTGOING_BYTES- The number of bytes retrieved from the shardOUTGOING_RECORDS- The number of records retrieved from the shardREAD_PROVISIONED_THROUGHPUT_EXCEEDED- The number of GetRecords calls throttled for the shardWRITE_PROVISIONED_THROUGHPUT_EXCEEDED- The number of records rejected due to throttling for the shardALL- All available metrics
Note: You cannot specify ALL together with other individual metrics. If you want all metrics, use ALL alone.
For more information about shard-level metrics, see Monitoring the Amazon Kinesis Data Streams Service with Amazon CloudWatch.
Stream Consumers
Creating stream consumers allow consumers to receive data from the stream using enhanced fan-out at a rate of up to 2 MiB per second for every shard. This rate is unaffected by the total number of consumers that read from the same stream.
For more information, see Develop enhanced fan-out consumers with dedicated throughput.
To create and associate a stream consumer with a stream
Stream stream = new Stream(this, "MyStream");
StreamConsumer streamConsumer = StreamConsumer.Builder.create(this, "MyStreamConsumer")
.streamConsumerName("MyStreamConsumer")
.stream(stream)
.build();
Read Permissions
Grant read access to a stream consumer, and the stream it is registered with, by calling the grantRead() API.
Role lambdaRole = Role.Builder.create(this, "Role")
.assumedBy(new ServicePrincipal("lambda.amazonaws.com"))
.description("Example role...")
.build();
Stream stream = Stream.Builder.create(this, "MyEncryptedStream")
.encryption(StreamEncryption.KMS)
.build();
StreamConsumer streamConsumer = StreamConsumer.Builder.create(this, "MyStreamConsumer")
.streamConsumerName("MyStreamConsumer")
.stream(stream)
.build();
// give lambda permissions to read stream via the stream consumer
streamConsumer.grantRead(lambdaRole);
In addition to stream's permissions, the following permissions are provided to a service principal by the grantRead() API:
kinesis:DescribeStreamConsumer,kinesis:SubscribeToShard,
Resource Policy
You can create a resource policy for a data stream or a stream consumer. For more information, see Controlling access to Amazon Kinesis Data Streams resources using IAM.
A resource policy is automatically created when addToResourcePolicy is called, if one doesn't already exist.
Using addToResourcePolicy is the simplest way to add a resource policy:
Stream stream = new Stream(this, "MyStream");
StreamConsumer streamConsumer = StreamConsumer.Builder.create(this, "MyStreamConsumer")
.streamConsumerName("MyStreamConsumer")
.stream(stream)
.build();
// create a stream resource policy via addToResourcePolicy method
stream.addToResourcePolicy(PolicyStatement.Builder.create()
.resources(List.of(stream.getStreamArn()))
.actions(List.of("kinesis:GetRecords"))
.principals(List.of(new AnyPrincipal()))
.build());
// create a stream consumer resource policy via addToResourcePolicy method
streamConsumer.addToResourcePolicy(PolicyStatement.Builder.create()
.resources(List.of(stream.getStreamArn()))
.actions(List.of("kinesis:DescribeStreamConsumer"))
.principals(List.of(new AnyPrincipal()))
.build());
You can create a resource manually by using ResourcePolicy.
Also, you can set a custom policy document to ResourcePolicy.
If not, a blank policy document will be set.
Stream stream = new Stream(this, "MyStream");
StreamConsumer streamConsumer = StreamConsumer.Builder.create(this, "MyStreamConsumer")
.streamConsumerName("MyStreamConsumer")
.stream(stream)
.build();
// create a custom policy document
PolicyDocument policyDocument = PolicyDocument.Builder.create()
.assignSids(true)
.statements(List.of(
PolicyStatement.Builder.create()
.actions(List.of("kinesis:GetRecords"))
.resources(List.of(stream.getStreamArn()))
.principals(List.of(new AnyPrincipal()))
.build()))
.build();
// create a stream resource policy manually
// create a stream resource policy manually
ResourcePolicy.Builder.create(this, "ResourcePolicy")
.stream(stream)
.policyDocument(policyDocument)
.build();
// create a stream consumer resource policy manually
// create a stream consumer resource policy manually
ResourcePolicy.Builder.create(this, "ResourcePolicy")
.streamConsumer(streamConsumer)
.policyDocument(policyDocument)
.build();
-
ClassDescriptionAttaches a resource-based policy to a data stream or registered consumer.A fluent builder for
CfnResourcePolicy.Properties for defining aCfnResourcePolicy.A builder forCfnResourcePolicyPropsAn implementation forCfnResourcePolicyPropsCreates a Kinesis stream that captures and transports data records that are emitted from data sources.A fluent builder forCfnStream.Enables or updates server-side encryption using an AWS KMS key for a specified stream.A builder forCfnStream.StreamEncryptionPropertyAn implementation forCfnStream.StreamEncryptionPropertySpecifies the capacity mode to which you want to set your data stream.A builder forCfnStream.StreamModeDetailsPropertyAn implementation forCfnStream.StreamModeDetailsPropertyUse the AWS CloudFormationAWS::Kinesis::StreamConsumerresource to register a consumer with a Kinesis data stream.A fluent builder forCfnStreamConsumer.Properties for defining aCfnStreamConsumer.A builder forCfnStreamConsumerPropsAn implementation forCfnStreamConsumerPropsProperties for defining aCfnStream.A builder forCfnStreamPropsAn implementation forCfnStreamProps(experimental) Indicates that this resource can be referenced as a ResourcePolicy.Internal default implementation forIResourcePolicyRef.A proxy class which represents a concrete javascript instance of this type.A Kinesis Stream.Internal default implementation forIStream.A proxy class which represents a concrete javascript instance of this type.A Kinesis Stream Consumer.Internal default implementation forIStreamConsumer.A proxy class which represents a concrete javascript instance of this type.(experimental) Indicates that this resource can be referenced as a StreamConsumer.Internal default implementation forIStreamConsumerRef.A proxy class which represents a concrete javascript instance of this type.(experimental) Indicates that this resource can be referenced as a Stream.Internal default implementation forIStreamRef.A proxy class which represents a concrete javascript instance of this type.The policy for a data stream or registered consumer.A fluent builder forResourcePolicy.Properties to associate a data stream with a policy.A builder forResourcePolicyPropsAn implementation forResourcePolicyPropsA reference to a ResourcePolicy resource.A builder forResourcePolicyReferenceAn implementation forResourcePolicyReferenceEnhanced shard-level metrics.A Kinesis stream.A fluent builder forStream.A reference to a stream.A builder forStreamAttributesAn implementation forStreamAttributesA Kinesis Stream Consumer.A fluent builder forStreamConsumer.A reference to a StreamConsumer, which can be imported usingStreamConsumer.fromStreamConsumerAttributes.A builder forStreamConsumerAttributesAn implementation forStreamConsumerAttributesProperties for a Kinesis Stream Consumer.A builder forStreamConsumerPropsAn implementation forStreamConsumerPropsA reference to a StreamConsumer resource.A builder forStreamConsumerReferenceAn implementation forStreamConsumerReferenceWhat kind of server-side encryption to apply to this stream.Specifies the capacity mode to apply to this stream.Properties for a Kinesis Stream.A builder forStreamPropsAn implementation forStreamPropsA reference to a Stream resource.A builder forStreamReferenceAn implementation forStreamReference