

# Write data to Amazon Kinesis Data Streams
<a name="building-producers"></a>

A *producer* is an application that writes data to Amazon Kinesis Data Streams. You can build producers for Kinesis Data Streams using the AWS SDK for Java and the Kinesis Producer Library (KPL).

If you are new to Kinesis Data Streams, start by becoming familiar with the concepts and terminology presented in [What is Amazon Kinesis Data Streams?](introduction.md) and [Use the AWS CLI to perform Amazon Kinesis Data Streams operations](getting-started.md).

**Important**  
Kinesis Data Streams supports changes to the data record retention period of your data stream. For more information, see [Change the data retention period](kinesis-extended-retention.md).

To put data into the stream, you must specify the name of the stream, a partition key, and the data blob to be added to the stream. The partition key is used to determine which shard in the stream the data record is added to.

All the data in a shard is sent to the same worker that is processing the shard. Which partition key you use depends on your application logic. The number of partition keys should typically be much greater than the number of shards, because the partition key determines how a data record maps to a particular shard. With enough partition keys, the data can be evenly distributed across the shards in a stream.
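The mapping from partition key to shard can be sketched in Java. This is an illustration of the idea only: the class and method names below are invented, and a real stream's shard hash-key ranges come from the `DescribeStream` response. Kinesis Data Streams hashes the partition key with MD5 to a 128-bit value and routes the record to the shard whose hash-key range contains that value.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeyDemo {
    // The 128-bit hash key space: [0, 2^128 - 1]
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key, read as an unsigned 128-bit integer
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, md5);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Shard index, assuming shardCount shards that evenly split the hash space
    static int shardFor(String partitionKey, int shardCount) {
        BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(shardCount));
        return hashKey(partitionKey).divide(rangeSize)
                .min(BigInteger.valueOf(shardCount - 1)).intValue();
    }
}
```

Because MD5 output is effectively uniform, many distinct partition keys spread across the shard ranges, which is why the number of keys should be much greater than the number of shards.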

**Topics**
+ [Develop producers using the Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md)
+ [Develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java](developing-producers-with-sdk.md)
+ [Write to Amazon Kinesis Data Streams using Kinesis Agent](writing-with-agents.md)
+ [Write to Kinesis Data Streams using other AWS services](using-other-services.md)
+ [Write to Kinesis Data Streams using third-party integrations](using-other-services-third-party.md)
+ [Troubleshoot Amazon Kinesis Data Streams producers](troubleshooting-producers.md)
+ [Optimize Kinesis Data Streams producers](advanced-producers.md)

# Develop producers using the Amazon Kinesis Producer Library (KPL)
<a name="developing-producers-with-kpl"></a>

An Amazon Kinesis Data Streams producer is an application that puts user data records into a Kinesis data stream (also called *data ingestion*). The Amazon Kinesis Producer Library (KPL) simplifies producer application development, letting developers achieve high write throughput to a Kinesis data stream. 

You can monitor the KPL with Amazon CloudWatch. For more information, see [Monitor the Kinesis Producer Library with Amazon CloudWatch](monitoring-with-kpl.md).

**Topics**
+ [Review the role of the KPL](#developing-producers-with-kpl-role)
+ [Realize the advantages of using the KPL](#developing-producers-with-kpl-advantage)
+ [Understand when not to use the KPL](#developing-producers-with-kpl-when)
+ [Install the KPL](kinesis-kpl-dl-install.md)
+ [Migrate from KPL 0.x to KPL 1.x](kpl-migration-1x.md)
+ [Transition to Amazon Trust Services (ATS) certificates for the KPL](kinesis-kpl-upgrades.md)
+ [KPL supported platforms](kinesis-kpl-supported-plats.md)
+ [KPL key concepts](kinesis-kpl-concepts.md)
+ [Integrate the KPL with producer code](kinesis-kpl-integration.md)
+ [Write to your Kinesis data stream using the KPL](kinesis-kpl-writing.md)
+ [Configure the Amazon Kinesis Producer Library](kinesis-kpl-config.md)
+ [Implement consumer de-aggregation](kinesis-kpl-consumer-deaggregation.md)
+ [Use the KPL with Amazon Data Firehose](kpl-with-firehose.md)
+ [Use the KPL with the AWS Glue Schema Registry](kpl-with-schemaregistry.md)
+ [Configure the KPL proxy configuration](kpl-proxy-configuration.md)
+ [KPL version lifecycle policy](kpl-version-lifecycle-policy.md)

**Note**  
We recommend that you upgrade to the latest KPL version. The KPL is regularly updated with newer releases that include the latest dependency and security patches, bug fixes, and backward-compatible new features. For more information, see [https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/).

## Review the role of the KPL
<a name="developing-producers-with-kpl-role"></a>

The KPL is an easy-to-use, highly configurable library that helps you write to a Kinesis data stream. It acts as an intermediary between your producer application code and the Kinesis Data Streams API actions. The KPL performs the following primary tasks: 
+ Writes to one or more Kinesis data streams with an automatic and configurable retry mechanism
+ Collects records and uses `PutRecords` to write multiple records to multiple shards per request
+ Aggregates user records to increase payload size and improve throughput
+ Integrates seamlessly with the [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) to de-aggregate batched records on the consumer
+ Submits Amazon CloudWatch metrics on your behalf to provide visibility into producer performance

Note that the KPL is different from the Kinesis Data Streams API that is available in the [AWS SDKs](https://aws.amazon.com/tools/). The Kinesis Data Streams API helps you manage many aspects of Kinesis Data Streams (including creating streams, resharding, and putting and getting records), while the KPL provides a layer of abstraction specifically for ingesting data. For information about the Kinesis Data Streams API, see the [Amazon Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/).

## Realize the advantages of using the KPL
<a name="developing-producers-with-kpl-advantage"></a>

The following list represents some of the major advantages to using the KPL for developing Kinesis Data Streams producers.

The KPL can be used in either synchronous or asynchronous use cases. We suggest using the higher performance of the asynchronous interface unless there is a specific reason to use synchronous behavior. For more information about these two use cases and example code, see [Write to your Kinesis data stream using the KPL](kinesis-kpl-writing.md).

 **Performance Benefits**   
The KPL can help build high-performance producers. Consider a situation where your Amazon EC2 instances serve as a proxy for collecting 100-byte events from hundreds or thousands of low-power devices and writing records into a Kinesis data stream. These EC2 instances must each write thousands of events per second to your data stream. To achieve the needed throughput, producers must otherwise implement complicated logic, such as batching or multithreading, in addition to retry logic and record de-aggregation on the consumer side. The KPL performs all of these tasks for you. 

 **Consumer-Side Ease of Use**   
For consumer-side developers using the KCL in Java, the KPL integrates without additional effort. When the KCL retrieves an aggregated Kinesis Data Streams record consisting of multiple KPL user records, it automatically invokes the KPL to extract the individual user records before returning them to the user.   
For consumer-side developers who do not use the KCL but instead use the API operation `GetRecords` directly, a KPL Java library is available to extract the individual user records before returning them to the user. 

 **Producer Monitoring**   
You can collect, monitor, and analyze your Kinesis Data Streams producers using Amazon CloudWatch and the KPL. The KPL emits throughput, error, and other metrics to CloudWatch on your behalf, and is configurable to monitor at the stream, shard, or producer level.

 **Asynchronous Architecture**   
Because the KPL may buffer records before sending them to Kinesis Data Streams, it does not force the caller application to block and wait for confirmation that the record has arrived at the server before continuing execution. A call to put a record into the KPL always returns immediately and does not wait for the record to be sent or a response to be received from the server. Instead, a `Future` object is created that receives the result of sending the record to Kinesis Data Streams at a later time. This is the same behavior as asynchronous clients in the AWS SDK.
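The non-blocking pattern described above can be sketched with standard Java futures. This is an analogy only, not the KPL API: the names below are invented, and the real KPL returns a Guava `ListenableFuture` from `addUserRecord`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncPutSketch {
    // Stand-in for an asynchronous put: returns a Future immediately
    // while the "send" completes in the background.
    static CompletableFuture<String> addUserRecord(String stream, String key, String data) {
        return CompletableFuture.supplyAsync(() -> "shardId-000000000000");
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // The loop never blocks on the network
            futures.add(addUserRecord("myStream", "key-" + i, "myData"));
        }
        // Results are inspected later, only when the caller needs them
        for (CompletableFuture<String> f : futures) {
            System.out.println("Put record into " + f.join());
        }
    }
}
```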

## Understand when not to use the KPL
<a name="developing-producers-with-kpl-when"></a>

The KPL can incur an additional processing delay of up to `RecordMaxBufferedTime` within the library (user-configurable). Larger values of `RecordMaxBufferedTime` result in higher packing efficiency and better performance. Applications that cannot tolerate this additional delay might need to use the AWS SDK directly. For more information about using the AWS SDK with Kinesis Data Streams, see [Develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java](developing-producers-with-sdk.md). For more information about `RecordMaxBufferedTime` and other user-configurable properties of the KPL, see [Configure the Amazon Kinesis Producer Library](kinesis-kpl-config.md).

# Install the KPL
<a name="kinesis-kpl-dl-install"></a>

Amazon provides pre-built binaries of the C++ Amazon Kinesis Producer Library (KPL) for macOS, Windows, and recent Linux distributions (for supported platform details, see the next section). These binaries are packaged as part of Java .jar files and are automatically invoked and used if you are using Maven to install the package. To locate the latest versions of the KPL and KCL, use the following Maven search links:
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

The Linux binaries have been compiled with the GNU Compiler Collection (GCC) and statically linked against libstdc++ on Linux. They are expected to work on any 64-bit Linux distribution that includes glibc version 2.5 or higher.

Users of earlier Linux distributions can build the KPL using the build instructions provided along with the source on GitHub. To download the KPL from GitHub, see [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer).

**Important**  
Amazon Kinesis Producer Library (KPL) 0.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KPL applications using version 0.x to the latest KPL version before January 30, 2026. To find the latest KPL version, see the [KPL page on Github](https://github.com/awslabs/amazon-kinesis-producer). For information about migrating from KPL 0.x to KPL 1.x, see [Migrate from KPL 0.x to KPL 1.x](kpl-migration-1x.md).

# Migrate from KPL 0.x to KPL 1.x
<a name="kpl-migration-1x"></a>

This topic provides step-by-step instructions to migrate your producer from KPL 0.x to KPL 1.x. KPL 1.x introduces support for the AWS SDK for Java 2.x while maintaining interface compatibility with previous versions. You don't have to update your core data processing logic to migrate to KPL 1.x. 

1. **Make sure that you have the following prerequisites:**
   + Java Development Kit (JDK) 8 or later
   + AWS SDK for Java 2.x
   + Maven or Gradle for dependency management

1. **Add dependencies**

   If you're using Maven, add the following dependency to your pom.xml file. Make sure that you update the groupId from `com.amazonaws` to `software.amazon.kinesis` and replace the version `1.x.x` with the latest KPL version. 

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   If you're using Gradle, add the following to your `build.gradle` file. Make sure to replace `1.x.x` with the latest KPL version. 

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   You can check for the latest version of the KPL on the [Maven Central Repository](https://central.sonatype.com/search?q=amazon-kinesis-producer). 

1. **Update import statements for KPL**

   KPL 1.x uses the AWS SDK for Java 2.x and uses an updated package name that starts with `software.amazon.kinesis`, compared to the package name in the previous KPL that starts with `com.amazonaws.services.kinesis`.

   Replace the import for `com.amazonaws.services.kinesis` with `software.amazon.kinesis`. The following table lists the imports that you must replace.  
**Import replacements**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kpl-migration-1x.html)

1. **Update import statements for AWS credentials provider classes**

   When migrating to KPL 1.x, you must update packages and classes in your imports in your KPL application code that are based on the AWS SDK for Java 1.x to corresponding ones based on the AWS SDK for Java 2.x. Common imports in the KPL application are credentials provider classes. See [Credentials provider changes](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html) in the AWS SDK for Java 2.x migration guide documentation for the full list of credentials provider changes. Here is the common import change that you might need to make in your KPL applications. 

   **Import in KPL 0.x**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **Import in KPL 1.x**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

   If you import any other credentials providers based on the AWS SDK for Java 1.x, you must update them to the AWS SDK for Java 2.x equivalent ones. If you didn’t import any classes/packages from the AWS SDK for Java 1.x, you can ignore this step.

1. **Update the credentials provider configuration in the KPL configuration**

   The credentials provider configuration in KPL 1.x requires the AWS SDK for Java 2.x credential providers. If you are passing credentials providers for the AWS SDK for Java 1.x in the `KinesisProducerConfiguration` by overriding the default credentials provider, you must update it with the AWS SDK for Java 2.x credential providers. See [Credentials provider changes](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html) in the AWS SDK for Java 2.x migration guide documentation for the full list of credentials provider changes. If you didn’t override the default credentials provider in the KPL configuration, you can ignore this step.

   For example, if you are overriding the default credentials provider for the KPL with the following code:

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

   You must update them with the following code to use the AWS SDK for Java 2.x credentials provider:

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# Transition to Amazon Trust Services (ATS) certificates for the KPL
<a name="kinesis-kpl-upgrades"></a>

On February 9, 2018, at 9:00 AM PST, Amazon Kinesis Data Streams installed ATS certificates. To continue to be able to write records to Kinesis Data Streams using the Amazon Kinesis Producer Library (KPL), you must upgrade your installation of the KPL to [version 0.12.6](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar) or later. This change affects all AWS Regions.

For information about the move to ATS, see [How to Prepare for AWS’s Move to Its Own Certificate Authority](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/).

If you encounter problems and need technical support, [create a case](https://console.aws.amazon.com/support/v1#/case/create) with the AWS Support Center.

# KPL supported platforms
<a name="kinesis-kpl-supported-plats"></a>

The Amazon Kinesis Producer Library (KPL) is written in C++ and runs as a child process to the main user process. Precompiled 64-bit native binaries are bundled with the Java release and are managed by the Java wrapper.

The Java package runs without the need to install any additional libraries on the following operating systems:
+ Linux distributions with kernel 2.6.18 (September 2006) and later
+ Apple OS X 10.9 and later
+ Windows Server 2008 and later
**Important**  
Windows Server 2008 and later is supported only for KPL versions earlier than 0.14.0.  
The Windows platform is not supported in KPL version 0.14.0 and later.

Note that the KPL is 64-bit only.

## Source code
<a name="kinesis-kpl-supported-plats-source-code"></a>

If the binaries provided in the KPL installation are not sufficient for your environment, the core of the KPL is written as a C++ module. The source code for the C++ module and the Java interface are released under the Amazon Public License and are available on GitHub at [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer). Although the KPL can be used on any platform for which a recent standards-compliant C++ compiler and JRE are available, Amazon doesn't officially support any platform that is not on the supported platforms list.

# KPL key concepts
<a name="kinesis-kpl-concepts"></a>

The following sections contain concepts and terminology necessary to understand and benefit from the Amazon Kinesis Producer Library (KPL).

**Topics**
+ [Records](#kinesis-kpl-concepts-records)
+ [Batching](#kinesis-kpl-concepts-batching)
+ [Aggregation](#kinesis-kpl-concepts-aggretation)
+ [Collection](#kinesis-kpl-concepts-collection)

## Records
<a name="kinesis-kpl-concepts-records"></a>

In this guide, we distinguish between *KPL user records* and *Kinesis Data Streams records*. When we use the term *record* without a qualifier, we refer to a *KPL user record*. When we refer to a Kinesis Data Streams record, we explicitly say *Kinesis Data Streams record*.

A KPL user record is a blob of data that has particular meaning to the user. Examples include a JSON blob representing a UI event on a website, or a log entry from a web server.

A Kinesis Data Streams record is an instance of the `Record` data structure defined by the Kinesis Data Streams service API. It contains a partition key, sequence number, and a blob of data. 

## Batching
<a name="kinesis-kpl-concepts-batching"></a>

*Batching* refers to performing a single action on multiple items instead of repeatedly performing the action on each individual item. 

In this context, the "item" is a record, and the action is sending it to Kinesis Data Streams. In a non-batching situation, you would place each record in a separate Kinesis Data Streams record and make one HTTP request to send it to Kinesis Data Streams. With batching, each HTTP request can carry multiple records instead of just one.

The KPL supports two types of batching:
+ *Aggregation* – Storing multiple records within a single Kinesis Data Streams record. 
+ *Collection* – Using the API operation `PutRecords` to send multiple Kinesis Data Streams records to one or more shards in your Kinesis data stream. 

The two types of KPL batching are designed to coexist and can be turned on or off independently of one another. By default, both are turned on.
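Both kinds of batching map to KPL configuration settings, so they can be toggled in the KPL properties file. The fragment below is a sketch: the property names follow the `KinesisProducerConfiguration` setters, and you should verify them, along with value limits, against the sample configuration file on GitHub before relying on them.

```
# Turn off aggregation but keep collection
AggregationEnabled = false
# Up to 500 Kinesis Data Streams records per PutRecords call
CollectionMaxCount = 500
```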

## Aggregation
<a name="kinesis-kpl-concepts-aggretation"></a>

*Aggregation* refers to the storage of multiple records in a Kinesis Data Streams record. Aggregation allows customers to increase the number of records sent per API call, which effectively increases producer throughput.

Kinesis Data Streams shards support up to 1,000 Kinesis Data Streams records per second, or 1 MB per second of write throughput. The records-per-second limit is the binding constraint for customers whose records are smaller than 1 KB. Record aggregation allows customers to combine multiple records into a single Kinesis Data Streams record, which improves their per-shard throughput. 

Consider the case of one shard in Region us-east-1 that is currently running at a constant rate of 1,000 records per second, with records that are 512 bytes each. With KPL aggregation, you can pack 1,000 records into only 10 Kinesis Data Streams records, reducing the rate to 10 records per second (at roughly 50 KB each).
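The arithmetic in that example can be checked directly. The sketch below assumes aggregated records are filled to roughly 50 KB (51,200 bytes), which matches 100 of the 512-byte records per aggregate; the KPL's actual packing is governed by its aggregation settings.

```java
public class AggregationMath {
    // How many aggregated Kinesis Data Streams records per second are needed
    // when user records are packed up to maxAggregatedBytes per record.
    static int aggregatedRecordsPerSecond(int userRecordsPerSec,
                                          int userRecordBytes,
                                          int maxAggregatedBytes) {
        int perAggregate = maxAggregatedBytes / userRecordBytes; // user records per aggregate
        return (int) Math.ceil((double) userRecordsPerSec / perAggregate);
    }

    public static void main(String[] args) {
        // 1,000 records/s at 512 bytes each, packed into ~50 KB aggregates
        System.out.println(aggregatedRecordsPerSecond(1000, 512, 51200));
    }
}
```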

## Collection
<a name="kinesis-kpl-concepts-collection"></a>

*Collection* refers to batching multiple Kinesis Data Streams records and sending them in a single HTTP request with a call to the API operation `PutRecords`, instead of sending each Kinesis Data Streams record in its own HTTP request.

This increases throughput compared to using no collection because it reduces the overhead of making many separate HTTP requests. In fact, `PutRecords` itself was specifically designed for this purpose.

Collection differs from aggregation in that it works with groups of Kinesis Data Streams records. The Kinesis Data Streams records being collected can still contain multiple records from the user. The relationship can be visualized as follows:

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# Integrate the KPL with producer code
<a name="kinesis-kpl-integration"></a>

The Amazon Kinesis Producer Library (KPL) runs in a separate process, and communicates with your parent user process using IPC. This architecture is sometimes called a [microservice](http://en.wikipedia.org/wiki/Microservices), and is chosen for two main reasons:

**1) Your user process will not crash even if the KPL crashes**  
Your process could have tasks unrelated to Kinesis Data Streams, and may be able to continue operation even if the KPL crashes. It is also possible for your parent user process to restart the KPL and recover to a fully working state (this functionality is in the official wrappers).

An example is a web server that sends metrics to Kinesis Data Streams; the server can continue serving pages even if the Kinesis Data Streams part has stopped working. Crashing the whole server because of a bug in the KPL would therefore cause an unnecessary outage.

**2) Arbitrary clients can be supported**  
There are always customers who use languages other than the ones officially supported. These customers should also be able to use the KPL easily.

## Recommended usage matrix
<a name="kinesis-kpl-integration-usage"></a>

The following usage matrix lists the recommended settings for different users and advises you about whether and how you should use the KPL. Keep in mind that if aggregation is enabled, de-aggregation must also be used to extract your records on the consumer side. 


| Producer side language | Consumer side language | KCL Version | Checkpoint logic | Can you use the KPL? | Caveats | 
| --- | --- | --- | --- | --- | --- | 
| Anything but Java | * | * | * | No | N/A | 
| Java | Java | Uses Java SDK directly | N/A | Yes | If aggregation is used, you have to use the provided de-aggregation library after GetRecords calls. | 
| Java | Anything but Java | Uses SDK directly | N/A | Yes | Must disable aggregation.  | 
| Java | Java | 1.3.x | N/A | Yes | Must disable aggregation. | 
| Java | Java  | 1.4.x | Calls checkpoint without any arguments | Yes | None | 
| Java | Java | 1.4.x | Calls checkpoint with an explicit sequence number | Yes | Either disable aggregation, or change the code to use extended sequence numbers for checkpointing. | 
| Java | Anything but Java | 1.3.x + Multilanguage daemon + language-specific wrapper | N/A | Yes | Must disable aggregation.  | 

# Write to your Kinesis data stream using the KPL
<a name="kinesis-kpl-writing"></a>

The following sections show sample code in a progression from the most basic producer to fully asynchronous code.

## Barebones producer code
<a name="kinesis-kpl-writing-code"></a>

The following code is all that is needed to write a minimal working producer. The Amazon Kinesis Producer Library (KPL) user records are processed in the background.

```
// KinesisProducer gets credentials automatically, in the same way as
// DefaultAWSCredentialsProviderChain.
// It also gets the Region automatically from the EC2 metadata service.
KinesisProducer kinesis = new KinesisProducer();

// Put some records
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes(StandardCharsets.UTF_8));
    // addUserRecord doesn't block
    kinesis.addUserRecord("myStream", "myPartitionKey", data);
}
// Do other stuff ...
```

## Respond to results synchronously
<a name="kinesis-kpl-writing-synchronous"></a>

In the previous example, the code didn't check whether the KPL user records succeeded. The KPL performs any retries needed to account for failures. But if you want to check on the results, you can examine them using the `Future` objects that are returned from `addUserRecord`, as in the following example (previous example shown for context):

```
KinesisProducer kinesis = new KinesisProducer();

// Put some records and save the Futures
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>();
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes(StandardCharsets.UTF_8));
    // doesn't block
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data));
}

// Wait for puts to finish and check the results
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block
    if (result.isSuccessful()) {
        System.out.println("Put record into shard " +
                result.getShardId());
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure
        }
    }
}
```

## Respond to results asynchronously
<a name="kinesis-kpl-writing-asynchronous"></a>

The previous example calls `get()` on a `Future` object, which blocks until the result is available. If you don't want to block, you can use an asynchronous callback, as shown in the following example:

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
    @Override
    public void onFailure(Throwable t) {
        /* Analyze and respond to the failure */
    }
    @Override
    public void onSuccess(UserRecordResult result) {
        /* Respond to the success */
    }
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes(StandardCharsets.UTF_8));
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);
    // If the Future is already complete when addCallback is called,
    // the callback is invoked immediately.
    Futures.addCallback(f, myCallback);
}
```

# Configure the Amazon Kinesis Producer Library
<a name="kinesis-kpl-config"></a>

Although the default settings should work well for most use cases, you may want to change some of the default settings to tailor the behavior of the `KinesisProducer` to your needs. An instance of the `KinesisProducerConfiguration` class can be passed to the `KinesisProducer` constructor to do so, for example:

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

You can also load a configuration from a properties file:

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

You can substitute any path and file name that the user process has access to. You can additionally call set methods on the `KinesisProducerConfiguration` instance created this way to customize the config.

The properties file should specify parameters using their names in PascalCase. The names match those used in the set methods in the `KinesisProducerConfiguration` class. For example:

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

For more information about configuration parameter usage rules and value limits, see the [sample configuration properties file on GitHub](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties).

Note that after `KinesisProducer` is initialized, changing the `KinesisProducerConfiguration` instance that was used has no further effect. `KinesisProducer` does not currently support dynamic reconfiguration.

# Implement consumer de-aggregation
<a name="kinesis-kpl-consumer-deaggregation"></a>

Beginning with release 1.4.0, the KCL supports automatic de-aggregation of KPL user records. Consumer application code written with previous versions of the KCL will compile without any modification after you update the KCL. However, if KPL aggregation is being used on the producer side, there is a subtlety involving checkpointing: all subrecords within an aggregated record have the same sequence number, so additional data has to be stored with the checkpoint if you need to distinguish between subrecords. This additional data is referred to as the *subsequence number*.

**Topics**
+ [Migrate from previous versions of the KCL](#kinesis-kpl-consumer-deaggregation-migration)
+ [Use KCL extensions for KPL de-aggregation](#kinesis-kpl-consumer-deaggregation-extensions)
+ [Use GetRecords directly](#kinesis-kpl-consumer-deaggregation-getrecords)

## Migrate from previous versions of the KCL
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

You are not required to change your existing calls to do checkpointing with aggregation. It is still guaranteed that you can retrieve all records successfully stored in Kinesis Data Streams. The KCL now provides two new checkpoint operations to support particular use cases, described following.

If your existing code was written for the KCL before KPL support, and your checkpoint operation is called without arguments, it is equivalent to checkpointing the sequence number of the last KPL user record in the batch. If your checkpoint operation is called with a sequence number string, it is equivalent to checkpointing the given sequence number of the batch along with the implicit subsequence number 0 (zero).

Calling the new KCL checkpoint operation `checkpoint()` without any arguments is semantically equivalent to checkpointing the sequence number of the last `Record` call in the batch, along with the implicit subsequence number 0 (zero). 

Calling the new KCL checkpoint operation `checkpoint(Record record)` is semantically equivalent to checkpointing the given `Record`’s sequence number along with the implicit subsequence number 0 (zero). If the `Record` call is actually a `UserRecord`, the `UserRecord` sequence number and subsequence number are checkpointed. 

Calling the new KCL checkpoint operation `checkpoint(String sequenceNumber, long subSequenceNumber)` explicitly checkpoints the given sequence number along with the given subsequence number. 

In any of these cases, after the checkpoint is stored in the Amazon DynamoDB checkpoint table, the KCL can correctly resume retrieving records even when the application crashes and restarts. If more records are contained within the sequence, retrieval occurs starting with the next subsequence number record within the record with the most recently checkpointed sequence number. If the most recent checkpoint included the very last subsequence number of the previous sequence number record, retrieval occurs starting with the record with the next sequence number. 

The next section discusses details of sequence and subsequence checkpointing for consumers that must avoid skipping and duplication of records. If skipping (or duplication) of records when stopping and restarting your consumer’s record processing is not important, you can run your existing code with no modification.

## Use KCL extensions for KPL de-aggregation
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL de-aggregation can involve subsequence checkpointing. To facilitate using subsequence checkpointing, a `UserRecord` class has been added to the KCL:

```
public class UserRecord extends Record {
    public long getSubSequenceNumber() {
        /* ... */
    }

    @Override
    public int hashCode() {
        /* contract-satisfying implementation */
    }

    @Override
    public boolean equals(Object obj) {
        /* contract-satisfying implementation */
    }
}
```

This class is now used instead of `Record`. This does not break existing code because it is a subclass of `Record`. The `UserRecord` class represents both actual subrecords and standard, non-aggregated records. Non-aggregated records can be thought of as aggregated records with exactly one subrecord.

In addition, two new operations are added to `IRecordProcessorCheckpointer`:

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

To begin using subsequence number checkpointing, convert code of the following form:

```
checkpointer.checkpoint(record.getSequenceNumber());
```

to the following new form:

```
checkpointer.checkpoint(record);
```

We recommend that you use the `checkpoint(Record record)` form for subsequence checkpointing. However, if you are already storing `sequenceNumbers` in strings to use for checkpointing, you should now also store `subSequenceNumber`, as shown in the following example:

```
String sequenceNumber = record.getSequenceNumber();
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();
// ... do other processing
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

The cast from `Record` to `UserRecord` always succeeds because the implementation always uses `UserRecord`. However, unless you need to perform arithmetic on the sequence numbers, this approach is not recommended.

While processing KPL user records, the KCL writes the subsequence number into Amazon DynamoDB as an extra field for each row. Previous versions of the KCL used `AFTER_SEQUENCE_NUMBER` to fetch records when resuming checkpoints. The current KCL with KPL support uses `AT_SEQUENCE_NUMBER` instead. When the record at the checkpointed sequence number is retrieved, the checkpointed subsequence number is checked, and subrecords are dropped as appropriate (which may be all of them, if the last subrecord is the one checkpointed). Again, non-aggregated records can be thought of as aggregated records with a single subrecord, so the same algorithm works for both aggregated and non-aggregated records.
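The resume behavior described above can be sketched in plain Java. This is an illustrative model only, using a hypothetical `SubRecord` stand-in rather than the actual KCL `UserRecord` type: after fetching at the checkpointed sequence number with `AT_SEQUENCE_NUMBER`, subrecords of that record with a subsequence number at or below the checkpointed value have already been processed and are dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not the actual KCL implementation.
public class ResumeFilter {
    // Hypothetical minimal stand-in for the KCL's UserRecord.
    static class SubRecord {
        final String sequenceNumber;
        final long subSequenceNumber;
        SubRecord(String seq, long subSeq) {
            this.sequenceNumber = seq;
            this.subSequenceNumber = subSeq;
        }
    }

    static List<SubRecord> dropProcessed(List<SubRecord> fetched,
                                         String checkpointedSeq, long checkpointedSubSeq) {
        List<SubRecord> remaining = new ArrayList<>();
        for (SubRecord r : fetched) {
            // Subrecords of the checkpointed record that were already consumed are skipped.
            if (r.sequenceNumber.equals(checkpointedSeq)
                    && r.subSequenceNumber <= checkpointedSubSeq) {
                continue;
            }
            remaining.add(r);
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<SubRecord> fetched = new ArrayList<>();
        fetched.add(new SubRecord("100", 0));
        fetched.add(new SubRecord("100", 1));
        fetched.add(new SubRecord("100", 2));
        fetched.add(new SubRecord("101", 0));
        // The checkpoint was ("100", 1), so processing resumes at subrecord ("100", 2).
        for (SubRecord r : dropProcessed(fetched, "100", 1)) {
            System.out.println(r.sequenceNumber + ":" + r.subSequenceNumber);
        }
    }
}
```

Because a non-aggregated record behaves like an aggregated record with one subrecord, the same filter also works when no aggregation is in use.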

## Use GetRecords directly
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

You can also choose not to use the KCL but instead invoke the API operation `GetRecords` directly to retrieve Kinesis Data Streams records. To unpack these retrieved records into your original KPL user records, call one of the following static operations in `UserRecord.java`:

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

The first operation uses the default value `0` (zero) for `startingHashKey` and the default value `2^128 - 1` for `endingHashKey`.

Each of these operations de-aggregates the given list of Kinesis Data Streams records into a list of KPL user records. Any KPL user records whose explicit hash key or partition key falls outside the range of the `startingHashKey` (inclusive) and the `endingHashKey` (inclusive) are discarded from the returned list of records.
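The inclusive range filter can be sketched as follows. This is a simplified model, not the actual `UserRecord.deaggregate` implementation; the `filterByHashKey` helper and the use of bare `BigInteger` hash keys are assumptions for illustration.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the hash key range filter applied during de-aggregation:
// records whose effective hash key falls outside [startingHashKey, endingHashKey]
// (both inclusive) are discarded.
public class HashKeyFilter {
    static List<BigInteger> filterByHashKey(List<BigInteger> effectiveHashKeys,
                                            BigInteger startingHashKey,
                                            BigInteger endingHashKey) {
        List<BigInteger> kept = new ArrayList<>();
        for (BigInteger hk : effectiveHashKeys) {
            if (hk.compareTo(startingHashKey) >= 0 && hk.compareTo(endingHashKey) <= 0) {
                kept.add(hk);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Defaults used by the single-argument deaggregate: 0 and 2^128 - 1.
        BigInteger start = BigInteger.ZERO;
        BigInteger end = BigInteger.valueOf(2).pow(128).subtract(BigInteger.ONE);

        List<BigInteger> keys = new ArrayList<>();
        keys.add(BigInteger.valueOf(42));
        keys.add(end);

        // With the default full range, nothing is discarded.
        System.out.println(filterByHashKey(keys, start, end).size());
        // With a narrower range, the key at the top of the space is discarded.
        System.out.println(filterByHashKey(keys, start, BigInteger.valueOf(100)).size());
    }
}
```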

# Use the KPL with Amazon Data Firehose
<a name="kpl-with-firehose"></a>

If you use the Kinesis Producer Library (KPL) to write data to a Kinesis data stream, you can use aggregation to combine the records that you write to that Kinesis data stream. If you then use that data stream as a source for your Firehose delivery stream, Firehose de-aggregates the records before it delivers them to the destination. If you configure your delivery stream to transform the data, Firehose de-aggregates the records before it delivers them to AWS Lambda. For more information, see [Writing to Amazon Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html).

# Use the KPL with the AWS Glue Schema Registry
<a name="kpl-with-schemaregistry"></a>

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry enables you to improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through the KPL and Kinesis Client Library (KCL) libraries in Java. 

**Important**  
Currently, Kinesis Data Streams and AWS Glue schema registry integration is only supported for the Kinesis data streams that use KPL producers implemented in Java. Multi-language support is not provided. 

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the KPL, see the "Interacting with Data Using the KPL/KCL Libraries" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# Configure the KPL proxy configuration
<a name="kpl-proxy-configuration"></a>

For applications that cannot directly connect to the internet, all AWS SDK clients support the use of HTTP or HTTPS proxies. In a typical enterprise environment, all outbound network traffic has to go through proxy servers. If your application uses the Kinesis Producer Library (KPL) to collect and send data to AWS in an environment that uses proxy servers, your application requires KPL proxy configuration. The KPL is a high-level library built on top of the AWS Kinesis SDK. It is split into a native process and a wrapper. The native process performs all of the work of processing and sending records, while the wrapper manages the native process and communicates with it. For more information, see [Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/). 

The wrapper is written in Java, and the native process is written in C++ using the Kinesis SDK. KPL version 0.14.7 and later supports proxy configuration in the Java wrapper, which passes all proxy settings to the native process. For more information, see [https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7).

You can use the following code to add proxy configurations to your KPL applications.

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL version lifecycle policy
<a name="kpl-version-lifecycle-policy"></a>

This topic outlines the version lifecycle policy for the Amazon Kinesis Producer Library (KPL). AWS regularly provides new releases for KPL versions to support new features and enhancements, bug fixes, security patches, and dependency updates. We recommend that you stay up to date with KPL versions to keep up with the latest features, security updates, and underlying dependencies. We **don't** recommend continued use of an unsupported KPL version.

The lifecycle for major KPL versions consists of the following three phases:
+ **General availability (GA)** – During this phase, the major version is fully supported. AWS provides regular minor and patch version releases that include support for new features or API updates for Kinesis Data Streams, as well as bug and security fixes.
+ **Maintenance mode** – AWS limits patch version releases to address critical bug fixes and security issues only. The major version won't receive updates for new features or APIs of Kinesis Data Streams.
+ **End-of-support** – The major version will no longer receive updates or releases. Previously published releases will continue to be available through public package managers and the code will remain on GitHub. Use of a version which has reached end-of-support is done at the user’s discretion. We recommend that you upgrade to the latest major version.


| Major version | Current phase | Release date | Maintenance mode date | End-of-support date | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | Maintenance mode | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | General availability | 2024-12-15 | -- | -- | 

# Develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java
<a name="developing-producers-with-sdk"></a>

You can develop producers using the Amazon Kinesis Data Streams API with the AWS SDK for Java. If you are new to Kinesis Data Streams, start by becoming familiar with the concepts and terminology presented in [What is Amazon Kinesis Data Streams?](introduction.md) and [Use the AWS CLI to perform Amazon Kinesis Data Streams operations](getting-started.md).

These examples discuss the [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) and use the [AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) to add (put) data to a stream. However, for most use cases, you should prefer the Kinesis Producer Library (KPL). For more information, see [Develop producers using the Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

The Java example code in this chapter demonstrates how to perform basic Kinesis Data Streams API operations, and is divided up logically by operation type. These examples do not represent production-ready code, in that they do not check for all possible exceptions, or account for all possible security or performance considerations. Also, you can call the [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) using other programming languages. For more information about all available AWS SDKs, see [Start Developing with Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Each task has prerequisites; for example, you cannot add data to a stream until you have created a stream, which requires you to create a client. For more information, see [Create and manage Kinesis data streams](working-with-streams.md).

**Topics**
+ [Add data to a stream](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interact with data using the AWS Glue Schema Registry](kinesis-integration-glue-schema-registry.md)

## Add data to a stream
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Once a stream is created, you can add data to it in the form of records. A record is a data structure that contains the data to be processed in the form of a data blob. After you store the data in the record, Kinesis Data Streams does not inspect, interpret, or change the data in any way. Each record also has an associated sequence number and partition key.

There are two different operations in the Kinesis Data Streams API that add data to a stream, [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) and [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). The `PutRecords` operation sends multiple records to your stream per HTTP request, and the singular `PutRecord` operation sends records to your stream one at a time (a separate HTTP request is required for each record). You should prefer using `PutRecords` for most applications because it will achieve higher throughput per data producer. For more information about each of these operations, see the separate subsections below.

**Topics**
+ [Add multiple records with PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Add a single record with PutRecord](#kinesis-using-sdk-java-putrecord)

Always keep in mind that, as your source application is adding data to the stream using the Kinesis Data Streams API, there are most likely one or more consumer applications that are simultaneously processing data off the stream. For information about how consumers get data using the Kinesis Data Streams API, see [Get data from a stream](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Important**  
[Change the data retention period](kinesis-extended-retention.md)

### Add multiple records with PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

The [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) operation sends multiple records to Kinesis Data Streams in a single request. By using `PutRecords`, producers can achieve higher throughput when sending data to their Kinesis data stream. Each `PutRecords` request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. As with the single `PutRecord` operation described below, `PutRecords` uses sequence numbers and partition keys. However, the `PutRecord` parameter `SequenceNumberForOrdering` is not included in a `PutRecords` call. The `PutRecords` operation attempts to process all records in the natural order of the request. 

Each data record has a unique sequence number. The sequence number is assigned by Kinesis Data Streams after you call `client.putRecords` to add the data records to the stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between `PutRecords` requests, the larger the sequence numbers become.

**Note**  
Sequence numbers cannot be used as indexes to sets of data within the same stream. To logically separate sets of data, use partition keys or create a separate stream for each data set.

A `PutRecords` request can include records with different partition keys. The scope of the request is a stream; each request may include any combination of partition keys and records up to the request limits. Requests made with many different partition keys to streams with many different shards are generally faster than requests with a small number of partition keys to a small number of shards. The number of partition keys should be much larger than the number of shards to reduce latency and maximize throughput.

#### PutRecords example
<a name="kinesis-using-sdk-java-putrecords-example"></a>

The following code creates 100 data records with sequential partition keys and puts them in a stream called `DataStream`. 

```
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

clientBuilder.setRegion(regionName);
clientBuilder.setCredentials(credentialsProvider);
clientBuilder.setClientConfiguration(config);

AmazonKinesis kinesisClient = clientBuilder.build();

PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
System.out.println("Put Result: " + putRecordsResult);
```

The `PutRecords` response includes an array of response `Records`. Each record in the response array directly correlates with a record in the request array using natural ordering, from the top to the bottom of the request and response. The response `Records` array always includes the same number of records as the request array.

#### Handle failures when using PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

By default, failure of individual records within a request does not stop the processing of subsequent records in a `PutRecords` request. This means that a response `Records` array includes both successfully and unsuccessfully processed records. You must detect unsuccessfully processed records and include them in a subsequent call. 

Successful records include `SequenceNumber` and `ShardID` values, and unsuccessful records include `ErrorCode` and `ErrorMessage` values. The `ErrorCode` parameter reflects the type of error and can be one of the following values: `ProvisionedThroughputExceededException` or `InternalFailure`. `ErrorMessage` provides more detailed information about the `ProvisionedThroughputExceededException` exception including the account ID, stream name, and shard ID of the record that was throttled. The example below has three records in a `PutRecords` request. The second record fails and is reflected in the response. 

**Example PutRecords Request Syntax**  

```
{
    "Records": [
        {
            "Data": "XzxkYXRhPl8w",
            "PartitionKey": "partitionKey1"
        },
        {
            "Data": "AbceddeRFfg12asd",
            "PartitionKey": "partitionKey1"
        },
        {
            "Data": "KFpcd98*7nd1",
            "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Response Syntax**  

```
{
    "FailedRecordCount": 1,
    "Records": [
        {
            "SequenceNumber": "21269319989900637946712965403778482371",
            "ShardId": "shardId-000000000001"
        },
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
        },
        {
            "SequenceNumber": "21269319989999637946712965403778482985",
            "ShardId": "shardId-000000000002"
        }
    ]
}
```

Records that were unsuccessfully processed can be included in subsequent `PutRecords` requests. First, check the `FailedRecordCount` parameter in the `putRecordsResult` to confirm whether there are failed records in the request. If so, each `putRecordsEntry` that has an `ErrorCode` that is not `null` should be added to a subsequent request. For an example of this type of handler, refer to the following code.

**Example PutRecords failure handler**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Add a single record with PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Each call to [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) operates on a single record. Prefer the `PutRecords` operation described in [Add multiple records with PutRecords](#kinesis-using-sdk-java-putrecords) unless your application specifically requires sending a single record per request, or `PutRecords` cannot be used for some other reason.

Each data record has a unique sequence number. The sequence number is assigned by Kinesis Data Streams after you call `client.putRecord` to add the data record to the stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between `PutRecord` requests, the larger the sequence numbers become.

 When puts occur in quick succession, the returned sequence numbers are not guaranteed to increase because the put operations appear essentially as simultaneous to Kinesis Data Streams. To guarantee strictly increasing sequence numbers for the same partition key, use the `SequenceNumberForOrdering` parameter, as shown in the [PutRecord example](#kinesis-using-sdk-java-putrecord-example) code sample. 

 Whether or not you use `SequenceNumberForOrdering`, records that Kinesis Data Streams receives through a `GetRecords` call are strictly ordered by sequence number. 

**Note**  
Sequence numbers cannot be used as indexes to sets of data within the same stream. To logically separate sets of data, use partition keys or create a separate stream for each data set.

A partition key is used to group data within the stream. A data record is assigned to a shard within the stream based on its partition key. Specifically, Kinesis Data Streams uses the partition key as input to a hash function that maps the partition key (and associated data) to a specific shard.

 As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream. However, if the number of partition keys exceeds the number of shards, some shards necessarily contain records with different partition keys. From a design standpoint, to ensure that all your shards are well utilized, the number of shards (specified by the `setShardCount` method of `CreateStreamRequest`) should be substantially less than the number of unique partition keys, and the amount of data flowing to a single partition key should be substantially less than the capacity of the shard. 
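As a rough sketch of this mechanism: Kinesis Data Streams computes an MD5 hash of the partition key and treats it as a 128-bit integer, which is matched against each shard's hash key range. The even split of the hash key space below is an assumption for illustration only; real shards carry explicit `StartingHashKey` and `EndingHashKey` values, and `PartitionKeyToShard` is a hypothetical helper, not part of any SDK.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch of partition key to shard mapping via MD5 hashing.
public class PartitionKeyToShard {
    static BigInteger hashKey(String partitionKey) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(1, digest); // non-negative 128-bit value
    }

    static int shardFor(String partitionKey, int shardCount) throws NoSuchAlgorithmException {
        // Assumption: the hash key space [0, 2^128 - 1] is split evenly across shards.
        BigInteger space = BigInteger.valueOf(2).pow(128);
        BigInteger perShard = space.divide(BigInteger.valueOf(shardCount));
        int shard = hashKey(partitionKey).divide(perShard).intValue();
        return Math.min(shard, shardCount - 1); // guard the top edge of the range
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // The same partition key always maps to the same shard.
        System.out.println(shardFor("partitionKey-1", 4) == shardFor("partitionKey-1", 4));
    }
}
```

Because the mapping depends only on the hash of the partition key, every record with a given key lands on the same shard, which is why spreading load requires many more distinct keys than shards.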

#### PutRecord example
<a name="kinesis-using-sdk-java-putrecord-example"></a>

The following code creates ten data records, distributed across two partition keys, and puts them in a stream called `myStreamName`.

```
for (int j = 0; j < 10; j++) {
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(myStreamName);
    putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", j).getBytes()));
    putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j / 5));
    putRecordRequest.setSequenceNumberForOrdering(sequenceNumberOfPreviousRecord);
    PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
    sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

The preceding code sample uses `setSequenceNumberForOrdering` to guarantee strictly increasing ordering within each partition key. To use this parameter effectively, set the `SequenceNumberForOrdering` of the current record (record *n*) to the sequence number of the preceding record (record *n-1*). To get the sequence number of a record that has been added to the stream, call `getSequenceNumber` on the result of `putRecord`.

The `SequenceNumberForOrdering` parameter ensures strictly increasing sequence numbers for the same partition key. `SequenceNumberForOrdering` does not provide ordering of records across multiple partition keys. 

# Interact with data using the AWS Glue Schema Registry
<a name="kinesis-integration-glue-schema-registry"></a>

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through the `PutRecords` and `PutRecord` Kinesis Data Streams APIs available in the AWS Java SDK. 

For detailed instructions on how to set up integration of Kinesis Data Streams with schema registry using the PutRecords and PutRecord Kinesis Data Streams APIs, see the "Interacting with Data Using the Kinesis Data Streams APIs" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# Write to Amazon Kinesis Data Streams using Kinesis Agent
<a name="writing-with-agents"></a>

Kinesis Agent is a stand-alone Java software application that offers an easy way to collect and send data to Kinesis Data Streams. The agent continuously monitors a set of files and sends new data to your stream. The agent handles file rotation, checkpointing, and retry upon failures. It delivers all of your data in a reliable, timely, and simple manner. It also emits Amazon CloudWatch metrics to help you better monitor and troubleshoot the streaming process.

By default, records are parsed from each file based on the newline (`'\n'`) character. However, the agent can also be configured to parse multi-line records (see [Specify the agent configuration settings](#agent-config-settings)). 
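The default newline-delimited behavior can be sketched as follows; the file chunk literal here is a hypothetical example, not the agent's actual implementation.

```java
// Illustrative sketch: each newline-terminated line in a monitored file
// becomes one record payload under the agent's default parsing.
public class NewlineRecords {
    public static void main(String[] args) {
        String fileChunk = "first event\nsecond event\nthird event\n";
        String[] records = fileChunk.split("\n");
        System.out.println(records.length);
        System.out.println(records[1]);
    }
}
```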

You can install the agent on Linux-based server environments such as web servers, log servers, and database servers. After installing the agent, configure it by specifying the files to monitor and the stream for the data. After the agent is configured, it durably collects data from the files and reliably sends it to the stream.

**Topics**
+ [Complete the prerequisites for Kinesis Agent](#prereqs)
+ [Download and install the agent](#download-install)
+ [Configure and start the agent](#config-start)
+ [Specify the agent configuration settings](#agent-config-settings)
+ [Monitor multiple file directories and write to multiple streams](#sim-writes)
+ [Use the agent to pre-process data](#pre-processing)
+ [Use agent CLI commands](#cli-commands)
+ [FAQ](#agent-faq)

## Complete the prerequisites for Kinesis Agent
<a name="prereqs"></a>
+ Your operating system must be either Amazon Linux AMI with version 2015.09 or later, or Red Hat Enterprise Linux version 7 or later.
+ If you are using Amazon EC2 to run your agent, launch your EC2 instance.
+ Manage your AWS credentials using one of the following methods:
  + Specify an IAM role when you launch your EC2 instance.
  + Specify AWS credentials when you configure the agent (see [awsAccessKeyId](#awsAccessKeyId) and [awsSecretAccessKey](#awsSecretAccessKey)).
  + Edit `/etc/sysconfig/aws-kinesis-agent` to specify your region and AWS access keys.
  + If your EC2 instance is in a different AWS account, create an IAM role to provide access to the Kinesis Data Streams service, and specify that role when you configure the agent (see [assumeRoleARN](#assumeRoleARN) and [assumeRoleExternalId](#assumeRoleExternalId)). Use one of the previous methods to specify the AWS credentials of a user in the other account who has permission to assume this role.
+ The IAM role or AWS credentials that you specify must have permission to perform the Kinesis Data Streams [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) operation for the agent to send data to your stream. If you enable CloudWatch monitoring for the agent, permission to perform the CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html) operation is also needed. For more information, see [Controlling access to Amazon Kinesis Data Streams resources using IAM](controlling-access.md), [Monitor Kinesis Data Streams Agent health with Amazon CloudWatch](agent-health.md), and [CloudWatch Access Control](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/UsingIAM.html).

## Download and install the agent
<a name="download-install"></a>

First, connect to your instance. For more information, see [Connect to Your Instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html) in the *Amazon EC2 User Guide*. If you have trouble connecting, see [Troubleshooting Connecting to Your Instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html) in the *Amazon EC2 User Guide*.

**To set up the agent using the Amazon Linux AMI**  
Use the following command to download and install the agent:

```
sudo yum install -y aws-kinesis-agent
```

**To set up the agent using Red Hat Enterprise Linux**  
Use the following command to download and install the agent:

```
sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
```

**To set up the agent using GitHub**

1. Download the agent from [awslabs/amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent). 

1. Install the agent by navigating to the download directory and running the following command:

   ```
   sudo ./setup --install
   ```

**To set up the agent in a Docker container**  
You can also run Kinesis Agent in a container, using the [amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html) container image as the base. Use the following Dockerfile, and then run `docker build` to build the image.

```
FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]
```

## Configure and start the agent
<a name="config-start"></a>

**To configure and start the agent**

1. Open and edit the configuration file (as superuser if using default file access permissions): `/etc/aws-kinesis/agent.json` 

   In this configuration file, specify the files (`"filePattern"`) from which the agent collects data and the name of the stream (`"kinesisStream"`) to which the agent sends data. The file name is a pattern, and the agent recognizes file rotations. You can rotate files or create new files no more than once per second. The agent uses the file creation timestamp to determine which files to track and tail into your stream; if you create new files or rotate files more frequently than once per second, the agent cannot differentiate properly between them.

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "kinesisStream": "yourkinesisstream"
           } 
      ] 
   }
   ```

1. Start the agent manually:

   ```
   sudo service aws-kinesis-agent start
   ```

1. (Optional) Configure the agent to start on system startup:

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

The agent is now running as a system service in the background. It continuously monitors the specified files and sends data to the specified stream. Agent activity is logged in `/var/log/aws-kinesis-agent/aws-kinesis-agent.log`. 

## Specify the agent configuration settings
<a name="agent-config-settings"></a>

The agent supports two mandatory configuration settings, `filePattern` and `kinesisStream`, plus optional configuration settings for additional features. You can specify both mandatory and optional configuration settings in `/etc/aws-kinesis/agent.json`.

Whenever you change the configuration file, you must stop and start the agent, using the following commands:

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

Alternatively, you can use the following command:

```
sudo service aws-kinesis-agent restart
```
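
Because the agent fails to start if `agent.json` contains malformed JSON, it can help to sanity-check the file before restarting. The following sketch is illustrative only (the `validate_agent_config` helper is not part of the agent); it checks that the content parses as JSON and that each flow carries its mandatory settings:

```python
import json

def validate_agent_config(text):
    """Sanity-check agent.json content before restarting the agent.

    Raises an error on malformed JSON or on a flow that is missing its
    mandatory settings. Illustrative only; the agent performs its own
    validation at startup.
    """
    config = json.loads(text)  # raises json.JSONDecodeError if malformed
    for i, flow in enumerate(config.get("flows", [])):
        if "filePattern" not in flow:
            raise ValueError(f"flow {i}: missing mandatory 'filePattern'")
        if "kinesisStream" not in flow and "deliveryStream" not in flow:
            raise ValueError(f"flow {i}: missing 'kinesisStream' (or 'deliveryStream')")
    return config

sample = '{"flows": [{"filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream"}]}'
config = validate_agent_config(sample)
print(len(config["flows"]))  # 1
```

Running this against the contents of `/etc/aws-kinesis/agent.json` before a restart catches syntax errors earlier than a failed service start would.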

The following are the general configuration settings.


| Configuration Setting | Description | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  The ARN of the role to be assumed by the user. For more information, see [Delegate Access Across AWS Accounts Using IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html) in the *IAM User Guide*.  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  An optional identifier that determines who can assume the role. For more information, see [How to Use an External ID](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html) in the *IAM User Guide*.  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS access key ID that overrides the default credentials. This setting takes precedence over all other credential providers.  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS secret key that overrides the default credentials. This setting takes precedence over all other credential providers.  | 
| cloudwatch.emitMetrics |  Enables the agent to emit metrics to CloudWatch if set to true. Default: true  | 
| cloudwatch.endpoint |  The regional endpoint for CloudWatch. Default: `monitoring.us-east-1.amazonaws.com`  | 
| kinesis.endpoint |  The regional endpoint for Kinesis Data Streams. Default: `kinesis.us-east-1.amazonaws.com`  | 

The following are the flow configuration settings.


| Configuration Setting | Description | 
| --- | --- | 
| dataProcessingOptions |  The list of processing options applied to each parsed record before it is sent to the stream. The processing options are performed in the specified order. For more information, see [Use the agent to pre-process data](#pre-processing).  | 
| kinesisStream |  [Required] The name of the stream.  | 
| filePattern |  [Required] The directory and file pattern that must be matched to be picked up by the agent. For all files matching this pattern, read permission must be granted to `aws-kinesis-agent-user`. For the directory containing the files, read and execute permissions must be granted to `aws-kinesis-agent-user`.  | 
| initialPosition |  The initial position from which the agent starts parsing the file. Valid values are `START_OF_FILE` and `END_OF_FILE`. Default: `END_OF_FILE`  | 
| maxBufferAgeMillis |  The maximum time, in milliseconds, for which the agent buffers data before sending it to the stream. Value range: 1,000 to 900,000 (1 second to 15 minutes) Default: 60,000 (1 minute)  | 
| maxBufferSizeBytes |  The maximum size, in bytes, for which the agent buffers data before sending it to the stream. Value range: 1 to 4,194,304 (4 MB) Default: 4,194,304 (4 MB)  | 
| maxBufferSizeRecords |  The maximum number of records for which the agent buffers data before sending it to the stream. Value range: 1 to 500 Default: 500  | 
| minTimeBetweenFilePollsMillis |  The time interval, in milliseconds, at which the agent polls and parses the monitored files for new data. Value range: 1 or more Default: 100  | 
| multiLineStartPattern |  The pattern for identifying the start of a record. A record is made of a line that matches the pattern and any following lines that don't match the pattern. The valid values are regular expressions. By default, each new line in the log files is parsed as one record.  | 
| partitionKeyOption |  The method for generating the partition key. Valid values are `RANDOM` (randomly generated integer) and `DETERMINISTIC` (a hash value computed from the data). Default: `RANDOM`  | 
| skipHeaderLines |  The number of lines for the agent to skip parsing at the beginning of monitored files. Value range: 0 or more Default: 0 (zero)  | 
| truncatedRecordTerminator |  The string that the agent uses to truncate a parsed record when the record size exceeds the Kinesis Data Streams record size limit (1,000 KB). Default: `'\n'` (newline)  | 
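
As an illustration, a single flow might combine several of the optional settings above with the two mandatory ones (the stream name and values shown here are placeholders, not recommendations):

```
{
    "flows": [
        {
            "filePattern": "/tmp/app.log*",
            "kinesisStream": "yourkinesisstream",
            "initialPosition": "START_OF_FILE",
            "maxBufferAgeMillis": 30000,
            "partitionKeyOption": "DETERMINISTIC",
            "skipHeaderLines": 1
        }
    ]
}
```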

## Monitor multiple file directories and write to multiple streams
<a name="sim-writes"></a>

By specifying multiple flow configuration settings, you can configure the agent to monitor multiple file directories and send data to multiple streams. In the following configuration example, the agent monitors two file directories and sends data to a Kinesis stream and a Firehose delivery stream, respectively. You can specify different endpoints for Kinesis Data Streams and Firehose, so your Kinesis stream and Firehose delivery stream don't need to be in the same Region.

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

For more detailed information about using the agent with Firehose, see [Writing to Amazon Data Firehose with Kinesis Agent](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html).

## Use the agent to pre-process data
<a name="pre-processing"></a>

The agent can pre-process the records parsed from monitored files before sending them to your stream. You can enable this feature by adding the `dataProcessingOptions` configuration setting to your file flow. You can add one or more processing options, and they are performed in the specified order.

The agent supports the processing options listed below. Because the agent is open source, you can further develop and extend its processing options. You can download the agent from [Kinesis Agent](https://github.com/awslabs/amazon-kinesis-agent).

`SINGLELINE`  
Converts a multi-line record to a single line record by removing newline characters, leading spaces, and trailing spaces.  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
Converts a record from delimiter separated format to JSON format.  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[Required] The field names used as keys in each JSON key value pair. For example, if you specify `["f1", "f2"]`, the record "v1, v2" will be converted to `{"f1":"v1","f2":"v2"}`.  
`delimiter`  
The string used as the delimiter in the record. The default is a comma (,).
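
As a rough sketch of what `CSVTOJSON` does to each record (a simplified approximation, not the agent's actual implementation):

```python
import json

def csv_to_json(record, field_names, delimiter=","):
    # Split the record on the delimiter and pair the values with the
    # configured field names, mirroring the CSVTOJSON option.
    values = [v.strip() for v in record.split(delimiter)]
    return json.dumps(dict(zip(field_names, values)), separators=(",", ":"))

print(csv_to_json("v1, v2", ["f1", "f2"]))  # {"f1":"v1","f2":"v2"}
```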

`LOGTOJSON`  
Converts a record from a log format to JSON format. The supported log formats are **Apache Common Log**, **Apache Combined Log**, **Apache Error Log**, and **RFC3164 Syslog**.  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[Required] The log entry format. The following are possible values:  
+ `COMMONAPACHELOG` — The Apache Common Log format. Each log entry has the following pattern by default: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`".
+ `COMBINEDAPACHELOG` — The Apache Combined Log format. Each log entry has the following pattern by default: "`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`".
+ `APACHEERRORLOG` — The Apache Error Log format. Each log entry has the following pattern by default: "`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`".
+ `SYSLOG` — The RFC3164 Syslog format. Each log entry has the following pattern by default: "`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`".  
`matchPattern`  
The regular expression pattern used to extract values from log entries. This setting is used if your log entry is not in one of the predefined log formats. If this setting is used, you must also specify `customFieldNames`.  
`customFieldNames`  
The custom field names used as keys in each JSON key value pair. You can use this setting to define field names for values extracted from `matchPattern`, or override the default field names of predefined log formats.

**Example : LOGTOJSON Configuration**  <a name="example-logtojson"></a>
Here is one example of a `LOGTOJSON` configuration for an Apache Common Log entry converted to JSON format:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
Before conversion:  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
After conversion:  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example : LOGTOJSON Configuration With Custom Fields**  <a name="example-logtojson-custom-fields"></a>
Here is another example `LOGTOJSON` configuration:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
With this configuration setting, the same Apache Common Log entry from the previous example is converted to JSON format as follows:  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example : Convert Apache Common Log Entry**  <a name="example-apache-common-log-entry"></a>
The following flow configuration converts an Apache Common Log entry to a single line record in JSON format:  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example : Convert Multi-Line Records**  <a name="example-convert-multiline"></a>
The following flow configuration parses multi-line records whose first line starts with "`[SEQUENCE=`". Each record is first converted to a single line record. Then, values are extracted from the record based on a tab delimiter. Extracted values are mapped to specified `customFieldNames` values to form a single-line record in JSON format.  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```
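
The grouping performed by `multiLineStartPattern` can be approximated as follows (a simplified sketch, not the agent's actual implementation):

```python
import re

def group_records(lines, start_pattern):
    # A line matching the pattern starts a new record; lines that don't
    # match are appended to the current record.
    start = re.compile(start_pattern)
    records, current = [], []
    for line in lines:
        if start.match(line) and current:
            records.append(current)
            current = []
        current.append(line)
    if current:
        records.append(current)
    return records

lines = ["[SEQUENCE=1]\tok\tfast", "continued detail", "[SEQUENCE=2]\tok\tslow"]
print(len(group_records(lines, r"\[SEQUENCE=")))  # 2
```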

**Example : LOGTOJSON Configuration with Match Pattern**  <a name="example-logtojson-match-pattern"></a>
Here is one example of a `LOGTOJSON` configuration for an Apache Common Log entry converted to JSON format, with the last field (bytes) omitted:  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
Before conversion:  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
After conversion:  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```
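
To see how the `matchPattern` above extracts those values, you can apply the same regular expression (unescaped from its JSON form) outside the agent. Note that this sketch keeps the literal `-` values rather than mapping them to null as the agent does:

```python
import re

# The matchPattern from the configuration above, unescaped from JSON.
pattern = r'^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})'
field_names = ["host", "ident", "authuser", "datetime", "request", "response"]

entry = ('123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] '
         '"GET /java/javaResources.html HTTP/1.0" 200')
match = re.match(pattern, entry)
record = dict(zip(field_names, match.groups()))
print(record["request"])  # GET /java/javaResources.html HTTP/1.0
```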

## Use agent CLI commands
<a name="cli-commands"></a>

Automatically start the agent on system startup: 

```
sudo chkconfig aws-kinesis-agent on
```

Check the status of the agent: 

```
sudo service aws-kinesis-agent status
```

Stop the agent: 

```
sudo service aws-kinesis-agent stop
```

Read the agent's log file from this location:

```
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
```

Uninstall the agent:

```
sudo yum remove aws-kinesis-agent
```

## FAQ
<a name="agent-faq"></a>

### Is there a Kinesis Agent for Windows?
<a name="agent-faq-1"></a>

[Kinesis Agent for Windows](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html) is different software than Kinesis Agent for Linux platforms.

### Why is Kinesis Agent slowing down and/or `RecordSendErrors` increasing?
<a name="agent-faq-2"></a>

This is usually due to throttling from Kinesis Data Streams. Check the `WriteProvisionedThroughputExceeded` metric for Kinesis data streams or the `ThrottledRecords` metric for Firehose delivery streams. Any increase from 0 in these metrics indicates that the stream limits need to be increased. For more information, see [Kinesis Data Stream limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) and [Amazon Firehose Delivery Streams](https://docs.aws.amazon.com/firehose/latest/dev/limits.html).
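
If your stream uses provisioned capacity, a rough way to estimate whether the stream is undersized is to compare your write rate against the per-shard write limits of about 1 MB per second and 1,000 records per second (this sketch treats 1 MB as 10^6 bytes; check the service quotas page for exact limits):

```python
import math

# Approximate per-shard write limits for provisioned Kinesis data streams.
SHARD_BYTES_PER_SEC = 1_000_000    # ~1 MB per second
SHARD_RECORDS_PER_SEC = 1_000      # 1,000 records per second

def min_shards(records_per_sec, avg_record_bytes):
    # The stream must satisfy both the byte limit and the record limit.
    by_bytes = records_per_sec * avg_record_bytes / SHARD_BYTES_PER_SEC
    by_records = records_per_sec / SHARD_RECORDS_PER_SEC
    return max(1, math.ceil(max(by_bytes, by_records)))

# 5,000 records/s at 500 bytes each is record-bound: 5 shards.
print(min_shards(5000, 500))
```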

Once you rule out throttling, check whether the Kinesis Agent is configured to tail a large number of small files. There is a delay when the Kinesis Agent starts tailing a new file, so the agent should tail a small number of larger files. Try consolidating your log files into larger files.

### Why am I getting `java.lang.OutOfMemoryError` exceptions?
<a name="agent-faq-4"></a>

Kinesis Agent does not have enough memory to handle its current workload. Try increasing `JAVA_START_HEAP` and `JAVA_MAX_HEAP` in `/usr/bin/start-aws-kinesis-agent` and restarting the agent.

### Why am I getting `IllegalStateException : connection pool shut down` exceptions?
<a name="agent-faq-5"></a>

Kinesis Agent does not have enough connections to handle its current workload. Try increasing `maxConnections` and `maxSendingThreads` in your general agent configuration settings at `/etc/aws-kinesis/agent.json`. The default value for these fields is 12 times the number of available runtime processors. For more information about advanced agent configuration settings, see [AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java). 

### How can I debug another issue with Kinesis Agent?
<a name="agent-faq-6"></a>

You can enable `DEBUG` level logs in `/etc/aws-kinesis/log4j.xml`.

### How should I configure Kinesis Agent?
<a name="agent-faq-7"></a>

The smaller the `maxBufferSizeBytes`, the more frequently Kinesis Agent sends data. This can decrease the delivery time of records, but it also increases the number of requests per second to Kinesis Data Streams. 
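
To make the tradeoff concrete, the agent sends a buffer when the first of the three thresholds (age, bytes, or records) is reached, so you can approximate the request rate per flow as follows (an illustrative model, not the agent's exact algorithm):

```python
def approx_flushes_per_sec(bytes_per_sec, records_per_sec,
                           max_buffer_age_ms=60_000,
                           max_buffer_bytes=4_194_304,
                           max_buffer_records=500):
    # The buffer is sent when the first threshold is reached, so the
    # time between sends is the minimum of the three fill times.
    secs_between_flushes = min(
        max_buffer_age_ms / 1000,
        max_buffer_bytes / bytes_per_sec,
        max_buffer_records / records_per_sec,
    )
    return 1 / secs_between_flushes

# At 2 MB/s and 100 records/s, the byte threshold binds: ~0.48 requests/s.
print(round(approx_flushes_per_sec(2_000_000, 100), 2))
```

Halving `maxBufferSizeBytes` in this model roughly doubles the request rate once the byte threshold is the binding one.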

### Why is Kinesis Agent sending duplicate records?
<a name="agent-faq-8"></a>

This occurs due to a misconfiguration in file tailing. Make sure that each flow's `filePattern` matches only one file. It can also occur if `logrotate` is running in `copytruncate` mode. Try changing to the default mode or `create` mode to avoid duplication. For more information on handling duplicate records, see [Handling Duplicate Records](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html).

# Write to Kinesis Data Streams using other AWS services
<a name="using-other-services"></a>

The following AWS services can integrate directly with Amazon Kinesis Data Streams to write data to Kinesis data streams. Review the information for each service that you are interested in and refer to the provided references.

**Topics**
+ [Write to Kinesis Data Streams using AWS Amplify](using-other-services-amplify.md)
+ [Write to Kinesis Data Streams using Amazon Aurora](using-other-services-aurora.md)
+ [Write to Kinesis Data Streams using Amazon CloudFront](using-other-services-CloudFront.md)
+ [Write to Kinesis Data Streams using Amazon CloudWatch Logs](using-other-services-cw-logs.md)
+ [Write to Kinesis Data Streams using Amazon Connect](using-other-services-connect.md)
+ [Write to Kinesis Data Streams using AWS Database Migration Service](using-other-services-migration.md)
+ [Write to Kinesis Data Streams using Amazon DynamoDB](using-other-services-ddb.md)
+ [Write to Kinesis Data Streams using Amazon EventBridge](using-other-services-eventbridges.md)
+ [Write to Kinesis Data Streams using AWS IoT Core](using-other-services-iot-core.md)
+ [Write to Kinesis Data Streams using Amazon Relational Database Service](using-other-services-rds.md)
+ [Write to Kinesis Data Streams using Amazon Pinpoint](using-other-services-pinpoint.md)
+ [Write to Kinesis Data Streams using Amazon Quantum Ledger Database (Amazon QLDB)](using-other-services-quantum-ledger.md)

# Write to Kinesis Data Streams using AWS Amplify
<a name="using-other-services-amplify"></a>

You can use Amazon Kinesis Data Streams to stream data from your mobile applications built with AWS Amplify for real-time processing. You can then build real-time dashboards, capture exceptions and generate alerts, drive recommendations, and make other real-time business or operational decisions. You can also send data to other services such as Amazon Simple Storage Service, Amazon DynamoDB, and Amazon Redshift.

For more information, see [Using Amazon Kinesis](https://docs.amplify.aws/react/build-a-backend/more-features/analytics/streaming-data/) in the *AWS Amplify Developer Center*. 

# Write to Kinesis Data Streams using Amazon Aurora
<a name="using-other-services-aurora"></a>

You can use Amazon Kinesis Data Streams to monitor activities on your Amazon Aurora DB clusters. Using Database Activity Streams, your Aurora DB cluster pushes activities to an Amazon Kinesis data stream in real time. You can then build applications for compliance management that consume these activities, audit them, and generate alerts. You can also use Amazon Data Firehose to store the data.

For more information, see [Database Activity Streams](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html) in the *Amazon Aurora Developer Guide*. 

# Write to Kinesis Data Streams using Amazon CloudFront
<a name="using-other-services-CloudFront"></a>

You can use Amazon Kinesis Data Streams with CloudFront real-time logs and get information about requests made to a distribution in real time. You can then build your own [Kinesis data stream consumer](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html), or use Amazon Data Firehose to send the log data to Amazon S3, Amazon Redshift, Amazon OpenSearch Service, or a third-party log processing service.

For more information, see [Real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) in the *Amazon CloudFront Developer Guide*. 

# Write to Kinesis Data Streams using Amazon CloudWatch Logs
<a name="using-other-services-cw-logs"></a>

You can use CloudWatch subscriptions to get access to a real-time feed of log events from Amazon CloudWatch Logs and have it delivered to a Kinesis data stream for processing, analysis, and loading into other systems. 

For more information, see [Real-time processing of log data with subscriptions](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Subscriptions.html) in the *Amazon CloudWatch Logs User Guide*. 

# Write to Kinesis Data Streams using Amazon Connect
<a name="using-other-services-connect"></a>

You can use Kinesis Data Streams to export contact records and agent events in real time from your Amazon Connect instance. You can also enable data streaming from Amazon Connect Customer Profiles to automatically receive updates to a Kinesis data stream about the creation of new profiles or changes to existing ones.

You can then build consumer applications to process and analyze the data in real time. For example, using contact records and customer profile data, you can keep your source systems' data, such as CRMs and marketing automation tools, up to date with the latest information. Using the agent event data, you can create dashboards that display agent information and events, and trigger custom notifications of specific agent activity.

For more information, see [data streaming for your instance](https://docs.aws.amazon.com/connect/latest/adminguide/data-streaming.html), [set up real-time export](https://docs.aws.amazon.com/connect/latest/adminguide/set-up-real-time-export.html), and [agent event streams](https://docs.aws.amazon.com/connect/latest/adminguide/agent-event-streams.html) in the *Amazon Connect Administrator Guide*. 

# Write to Kinesis Data Streams using AWS Database Migration Service
<a name="using-other-services-migration"></a>

You can use AWS Database Migration Service to migrate data to a Kinesis data stream. You can then build consumer applications that process the data records in real time. You can also send data downstream to other services such as Amazon Simple Storage Service, Amazon DynamoDB, and Amazon Redshift.

For more information, see [Using Kinesis Data Streams](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html) in the *AWS Database Migration Service User Guide*. 

# Write to Kinesis Data Streams using Amazon DynamoDB
<a name="using-other-services-ddb"></a>

You can use Amazon Kinesis Data Streams to capture changes to Amazon DynamoDB. Kinesis Data Streams captures item-level modifications in any DynamoDB table and replicates them to a Kinesis data stream. Your consumer applications can access this stream to view item-level changes in real time and deliver those changes downstream or take action based on the content.

For more information, see [how Kinesis Data Streams work with DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) in the *Amazon DynamoDB Developer Guide*. 

# Write to Kinesis Data Streams using Amazon EventBridge
<a name="using-other-services-eventbridges"></a>

Using Kinesis Data Streams, you can send AWS API call [events](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html) in EventBridge to a stream, build consumer applications, and process large amounts of data. You can also use Kinesis Data Streams as a target in EventBridge Pipes and deliver records to a stream from one of the available sources, after optional filtering and enrichment.

For more information, see [Send events to an Amazon Kinesis stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-relay-events-kinesis-stream.html) and [EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) in the *Amazon EventBridge User Guide*. 

# Write to Kinesis Data Streams using AWS IoT Core
<a name="using-other-services-iot-core"></a>

You can write data to Kinesis Data Streams in real time from MQTT messages in AWS IoT Core by using AWS IoT rule actions. You can then build applications that process the data, analyze its contents and generate alerts, and deliver it into analytics applications or other AWS services. 

For more information, see [Kinesis Data Streams](https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-rule-action.html) in the *AWS IoT Core Developer Guide*. 

# Write to Kinesis Data Streams using Amazon Relational Database Service
<a name="using-other-services-rds"></a>

You can use Amazon Kinesis Data Streams to monitor activities on your Amazon RDS instances. Using Database Activity Streams, Amazon RDS pushes activities to a Kinesis data stream in real-time. You can then build applications for compliance management that consume these activities, audit them and generate alerts. You can also use Amazon Data Firehose to store the data.

For more information, see [Database Activity Streams](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html) in the *Amazon RDS Developer Guide*. 

# Write to Kinesis Data Streams using Amazon Pinpoint
<a name="using-other-services-pinpoint"></a>

You can set up Amazon Pinpoint to send event data to Amazon Kinesis Data Streams. Amazon Pinpoint can send event data for campaigns, journeys, and transactional email and SMS messages. You can then ingest the data into analytics applications or build your own consumer applications that take actions based on the contents of the events.

For more information, see [Streaming Events](https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams.html) in the *Amazon Pinpoint Developer Guide*. 

# Write to Kinesis Data Streams using Amazon Quantum Ledger Database (Amazon QLDB)
<a name="using-other-services-quantum-ledger"></a>

You can create a stream in Amazon QLDB that captures every document revision that is committed to your journal and delivers this data to Amazon Kinesis Data Streams in real time. A QLDB stream is a continuous flow of data from your ledger's journal to a Kinesis data stream resource. Then, you can use the Kinesis streaming platform or the Kinesis Client Library to consume your stream, process the data records, and analyze the data contents. A QLDB stream writes your data to Kinesis Data Streams in three types of records: `control`, `block summary`, and `revision details`. 

For more information, see [Streams](https://docs.aws.amazon.com/qldb/latest/developerguide/streams.html) in the *Amazon QLDB Developer Guide*. 

# Write to Kinesis Data Streams using third-party integrations
<a name="using-other-services-third-party"></a>

You can write data to Kinesis Data Streams using one of the following third-party options that integrate with Kinesis Data Streams. Select the option that you want to learn more about and find resources and links to relevant documentation.

**Topics**
+ [Apache Flink](using-other-services-flink.md)
+ [Fluentd](using-other-services-Fluentd.md)
+ [Debezium](using-other-services-Debezium.md)
+ [Oracle GoldenGate](using-other-services-Oracle-GoldenGate.md)
+ [Kafka Connect](using-other-services-kafka-connect.md)
+ [Adobe Experience](using-other-services-adobe.md)
+ [Striim](using-other-services-Striim.md)

# Apache Flink
<a name="using-other-services-flink"></a>

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. For more information on writing to Kinesis Data Streams from Apache Flink, see [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/). 

# Fluentd
<a name="using-other-services-Fluentd"></a>

Fluentd is an open source data collector for a unified logging layer. For more information on writing to Kinesis Data Streams from Fluentd, see [Stream processing with Kinesis](https://docs.fluentd.org/how-to-guides/kinesis-stream). 

# Debezium
<a name="using-other-services-Debezium"></a>

Debezium is an open source distributed platform for change data capture. For more information on writing to Kinesis Data Streams from Debezium, see [Streaming MySQL Data Changes to Amazon Kinesis](https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/). 

# Oracle GoldenGate
<a name="using-other-services-Oracle-GoldenGate"></a>

Oracle GoldenGate is a software product that allows you to replicate, filter, and transform data from one database to another database. For more information on writing to Kinesis Data Streams from Oracle GoldenGate, see [Data replication to Kinesis Data Stream using Oracle GoldenGate](https://blogs.oracle.com/dataintegration/post/data-replication-to-aws-kinesis-data-stream-using-oracle-goldengate). 

# Kafka Connect
<a name="using-other-services-kafka-connect"></a>

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. For more information on writing data from Apache Kafka to Kinesis Data Streams, see the [Kinesis Kafka Connector](https://github.com/awslabs/kinesis-kafka-connector). 

# Adobe Experience
<a name="using-other-services-adobe"></a>

Adobe Experience Platform enables organizations to centralize and standardize customer data from any system. It then applies data science and machine learning to dramatically improve the design and delivery of rich, personalized experiences. For more information on writing data from the Adobe Experience Platform to Kinesis Data Streams, see how to create an [Amazon Kinesis connection](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/amazon-kinesis.html?lang=en). 

# Striim
<a name="using-other-services-Striim"></a>

Striim is a complete, end-to-end, in-memory platform for collecting, filtering, transforming, enriching, aggregating, analyzing, and delivering data in real time. For more information on how to write data to Kinesis Data Streams from Striim, see the [Kinesis Writer](https://www.striim.com/docs/en/kinesis-writer.html). 

# Troubleshoot Amazon Kinesis Data Streams producers
<a name="troubleshooting-producers"></a>

**Topics**
+ [My producer application is writing at a slower rate than expected](#producer-writing-at-slower-rate)
+ [I receive an unauthorized KMS master key permission error](#unauthorized-kms-producer)
+ [Troubleshoot other common issues for producers](#misc-troubleshooting-producer)

## My producer application is writing at a slower rate than expected
<a name="producer-writing-at-slower-rate"></a>

**Topics**
+ [Service limits exceeded](#service-limits-exceeded)
+ [I want to optimize my producer](#producer-optimization)
+ [Misuse of `flushSync()` operations](#misuse-tag)

### Service limits exceeded
<a name="service-limits-exceeded"></a>

To find out whether service limits are being exceeded, check whether your producer is throwing throughput exceptions from the service, and validate which API operations are being throttled. Keep in mind that different limits apply to different calls; see [Quotas and limits](service-sizes-and-limits.md). For example, in addition to the commonly known shard-level limits for writes and reads, there are the following stream-level limits:
+ [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)
+ [DeleteStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html)
+ [ListStreams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html)
+ [GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
+ [MergeShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html)
+ [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)
+ [DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)

The operations `CreateStream`, `DeleteStream`, `ListStreams`, `GetShardIterator`, and `MergeShards` are limited to 5 calls per second. The `DescribeStream` operation is limited to 10 calls per second. The `DescribeStreamSummary` operation is limited to 20 calls per second.

If these calls aren't the issue, make sure you've selected a partition key that allows you to distribute *put* operations evenly across all shards, and that you don't have a particular partition key that's bumping into the service limits when the rest are not. This requires that you measure peak throughput and take into account the number of shards in your stream. For more information about managing streams, see [Create and manage Kinesis data streams](working-with-streams.md).

**Tip**  
Remember that the single-record operation [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) rounds each record up to the nearest kilobyte for throughput throttling calculations, while the multi-record operation [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) rounds up the cumulative size of the records in each call. For example, a `PutRecords` request with 600 records that are 1.1 KB in size will not get throttled. 
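The difference between per-record and cumulative rounding can be made concrete with a small sketch. The helper names below are hypothetical, used only to illustrate the arithmetic:

```java
// Hypothetical helpers illustrating how throughput throttling is calculated:
// PutRecord rounds each record up to the nearest kilobyte individually,
// while PutRecords rounds up the cumulative size of the whole batch.
public class ThrottlingMath {
    static final int KB = 1024;

    // PutRecord: each record is counted individually, rounded up to the nearest KB.
    static long putRecordChargedBytes(long recordSizeBytes) {
        return ((recordSizeBytes + KB - 1) / KB) * KB;
    }

    // PutRecords: the batch is counted on the rounded-up cumulative sum.
    static long putRecordsChargedBytes(long[] recordSizesBytes) {
        long total = 0;
        for (long size : recordSizesBytes) {
            total += size;
        }
        return ((total + KB - 1) / KB) * KB;
    }
}
```

For 600 records of roughly 1.1 KB (1,127 bytes), per-record rounding would count each record as 2 KB, for 1,200 KB total, while cumulative rounding counts only about 661 KB, which stays under the 1 MB per-second shard write limit.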

### I want to optimize my producer
<a name="producer-optimization"></a>

Before you begin optimizing your producer, complete the following key tasks. First, identify your desired peak throughput in terms of record size and records per second. Next, rule out stream capacity as the limiting factor ([Service limits exceeded](#service-limits-exceeded)). If you've ruled out stream capacity, use the following troubleshooting tips and optimization guidelines for the two common types of producers.

**Large Producer**

A large producer usually runs on an on-premises server or Amazon EC2 instance. Customers who need higher throughput from a large producer typically care about per-record latency. Strategies for dealing with latency include the following:
+ If you can micro-batch or buffer records, use the [Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (which has advanced aggregation logic), use the multi-record operation [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html), or aggregate records into a larger file before using the single-record operation [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html).
+ If you are unable to batch or buffer, use multiple threads to write to the Kinesis Data Streams service at the same time. The AWS SDK for Java and other SDKs include async clients that can do this with very little code.
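When batching with `PutRecords`, a producer must split its buffer into request-sized chunks, since a single `PutRecords` call accepts at most 500 records and 5 MB. The sketch below is a hypothetical batching helper; the `Entry` type stands in for the SDK's `PutRecordsRequestEntry`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batching helper for a large producer. A PutRecords request
// accepts at most 500 records and 5 MB, so buffered records are split into
// compliant batches before each call.
public class PutRecordsBatcher {
    static final int MAX_RECORDS_PER_CALL = 500;
    static final long MAX_BYTES_PER_CALL = 5L * 1024 * 1024;

    // Stand-in for the SDK's PutRecordsRequestEntry.
    record Entry(String partitionKey, byte[] data) {}

    static List<List<Entry>> batch(List<Entry> buffered) {
        List<List<Entry>> batches = new ArrayList<>();
        List<Entry> current = new ArrayList<>();
        long currentBytes = 0;
        for (Entry e : buffered) {
            // The partition key counts toward the request size along with the data.
            long entryBytes = e.data().length + e.partitionKey().length();
            if (!current.isEmpty()
                    && (current.size() == MAX_RECORDS_PER_CALL
                        || currentBytes + entryBytes > MAX_BYTES_PER_CALL)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(e);
            currentBytes += entryBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting batch can then be submitted as one `PutRecords` call, with failed entries from the response re-queued for the next batch.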

**Small Producer**

A small producer is usually a mobile app, IoT device, or web client. If it's a mobile app, we recommend using the `PutRecords` operation or the Kinesis Recorder in the AWS Mobile SDKs. For more information, see the AWS Mobile SDK for Android Getting Started Guide and the AWS Mobile SDK for iOS Getting Started Guide. Mobile apps must inherently handle intermittent connections and need some form of batch put, such as `PutRecords`. If you are unable to batch for some reason, see the Large Producer information above. If your producer is a browser, the amount of data being generated is typically very small. However, you are putting the *put* operations on the critical path of the application, which we don't recommend.

### Misuse of `flushSync()` operations
<a name="misuse-tag"></a>

Using `flushSync()` incorrectly can significantly impact write performance. The `flushSync()` operation is designed for shutdown scenarios, to make sure that all buffered records are sent before the KPL application terminates. If you invoke this operation after every write, it can add substantial extra latency, around 500 ms per write. Make sure that you call `flushSync()` only on application shutdown to avoid unnecessary extra delay in write performance. 
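A minimal sketch of where the flush belongs; the `Producer` interface here is a hypothetical stand-in for the KPL's `KinesisProducer`, and the stream name and partition key are placeholders:

```java
// Hypothetical sketch: flushSync() runs once at shutdown, never after every
// addUserRecord() call. 'Producer' stands in for the KPL's KinesisProducer.
public class FlushOnShutdown {
    interface Producer {
        void addUserRecord(String stream, String partitionKey, byte[] data);
        void flushSync(); // blocks until all buffered records are sent
    }

    static int writeAll(Producer producer, byte[][] payloads) {
        for (byte[] payload : payloads) {
            // No flushSync() here: let the KPL buffer and aggregate records.
            producer.addUserRecord("my-stream", "partition-key", payload);
        }
        // Flush exactly once, when the application is shutting down.
        producer.flushSync();
        return payloads.length;
    }
}
```

In a real KPL application, the single `flushSync()` call typically lives in a JVM shutdown hook or the application's termination path.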

## I receive an unauthorized KMS master key permission error
<a name="unauthorized-kms-producer"></a>

This error occurs when a producer application writes to an encrypted stream without permissions on the KMS master key. To assign permissions to an application to access a KMS key, see [Using Key Policies in AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html) and [Using IAM Policies with AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/iam-policies.html).

## Troubleshoot other common issues for producers
<a name="misc-troubleshooting-producer"></a>
+ [Why is my Kinesis data stream returning a 500 Internal Server Error?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-500-error/)
+ [How do I troubleshoot timeout errors when writing from Flink to Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/)
+ [How do I troubleshoot throttling errors in Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling-errors/)
+ [Why is my Kinesis data stream throttling?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling/)
+ [How can I put data records into a Kinesis data stream using the KPL?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-kpl/)

# Optimize Kinesis Data Streams producers
<a name="advanced-producers"></a>

You can further optimize your Amazon Kinesis Data Streams producers depending on specific behavior you see. Review the following topics to identify solutions.

**Topics**
+ [Customize KPL retries and rate limit behavior](kinesis-producer-adv-retries-rate-limiting.md)
+ [Apply best practices to KPL aggregation](kinesis-producer-adv-aggregation.md)

# Customize KPL retries and rate limit behavior
<a name="kinesis-producer-adv-retries-rate-limiting"></a>

When you add Amazon Kinesis Producer Library (KPL) user records using the KPL `addUserRecord()` operation, a record is given a time stamp and added to a buffer with a deadline set by the `RecordMaxBufferedTime` configuration parameter. This time stamp/deadline combination sets the buffer priority. Records are flushed from the buffer based on the following criteria:
+ Buffer priority
+ Aggregation configuration
+ Collection configuration

The aggregation and collection configuration parameters affecting buffer behavior are as follows:
+ `AggregationMaxCount`
+ `AggregationMaxSize`
+ `CollectionMaxCount`
+ `CollectionMaxSize`
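These parameters can be set programmatically on `KinesisProducerConfiguration` or loaded from a properties file. The following sketch of such a file uses illustrative values, not recommendations; tune them for your workload:

```properties
# Illustrative KPL buffering configuration -- values are examples only.
# Maximum time (ms) a user record may sit in the buffer before a flush.
RecordMaxBufferedTime = 100
# Aggregation: caps on user records and bytes per aggregated KDS record.
AggregationMaxCount = 4294967295
AggregationMaxSize = 51200
# Collection: caps on KDS records and bytes per PutRecords call.
CollectionMaxCount = 500
CollectionMaxSize = 5242880
```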

Records flushed are then sent to your Kinesis data stream as Amazon Kinesis Data Streams records using a call to the Kinesis Data Streams API operation `PutRecords`. The `PutRecords` operation sends requests to your stream that occasionally exhibit full or partial failures. Records that fail are automatically added back to the KPL buffer. The new deadline is set based on the minimum of these two values: 
+ Half the current `RecordMaxBufferedTime` configuration
+ The record’s time-to-live value
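The re-buffering deadline described above reduces to a one-line computation. This sketch assumes both inputs are expressed in milliseconds; the method name is hypothetical:

```java
// Hypothetical sketch of the KPL re-buffering rule: a record that failed in a
// PutRecords call is re-added to the buffer with a deadline equal to the
// minimum of half the RecordMaxBufferedTime setting and the record's
// remaining time-to-live.
public class RetryDeadline {
    static long retryDeadlineMillis(long recordMaxBufferedTimeMillis,
                                    long remainingTtlMillis) {
        return Math.min(recordMaxBufferedTimeMillis / 2, remainingTtlMillis);
    }
}
```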

This strategy allows retried KPL user records to be included in subsequent Kinesis Data Streams API calls, to improve throughput and reduce complexity while enforcing the Kinesis Data Streams record’s time-to-live value. There is no backoff algorithm, making this a relatively aggressive retry strategy. Spamming due to excessive retries is prevented by rate limiting, discussed in the next section.

## Rate limiting
<a name="kinesis-producer-adv-retries-rate-limiting-rate-limit"></a>

The KPL includes a rate limiting feature, which limits per-shard throughput sent from a single producer. Rate limiting is implemented using a token bucket algorithm with separate buckets for both Kinesis Data Streams records and bytes. Each successful write to a Kinesis data stream adds a token (or multiple tokens) to each bucket, up to a certain threshold. This threshold is configurable but by default is set 50 percent higher than the actual shard limit, to allow shard saturation from a single producer. 

You can lower this limit to reduce spamming due to excessive retries. However, the best practice is for each producer to retry aggressively for maximum throughput, and to handle any resulting throttling that you determine to be excessive by expanding the capacity of the stream and implementing an appropriate partition key strategy.

# Apply best practices to KPL aggregation
<a name="kinesis-producer-adv-aggregation"></a>

While the sequence number scheme of the resulting Amazon Kinesis Data Streams records remains the same, aggregation causes the indexing of the Amazon Kinesis Producer Library (KPL) user records contained within an aggregated Kinesis Data Streams record to start at 0 (zero). However, as long as you do not rely on sequence numbers to uniquely identify your KPL user records, your code can ignore this: the aggregation of your KPL user records into a Kinesis Data Streams record, and the subsequent de-aggregation of that record back into your KPL user records, are handled for you automatically. This applies whether your consumer uses the KCL or the AWS SDK. If your consumer is written using the API provided in the AWS SDK, you need to pull the Java part of the KPL into your build to use this aggregation functionality.

If you intend to use sequence numbers as unique identifiers for your KPL user records, we recommend that you use the contract-abiding `public int hashCode()` and `public boolean equals(Object obj)` operations provided in `Record` and `UserRecord` to enable the comparison of your KPL user records. Additionally, if you want to examine the subsequence number of your KPL user record, you can cast it to a `UserRecord` instance and retrieve its subsequence number.

For more information, see [Implement consumer de-aggregation](kinesis-kpl-consumer-deaggregation.md).