

# Authentication and authorization for Apache Kafka APIs
<a name="kafka_apis_iam"></a>

You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

For information on how to control who can perform [Amazon MSK operations](https://docs.aws.amazon.com/msk/1.0/apireference/operations.html) on your cluster, see [Authentication and authorization for Amazon MSK APIs](security-iam.md).

**Topics**
+ [IAM access control](iam-access-control.md)
+ [Mutual TLS client authentication for Amazon MSK](msk-authentication.md)
+ [Sign-in credentials authentication with AWS Secrets Manager](msk-password.md)
+ [Apache Kafka ACLs](msk-acls.md)

# IAM access control
<a name="iam-access-control"></a>

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

IAM access control works for Java and non-Java clients, including Kafka clients written in Python, Go, JavaScript, and .NET. IAM access control for non-Java clients is available for MSK clusters with Kafka version 2.7.1 or above.

To make IAM access control possible, Amazon MSK makes minor modifications to Apache Kafka source code. These modifications won't cause a noticeable difference in your Apache Kafka experience. Amazon MSK logs access events so you can audit them.

You can invoke Apache Kafka ACL APIs for an MSK cluster that uses IAM access control. However, Apache Kafka ACLs have no effect on authorization for IAM identities. You must use IAM policies to control access for IAM identities.

**Important considerations**  
When you use IAM access control with your MSK cluster, keep in mind the following:
+ IAM access control doesn't apply to Apache ZooKeeper nodes. For information about how you can control access to those nodes, see [Control access to Apache ZooKeeper nodes in your Amazon MSK cluster](zookeeper-security.md).
+ The `allow.everyone.if.no.acl.found` Apache Kafka setting has no effect if your cluster uses IAM access control.

# How IAM access control for Amazon MSK works
<a name="how-to-use-iam-access-control"></a>

To use IAM access control for Amazon MSK, perform the following steps, which are described in detail in these topics:
+ [Create an Amazon MSK cluster that uses IAM access control](create-iam-access-control-cluster-in-console.md) 
+ [Configure clients for IAM access control](configure-clients-for-iam-access-control.md)
+ [Create authorization policies for the IAM role](create-iam-access-control-policies.md)
+ [Get the bootstrap brokers for IAM access control](get-bootstrap-brokers-for-iam.md)

# Create an Amazon MSK cluster that uses IAM access control
<a name="create-iam-access-control-cluster-in-console"></a>

This section explains how you can use the AWS Management Console, the API, or the AWS CLI to create an Amazon MSK cluster that uses IAM access control. For information about how to turn on IAM access control for an existing cluster, see [Update security settings of an Amazon MSK cluster](msk-update-security.md).

**Use the AWS Management Console to create a cluster that uses IAM access control**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Choose **Create cluster**.

1. Choose **Create cluster with custom settings**.

1. In the **Authentication** section, choose **IAM access control**.

1. Complete the rest of the workflow for creating a cluster.

**Use the API or the AWS CLI to create a cluster that uses IAM access control**
+ To create a cluster with IAM access control enabled, use the [CreateCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters.html#CreateCluster) API or the [create-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/create-cluster.html) CLI command, and pass the following JSON for the `ClientAuthentication` parameter: `"ClientAuthentication": { "Sasl": { "Iam": { "Enabled": true } } }`.
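
If you script cluster creation, you can build and validate the same `ClientAuthentication` structure before calling the API. The following is a minimal stdlib-only sketch; the payload mirrors the JSON shown above, and the file name in the comment is the conventional one from this guide:

```python
import json

# ClientAuthentication payload for CreateCluster with IAM access
# control enabled, matching the JSON shown above.
client_authentication = {
    "Sasl": {
        "Iam": {
            "Enabled": True
        }
    }
}

# Written to a file, this can be passed to the CLI as
#   --client-authentication file://clientauthinfo.json
print(json.dumps({"ClientAuthentication": client_authentication}, indent=4))
```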

# Configure clients for IAM access control
<a name="configure-clients-for-iam-access-control"></a>

To enable clients to communicate with an MSK cluster that uses IAM access control, you can use either of these mechanisms:
+ Non-Java client configuration using the SASL/OAUTHBEARER mechanism
+ Java client configuration using the SASL/OAUTHBEARER mechanism or the AWS_MSK_IAM mechanism

## Use the SASL/OAUTHBEARER mechanism to configure IAM
<a name="configure-clients-for-iam-access-control-sasl-oauthbearer"></a>

1. Configure your client as shown in the following Python Kafka client example. Configuration changes are similar in other languages.

   ```
   from kafka import KafkaProducer
   from kafka.errors import KafkaError
   from kafka.sasl.oauth import AbstractTokenProvider
   import socket
   import time
   from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
   
   class MSKTokenProvider():
       def token(self):
           token, _ = MSKAuthTokenProvider.generate_auth_token('<my AWS Region>')
           return token
   
   tp = MSKTokenProvider()
   
   producer = KafkaProducer(
       bootstrap_servers='<myBootstrapString>',
       security_protocol='SASL_SSL',
       sasl_mechanism='OAUTHBEARER',
       sasl_oauth_token_provider=tp,
       client_id=socket.gethostname(),
   )
   
   topic = "<my-topic>"
   while True:
       try:
           inp=input(">")
           producer.send(topic, inp.encode())
           producer.flush()
           print("Produced!")
       except Exception as e:
           print("Failed to send message:", e)
   
   producer.close()
   ```

1. Download the helper library for your chosen configuration language and follow the instructions in the *Getting started* section of that language library’s homepage.
   + JavaScript: [https://github.com/aws/aws-msk-iam-sasl-signer-js#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-js#getting-started)
   + Python: [https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started](https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started)
   + Go: [https://github.com/aws/aws-msk-iam-sasl-signer-go#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-go#getting-started)
   + .NET: [https://github.com/aws/aws-msk-iam-sasl-signer-net#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-net#getting-started)
   + Java: SASL/OAUTHBEARER support for Java is available through the [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth/releases) JAR file

## Use the MSK custom AWS_MSK_IAM mechanism to configure IAM
<a name="configure-clients-for-iam-access-control-msk-iam"></a>

1. Add the following to the `client.properties` file. Replace *<PATH_TO_TRUST_STORE_FILE>* with the fully qualified path to the trust store file on the client.
**Note**  
If you don't want to use a specific certificate, you can remove `ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>` from your `client.properties` file. When you don't specify a value for `ssl.truststore.location`, the Java process uses the default certificate.

   ```
   ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
   sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
   ```

   To use a named profile that you created for AWS credentials, include `awsProfileName="your profile name";` in your client configuration file. For information about named profiles, see [Named profiles](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) in the AWS CLI documentation.

1. Download the latest stable [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth/releases) JAR file, and place it in the class path. If you use Maven, add the following dependency, adjusting the version number as needed:

   ```
   <dependency>
       <groupId>software.amazon.msk</groupId>
       <artifactId>aws-msk-iam-auth</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

The Amazon MSK client plugin is open-sourced under the Apache 2.0 license.

# Create authorization policies for the IAM role
<a name="create-iam-access-control-policies"></a>

Attach an authorization policy to the IAM role that corresponds to the client. In an authorization policy, you specify which actions to allow or deny for the role. If your client is on an Amazon EC2 instance, associate the authorization policy with the IAM role for that Amazon EC2 instance. Alternatively, you can configure your client to use a named profile, and then you associate the authorization policy with the role for that named profile. [Configure clients for IAM access control](configure-clients-for-iam-access-control.md) describes how to configure a client to use a named profile.

For information about how to create an IAM policy, see [Creating IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_create.html).

The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the `Action` and `Resource` elements, see [Semantics of IAM authorization policy actions and resources](kafka-actions.md).

**Important**  
Changes that you make to an IAM policy are reflected in the IAM APIs and the AWS CLI immediately. However, it can take noticeable time for the policy change to take effect. In most cases, policy changes take effect in less than a minute. Network conditions may sometimes increase the delay.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}
```

------

To learn how to create a policy with action elements that correspond to common Apache Kafka use cases, like producing and consuming data, see [Common use cases for client authorization policy](iam-access-control-use-cases.md).

For Kafka versions 2.8.0 and above, the **WriteDataIdempotently** permission is deprecated ([KIP-679](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)). By default, `enable.idempotence = true` is set. Therefore, for Kafka versions 2.8.0 and above, IAM doesn't offer the same functionality as Kafka ACLs. It isn't possible to `WriteDataIdempotently` to a topic by providing only `WriteData` access to that topic. This doesn't affect the case when `WriteData` is provided to **ALL** topics; in that case, `WriteDataIdempotently` is allowed. This is due to differences between the implementation of IAM logic and the implementation of Kafka ACLs. Additionally, writing to a topic idempotently also requires access to `transactional-ids`.

To work around this, we recommend using a policy similar to the following policy.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:WriteDataIdempotently"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/TestTopic",
                "arn:aws:kafka:us-east-1:123456789012:transactional-id/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*"
            ]
        }
    ]
}
```

------

In this case, `WriteData` allows writes to `TestTopic`, while `WriteDataIdempotently` allows idempotent writes to the cluster. This policy also adds access to the `transactional-id` resources that will be needed.

Because `WriteDataIdempotently` is a cluster level permission, you can't use it at the topic level. If `WriteDataIdempotently` is restricted to the topic level, this policy won't work.

# Get the bootstrap brokers for IAM access control
<a name="get-bootstrap-brokers-for-iam"></a>

See [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md).

# Semantics of IAM authorization policy actions and resources
<a name="kafka-actions"></a>

**Note**  
For clusters running Apache Kafka version 3.8 or later, IAM access control supports the WriteTxnMarkers API for terminating transactions. For clusters running Kafka versions earlier than 3.8, IAM access control doesn't support internal cluster actions including WriteTxnMarkers. For these earlier versions, to terminate transactions, use SCRAM or mTLS authentication with appropriate ACLs instead of IAM authentication.

This section explains the semantics of the action and resource elements that you can use in an IAM authorization policy. For an example policy, see [Create authorization policies for the IAM role](create-iam-access-control-policies.md).

## Authorization policy actions
<a name="actions"></a>

The following table lists the actions that you can include in an authorization policy when you use IAM access control for Amazon MSK. When you include an action from the *Action* column in your authorization policy, you must also include the corresponding actions from the *Required actions* column.


| Action | Description | Required actions | Required resources | Applicable to serverless clusters | 
| --- | --- | --- | --- | --- | 
| kafka-cluster:Connect | Grants permission to connect and authenticate to the cluster. | None | cluster | Yes | 
| kafka-cluster:DescribeCluster | Grants permission to describe various aspects of the cluster, equivalent to Apache Kafka's DESCRIBE CLUSTER ACL. |  `kafka-cluster:Connect`  | cluster | Yes | 
| kafka-cluster:AlterCluster | Grants permission to alter various aspects of the cluster, equivalent to Apache Kafka's ALTER CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeCluster`  | cluster | No | 
| kafka-cluster:DescribeClusterDynamicConfiguration | Grants permission to describe the dynamic configuration of a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS CLUSTER ACL. |  `kafka-cluster:Connect`  | cluster | No | 
| kafka-cluster:AlterClusterDynamicConfiguration | Grants permission to alter the dynamic configuration of a cluster, equivalent to Apache Kafka's ALTER_CONFIGS CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | cluster | No | 
| kafka-cluster:WriteDataIdempotently | Grants permission to write data idempotently on a cluster, equivalent to Apache Kafka's IDEMPOTENT_WRITE CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:WriteData`  | cluster | Yes | 
| kafka-cluster:CreateTopic | Grants permission to create topics on a cluster, equivalent to Apache Kafka's CREATE CLUSTER/TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:DescribeTopic | Grants permission to describe topics on a cluster, equivalent to Apache Kafka's DESCRIBE TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:AlterTopic | Grants permission to alter topics on a cluster, equivalent to Apache Kafka's ALTER TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DeleteTopic | Grants permission to delete topics on a cluster, equivalent to Apache Kafka's DELETE TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DescribeTopicDynamicConfiguration | Grants permission to describe the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:AlterTopicDynamicConfiguration | Grants permission to alter the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's ALTER_CONFIGS TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration`  | topic | Yes | 
| kafka-cluster:ReadData | Grants permission to read data from topics on a cluster, equivalent to Apache Kafka's READ TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterGroup`  | topic | Yes | 
| kafka-cluster:WriteData | Grants permission to write data to topics on a cluster, equivalent to Apache Kafka's WRITE TOPIC ACL |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DescribeGroup | Grants permission to describe groups on a cluster, equivalent to Apache Kafka's DESCRIBE GROUP ACL. |  `kafka-cluster:Connect`  | group | Yes | 
| kafka-cluster:AlterGroup | Grants permission to join groups on a cluster, equivalent to Apache Kafka's READ GROUP ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | group | Yes | 
| kafka-cluster:DeleteGroup | Grants permission to delete groups on a cluster, equivalent to Apache Kafka's DELETE GROUP ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | group | Yes | 
| kafka-cluster:DescribeTransactionalId | Grants permission to describe transactional IDs on a cluster, equivalent to Apache Kafka's DESCRIBE TRANSACTIONAL_ID ACL. |  `kafka-cluster:Connect`  | transactional-id | Yes | 
| kafka-cluster:AlterTransactionalId | Grants permission to alter transactional IDs on a cluster, equivalent to Apache Kafka's WRITE TRANSACTIONAL_ID ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:WriteData`  | transactional-id | Yes | 
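
The *Required actions* column describes a dependency relation among actions. The following sketch (action names are from the table above; the dictionary and helper name are our own, covering only a subset of the table for illustration) expands one action into the complete set a policy must allow:

```python
# Direct dependencies taken from the "Required actions" column above
# (a subset of the table, for illustration only).
REQUIRED = {
    "kafka-cluster:DescribeTopic": {"kafka-cluster:Connect"},
    "kafka-cluster:WriteData": {"kafka-cluster:Connect",
                                "kafka-cluster:DescribeTopic"},
    "kafka-cluster:DescribeGroup": {"kafka-cluster:Connect"},
    "kafka-cluster:AlterGroup": {"kafka-cluster:Connect",
                                 "kafka-cluster:DescribeGroup"},
    "kafka-cluster:ReadData": {"kafka-cluster:Connect",
                               "kafka-cluster:DescribeTopic",
                               "kafka-cluster:AlterGroup"},
}

def expand(action):
    """Return the action plus every action it transitively requires."""
    needed = {action}
    stack = [action]
    while stack:
        for dep in REQUIRED.get(stack.pop(), ()):
            if dep not in needed:
                needed.add(dep)
                stack.append(dep)
    return needed
```

For example, `expand("kafka-cluster:ReadData")` yields the same five actions that the *Consume data* use case requires.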

You can use the asterisk (\*) wildcard any number of times in an action after the colon. The following are examples.
+ `kafka-cluster:*Topic` stands for `kafka-cluster:CreateTopic`, `kafka-cluster:DescribeTopic`, `kafka-cluster:AlterTopic`, and `kafka-cluster:DeleteTopic`. It doesn't include `kafka-cluster:DescribeTopicDynamicConfiguration` or `kafka-cluster:AlterTopicDynamicConfiguration`.
+ `kafka-cluster:*` stands for all permissions.
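
The wildcard must match the whole action name. The following sketch illustrates the `kafka-cluster:*Topic` example above, using Python's `fnmatch` purely as a stand-in for IAM's wildcard matching:

```python
from fnmatch import fnmatchcase

actions = [
    "kafka-cluster:CreateTopic",
    "kafka-cluster:DescribeTopic",
    "kafka-cluster:AlterTopic",
    "kafka-cluster:DeleteTopic",
    "kafka-cluster:DescribeTopicDynamicConfiguration",
]

# The pattern must match the entire action name, so the
# ...DynamicConfiguration action is not included.
matches = [a for a in actions if fnmatchcase(a, "kafka-cluster:*Topic")]
```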

## Authorization policy resources
<a name="msk-iam-resources"></a>

The following table shows the four types of resources that you can use in an authorization policy when you use IAM access control for Amazon MSK. You can get the cluster Amazon Resource Name (ARN) from the AWS Management Console or by using the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) API or the [describe-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/describe-cluster.html) AWS CLI command. You can then use the cluster ARN to construct topic, group, and transactional ID ARNs. To specify a resource in an authorization policy, use that resource's ARN.


| Resource | ARN format | 
| --- | --- | 
| Cluster | arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid | 
| Topic | arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name | 
| Group | arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/group-name | 
| Transactional ID | arn:aws:kafka:region:account-id:transactional-id/cluster-name/cluster-uuid/transactional-id | 

You can use the asterisk (\*) wildcard any number of times anywhere in the part of the ARN that comes after `:cluster/`, `:topic/`, `:group/`, and `:transactional-id/`. The following are some examples of how you can use the asterisk (\*) wildcard to refer to multiple resources:
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*`: all the topics in any cluster named MyTestCluster, regardless of the cluster's UUID.
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test`: all topics whose name ends with "_test" in the cluster whose name is MyTestCluster and whose UUID is abcd1234-0123-abcd-5678-1234abcd-1.
+ `arn:aws:kafka:us-east-1:0123456789012:transactional-id/MyTestCluster/*/5555abcd-1111-abcd-1234-abcd1234-1`: all transactions whose transactional ID is 5555abcd-1111-abcd-1234-abcd1234-1, across all incarnations of a cluster named MyTestCluster in your account. This means that if you create a cluster named MyTestCluster, then delete it, and then create another cluster with the same name, you can use this resource ARN to represent the same transactional ID on both clusters. However, the deleted cluster isn't accessible.
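
The ARN formats in the table are plain string templates, so they are easy to construct programmatically. The following sketch (the helper name is our own, and `fnmatch` again stands in for IAM's wildcard matching) builds a topic ARN and checks it against the `*_test` pattern from the examples above:

```python
from fnmatch import fnmatchcase

def topic_arn(region, account, cluster_name, cluster_uuid, topic):
    """Build a topic ARN in the format shown in the table above."""
    return (f"arn:aws:kafka:{region}:{account}:"
            f"topic/{cluster_name}/{cluster_uuid}/{topic}")

arn = topic_arn("us-east-1", "0123456789012", "MyTestCluster",
                "abcd1234-0123-abcd-5678-1234abcd-1", "orders_test")

# fnmatch stands in here for IAM's wildcard matching.
pattern = ("arn:aws:kafka:us-east-1:0123456789012:"
           "topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test")
matched = fnmatchcase(arn, pattern)
```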

# Common use cases for client authorization policy
<a name="iam-access-control-use-cases"></a>

The first column in the following table shows some common use cases. To authorize a client to carry out a given use case, include the required actions for that use case in the client's authorization policy, and set `Effect` to `Allow`.

For information about all the actions that are part of IAM access control for Amazon MSK, see [Semantics of IAM authorization policy actions and resources](kafka-actions.md).

**Note**  
Actions are denied by default. You must explicitly allow every action that you want to authorize the client to perform.


****  

| Use case | Required actions | 
| --- | --- | 
| Admin |  `kafka-cluster:*`  | 
| Create a topic |  `kafka-cluster:Connect` `kafka-cluster:CreateTopic`  | 
| Produce data |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData`  | 
| Consume data |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DescribeGroup` `kafka-cluster:AlterGroup` `kafka-cluster:ReadData`  | 
| Produce data idempotently |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:WriteDataIdempotently`  | 
| Produce data transactionally |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:AlterTransactionalId`  | 
| Describe the configuration of a cluster |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | 
| Update the configuration of a cluster |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration` `kafka-cluster:AlterClusterDynamicConfiguration`  | 
| Describe the configuration of a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` | 
| Update the configuration of a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` `kafka-cluster:AlterTopicDynamicConfiguration`  | 
| Alter a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterTopic`  | 
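
As an illustration, the *Produce data* row can be assembled into a policy document. The following sketch (the function name is our own) assumes `kafka-cluster:Connect` is granted on the cluster resource and the topic actions on topic resources, consistent with the actions table; the ARNs are placeholders:

```python
import json

def producer_policy(topic_arns, cluster_arn):
    """Minimal 'Produce data' policy from the use-case table above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow",
             "Action": ["kafka-cluster:Connect"],
             "Resource": [cluster_arn]},
            {"Effect": "Allow",
             "Action": ["kafka-cluster:DescribeTopic",
                        "kafka-cluster:WriteData"],
             "Resource": topic_arns},
        ],
    }

policy = producer_policy(
    ["arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/*"],
    "arn:aws:kafka:us-east-1:123456789012:cluster/MyTestCluster/*",
)
print(json.dumps(policy, indent=4))
```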

# Mutual TLS client authentication for Amazon MSK
<a name="msk-authentication"></a>

You can enable client authentication with TLS for connections from your applications to your Amazon MSK brokers. To use client authentication, you need an AWS Private CA. The AWS Private CA can be in the same AWS account as your cluster or in a different account. For information about AWS Private CAs, see [Creating and Managing an AWS Private CA](https://docs.aws.amazon.com/acm-pca/latest/userguide/create-CA.html).

Amazon MSK doesn't support certificate revocation lists (CRLs). To control access to your cluster topics or block compromised certificates, use Apache Kafka ACLs and AWS security groups. For information about using Apache Kafka ACLs, see [Apache Kafka ACLs](msk-acls.md).

**Topics**
+ [Create an Amazon MSK cluster that supports client authentication](msk-authentication-cluster.md)
+ [Set up a client to use authentication](msk-authentication-client.md)
+ [Produce and consume messages using authentication](msk-authentication-messages.md)

# Create an Amazon MSK cluster that supports client authentication
<a name="msk-authentication-cluster"></a>

This procedure shows you how to enable client authentication using an AWS Private CA.
**Note**  
We highly recommend using an independent AWS Private CA for each MSK cluster when you use mutual TLS to control access. Doing so ensures that TLS certificates signed by a private CA authenticate with only a single MSK cluster.

1. Create a file named `clientauthinfo.json` with the following contents. Replace *Private-CA-ARN* with the ARN of your PCA.

   ```
   {
      "Tls": {
          "CertificateAuthorityArnList": ["Private-CA-ARN"]
       }
   }
   ```

1. Create a file named `brokernodegroupinfo.json` as described in [Create a provisioned Amazon MSK cluster using the AWS CLI](create-cluster-cli.md).

1. Client authentication requires that you also enable encryption in transit between clients and brokers. Create a file named `encryptioninfo.json` with the following contents. Replace *KMS-Key-ARN* with the ARN of your KMS key. You can set `ClientBroker` to `TLS` or `TLS_PLAINTEXT`.

   ```
   {
      "EncryptionAtRest": {
          "DataVolumeKMSKeyId": "KMS-Key-ARN"
       },
      "EncryptionInTransit": {
           "InCluster": true,
           "ClientBroker": "TLS"
       }
   }
   ```

   For more information about encryption, see [Amazon MSK encryption](msk-encryption.md).

1. On a machine where you have the AWS CLI installed, run the following command to create a cluster with authentication and in-transit encryption enabled. Save the cluster ARN provided in the response.

   ```
   aws kafka create-cluster --cluster-name "AuthenticationTest" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --client-authentication file://clientauthinfo.json --kafka-version "{YOUR KAFKA VERSION}" --number-of-broker-nodes 3
   ```
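
The two JSON files in this procedure can also be generated programmatically. The following is a minimal stdlib-only sketch; the variable names are our own, and *Private-CA-ARN* and *KMS-Key-ARN* remain the placeholders from the steps above:

```python
import json

# Contents for clientauthinfo.json and encryptioninfo.json, mirroring
# the files created in the steps above. The ARNs are placeholders.
client_auth_info = {
    "Tls": {
        "CertificateAuthorityArnList": ["Private-CA-ARN"]
    }
}

encryption_info = {
    "EncryptionAtRest": {
        "DataVolumeKMSKeyId": "KMS-Key-ARN"
    },
    "EncryptionInTransit": {
        "InCluster": True,
        "ClientBroker": "TLS"  # or "TLS_PLAINTEXT"
    }
}

clientauthinfo_json = json.dumps(client_auth_info, indent=4)
encryptioninfo_json = json.dumps(encryption_info, indent=4)
```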

# Set up a client to use authentication
<a name="msk-authentication-client"></a>

This process describes how to set up an Amazon EC2 instance to use as a client machine, and how to configure the security settings that the client needs to authenticate to the cluster.

1. Create an Amazon EC2 instance to use as a client machine. For simplicity, create this instance in the same VPC you used for the cluster. See [Step 3: Create a client machine](create-client-machine.md) for an example of how to create such a client machine.

1. Create a topic. For an example, see the instructions under [Step 4: Create a topic in the Amazon MSK cluster](create-topic.md).

1. On a machine where you have the AWS CLI installed, run the following command to get the bootstrap brokers of the cluster. Replace *Cluster-ARN* with the ARN of your cluster.

   ```
   aws kafka get-bootstrap-brokers --cluster-arn Cluster-ARN
   ```

   Save the string associated with `BootstrapBrokerStringTls` in the response.

1. On your client machine, run the following command to use the JVM trust store to create your client trust store. If your JVM path is different, adjust the command accordingly.

   ```
   cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks
   ```

1. On your client machine, run the following command to create a private key for your client. Replace *Distinguished-Name*, *Example-Alias*, *Your-Store-Pass*, and *Your-Key-Pass* with strings of your choice.

   ```
   keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12 -keyalg rsa
   ```

1. On your client machine, run the following command to create a certificate request with the private key you created in the previous step.

   ```
   keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. Open the `client-cert-sign-request` file and ensure that it starts with `-----BEGIN CERTIFICATE REQUEST-----` and ends with `-----END CERTIFICATE REQUEST-----`. If it starts with `-----BEGIN NEW CERTIFICATE REQUEST-----`, delete the word `NEW` (and the single space that follows it) from the beginning and the end of the file.

1. On a machine where you have the AWS CLI installed, run the following command to sign your certificate request. Replace *Private-CA-ARN* with the ARN of your PCA. You can change the validity value if you want. Here we use 300 as an example.

   ```
   aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"
   ```

   Save the certificate ARN provided in the response.
**Note**  
To retrieve your client certificate, use the `acm-pca get-certificate` command and specify your certificate ARN. For more information, see [get-certificate](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/acm-pca/get-certificate.html) in the *AWS CLI Command Reference*.

1. Run the following command to get the certificate that AWS Private CA signed for you. Replace *Certificate-ARN* with the ARN you obtained from the response to the previous command.

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN
   ```

1. From the JSON result of running the previous command, copy the strings associated with `Certificate` and `CertificateChain`. Paste these two strings in a new file named signed-certificate-from-acm. Paste the string associated with `Certificate` first, followed by the string associated with `CertificateChain`. Replace the `\n` characters with new lines. The following is the structure of the file after you paste the certificate and certificate chain in it.

   ```
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   ```

1. Run the following command on the client machine to add this certificate to your keystore so you can present it when you talk to the MSK brokers.

   ```
   keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. Create a file named `client.properties` with the following contents. Adjust the truststore and keystore locations to the paths where you saved `kafka.client.truststore.jks` and `kafka.client.keystore.jks`. Substitute your Kafka client version for the `{YOUR KAFKA VERSION}` placeholders.

   ```
   security.protocol=SSL
   ssl.truststore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.truststore.jks
   ssl.keystore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.keystore.jks
   ssl.keystore.password=Your-Store-Pass
   ssl.key.password=Your-Key-Pass
   ```
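The certificate-assembly step earlier in this procedure (copying the `Certificate` and `CertificateChain` strings into `signed-certificate-from-acm` and fixing the newlines) is easy to get wrong by hand. The following Python sketch automates it; the helper name and the `response.json` file name are illustrative, and the input is assumed to be the JSON output of `aws acm-pca get-certificate`.

```python
import json

def assemble_pem(get_certificate_response: dict) -> str:
    """Concatenate Certificate and CertificateChain into the PEM text expected
    in signed-certificate-from-acm: certificate first, then the chain."""
    cert = get_certificate_response["Certificate"]
    chain = get_certificate_response["CertificateChain"]
    # Parsing the CLI output with json.loads already converts the \n escapes
    # into real newlines, so only concatenation is needed here.
    return cert.rstrip("\n") + "\n" + chain.rstrip("\n") + "\n"

# Usage sketch (file names are assumptions):
#   response = json.load(open("response.json"))   # aws acm-pca get-certificate output
#   open("signed-certificate-from-acm", "w").write(assemble_pem(response))
```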

# Produce and consume messages using authentication
<a name="msk-authentication-messages"></a>

This procedure describes how to produce and consume messages using TLS client authentication.

1. Run the following command to create a topic. The file named `client.properties` is the one you created in the previous procedure.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBroker-String --replication-factor 3 --partitions 1 --topic ExampleTopic --command-config client.properties
   ```

1. Run the following command to start a console producer. The file named `client.properties` is the one you created in the previous procedure.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --producer.config client.properties
   ```

1. In a new command window on your client machine, run the following command to start a console consumer.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --consumer.config client.properties
   ```

1. Type messages in the producer window and watch them appear in the consumer window.
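If you prefer a programmatic client over the console tools, the same TLS settings map onto non-Java clients as well. The following is a hedged sketch for the third-party kafka-python package; kafka-python does not read JKS keystores, so it assumes you exported PEM copies of your CA chain, client certificate, and private key (the helper and all file names are illustrative).

```python
def tls_client_config(cafile: str, certfile: str, keyfile: str) -> dict:
    """Keyword arguments for kafka.KafkaProducer/KafkaConsumer over mutual TLS.
    These mirror the client.properties settings shown above, using PEM files
    instead of JKS stores."""
    return {
        "security_protocol": "SSL",
        "ssl_cafile": cafile,      # CA chain trusted for the broker certificates
        "ssl_certfile": certfile,  # client certificate signed by your private CA
        "ssl_keyfile": keyfile,    # client private key
    }

# Usage sketch (requires `pip install kafka-python` and a reachable cluster):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="BootstrapBroker-String",
#                          **tls_client_config("ca.pem", "client-cert.pem", "client-key.pem"))
# producer.send("ExampleTopic", b"hello")
```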

# Sign-in credentials authentication with AWS Secrets Manager
<a name="msk-password"></a>

You can control access to your Amazon MSK clusters using sign-in credentials that are stored and secured using AWS Secrets Manager. Storing user credentials in Secrets Manager reduces the overhead of managing cluster authentication, such as auditing, updating, and rotating credentials. Secrets Manager also lets you share user credentials across clusters.

After you associate a secret with an MSK cluster, MSK syncs the credential data periodically.

**Topics**
+ [How sign-in credentials authentication works](msk-password-howitworks.md)
+ [Set up SASL/SCRAM authentication for an Amazon MSK cluster](msk-password-tutorial.md)
+ [Working with users](msk-password-users.md)
+ [Limitations when using SCRAM secrets](msk-password-limitations.md)

# How sign-in credentials authentication works
<a name="msk-password-howitworks"></a>

Sign-in credentials authentication for Amazon MSK uses SASL/SCRAM (Simple Authentication and Security Layer / Salted Challenge Response Authentication Mechanism) authentication. To set up sign-in credentials authentication for a cluster, you create a secret in [AWS Secrets Manager](https://docs.aws.amazon.com//secretsmanager/?id=docs_gateway) and associate sign-in credentials with that secret.

SASL/SCRAM is defined in [RFC 5802](https://tools.ietf.org/html/rfc5802). SCRAM uses secure hashing algorithms and does not transmit plaintext sign-in credentials between client and server.

**Note**  
When you set up SASL/SCRAM authentication for your cluster, Amazon MSK turns on TLS encryption for all traffic between clients and brokers.

# Set up SASL/SCRAM authentication for an Amazon MSK cluster
<a name="msk-password-tutorial"></a>

To set up a secret in AWS Secrets Manager, follow the [Creating and Retrieving a Secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/tutorials_basic.html) tutorial in the [AWS Secrets Manager User Guide](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html).

Note the following requirements when creating a secret for an Amazon MSK cluster:
+ Choose **Other type of secrets (e.g. API key)** for the secret type.
+ Your secret name must begin with the prefix **AmazonMSK\_**.
+ You must either use an existing custom AWS KMS key or create a new custom AWS KMS key for your secret. Secrets Manager uses the default AWS KMS key for a secret by default. 
**Important**  
A secret created with the default AWS KMS key cannot be used with an Amazon MSK cluster.
+ Your sign-in credential data must be in the following format to enter key-value pairs using the **Plaintext** option.

  ```
  {
    "username": "alice",
    "password": "alice-secret"
  }
  ```
+ Record the ARN (Amazon Resource Name) value for your secret. 
**Important**  
You can't associate a Secrets Manager secret with a cluster that exceeds the limits described in [Right-size your cluster: Number of partitions per Standard broker](bestpractices.md#partitions-per-broker).
+ If you use the AWS CLI to create the secret, specify a key ID or ARN for the `kms-key-id` parameter. Don't specify an alias.
+ To associate the secret with your cluster, use either the Amazon MSK console or the [BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret) operation. 
**Important**  
When you associate a secret with a cluster, Amazon MSK attaches a resource policy to the secret that allows your cluster to access and read the secret values that you defined. Don't modify this resource policy, because doing so can prevent your cluster from accessing your secret. If you change the secret's resource policy or the KMS key used for secret encryption, re-associate the secret with your MSK cluster so that the cluster can continue to access the secret.

  The following example JSON input for the `BatchAssociateScramSecret` operation associates a secret with a cluster:

  ```
  {
    "clusterArn" : "arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",          
    "secretArnList": [
      "arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret"
    ]
  }
  ```
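The same create-and-associate flow can be scripted. The sketch below uses boto3; the helper function and all names and ARNs are illustrative, and it assumes the requirements listed above (the **AmazonMSK\_** name prefix and a custom KMS key).

```python
import json

def scram_secret_request(name: str, username: str, password: str, kms_key_id: str) -> dict:
    """Build keyword arguments for secretsmanager create_secret. The secret
    name must carry the AmazonMSK_ prefix, and a custom KMS key is required
    because Amazon MSK can't use secrets encrypted with the default key."""
    if not name.startswith("AmazonMSK_"):
        raise ValueError("secret name must begin with AmazonMSK_")
    return {
        "Name": name,
        "KmsKeyId": kms_key_id,
        "SecretString": json.dumps({"username": username, "password": password}),
    }

# Usage sketch with boto3 (ARNs and IDs are placeholders):
# import boto3
# sm = boto3.client("secretsmanager")
# secret = sm.create_secret(**scram_secret_request(
#     "AmazonMSK_MyClusterSecret", "alice", "alice-secret", "your-kms-key-id"))
# boto3.client("kafka").batch_associate_scram_secret(
#     ClusterArn="your-cluster-arn", SecretArnList=[secret["ARN"]])
```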

# Connecting to your cluster with sign-in credentials
<a name="msk-password-tutorial-connect"></a>

After you create a secret and associate it with your cluster, you can connect your client to the cluster. The following procedure demonstrates how to connect a client to a cluster that uses SASL/SCRAM authentication. It also shows how to produce to and consume from an example topic.

**Topics**
+ [Connecting a client to cluster using SASL/SCRAM authentication](#w2aab9c13c29c17c13c11b9b7)
+ [Troubleshooting connection issues](#msk-password-tutorial-connect-troubleshooting)

## Connecting a client to cluster using SASL/SCRAM authentication
<a name="w2aab9c13c29c17c13c11b9b7"></a>

1. Run the following command on a machine that has the AWS CLI installed. Replace *clusterARN* with the ARN of your cluster.

   ```
   aws kafka get-bootstrap-brokers --cluster-arn clusterARN
   ```

   From the JSON result of this command, save the value associated with the string named `BootstrapBrokerStringSaslScram`. You'll use this value in later steps.

1. On your client machine, create a JAAS configuration file that contains the user credentials stored in your secret. For example, for the user **alice**, create a file called `users_jaas.conf` with the following content.

   ```
   KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="alice"
      password="alice-secret";
   };
   ```

1. Use the following command to export your JAAS config file as a `KAFKA_OPTS` environment parameter.

   ```
   export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf
   ```

1. Create a file named `kafka.client.truststore.jks` in a `/tmp` directory.

1. (Optional) Use the following command to copy the JDK key store file from your JVM `cacerts` folder into the `kafka.client.truststore.jks` file that you created in the previous step. Replace *JDKFolder* with the name of the JDK folder on your instance. For example, your JDK folder might be named `java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64`.

   ```
   cp /usr/lib/jvm/JDKFolder/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. In the `bin` directory of your Apache Kafka installation, create a client properties file called `client_sasl.properties` with the following contents. This file defines the SASL mechanism and protocol.

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=SCRAM-SHA-512
   ```

1. To create an example topic, run the following command. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you obtained in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBrokerStringSaslScram --command-config <path-to-client-properties>/client_sasl.properties --replication-factor 3 --partitions 1 --topic ExampleTopicName
   ```

1. To produce to the example topic that you created, run the following command on your client machine. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you retrieved in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --producer.config client_sasl.properties
   ```

1. To consume from the topic you created, run the following command on your client machine. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you obtained in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --from-beginning --consumer.config client_sasl.properties
   ```
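Steps 1 and 2 of this procedure can also be scripted. The following Python sketch parses the `get-bootstrap-brokers` output and renders the `users_jaas.conf` contents shown above; the helper names are illustrative.

```python
import json

def scram_bootstrap_brokers(get_bootstrap_brokers_json: str) -> str:
    """Extract the SASL/SCRAM bootstrap string from the JSON output of
    `aws kafka get-bootstrap-brokers`."""
    return json.loads(get_bootstrap_brokers_json)["BootstrapBrokerStringSaslScram"]

def jaas_config(username: str, password: str) -> str:
    """Render the users_jaas.conf contents for the given credentials."""
    return (
        "KafkaClient {\n"
        "   org.apache.kafka.common.security.scram.ScramLoginModule required\n"
        f'   username="{username}"\n'
        f'   password="{password}";\n'
        "};\n"
    )

# Usage sketch:
#   brokers = scram_bootstrap_brokers(open("brokers.json").read())
#   open("users_jaas.conf", "w").write(jaas_config("alice", "alice-secret"))
```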

## Troubleshooting connection issues
<a name="msk-password-tutorial-connect-troubleshooting"></a>

When running Kafka client commands, you might encounter Java heap memory errors, especially when working with large topics or datasets. These errors occur because Kafka tools run as Java applications with default memory settings that might be insufficient for your workload.

To resolve `Out of Memory Java Heap` errors, you can increase the Java heap size by modifying the `KAFKA_OPTS` environment variable to include memory settings.

The following example sets the maximum heap size to 1GB (`-Xmx1G`). You can adjust this value based on your available system memory and requirements.

```
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf -Xmx1G"
```

For consuming large topics, consider limiting the number of records that the consumer reads (for example, with `--max-messages`) instead of using `--from-beginning`, to reduce memory usage:

```
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --max-messages 1000 --consumer.config client_sasl.properties
```

# Working with users
<a name="msk-password-users"></a>

**Creating users:** You create users in your secret as key-value pairs. When you use the **Plaintext** option in the Secrets Manager console, you should specify sign-in credential data in the following format.

```
{
  "username": "alice",
  "password": "alice-secret"
}
```
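To update a user's password (for example, when rotating credentials), you replace the secret value. The following is a hedged boto3 sketch; the helper function is illustrative, and the secret ID is a placeholder.

```python
import json

def rotated_secret_string(current_secret_string: str, new_password: str) -> str:
    """Return a new SecretString that keeps the same username and swaps in a
    new password, preserving the key-value format shown above."""
    creds = json.loads(current_secret_string)
    creds["password"] = new_password
    return json.dumps(creds)

# Usage sketch with boto3. Remember that changes to your secret take up to
# 10 minutes to propagate to the cluster.
# import boto3
# sm = boto3.client("secretsmanager")
# current = sm.get_secret_value(SecretId="AmazonMSK_MyClusterSecret")["SecretString"]
# sm.put_secret_value(SecretId="AmazonMSK_MyClusterSecret",
#                     SecretString=rotated_secret_string(current, "new-password"))
```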

**Revoking user access:** To revoke a user's access to a cluster, we recommend that you first remove or enforce an ACL on the cluster, and then disassociate the secret. This is because of the following:
+ Removing a user does not close existing connections.
+ Changes to your secret take up to 10 minutes to propagate.

For information about using an ACL with Amazon MSK, see [Apache Kafka ACLs](msk-acls.md).

For clusters using ZooKeeper mode, we recommend that you restrict access to your ZooKeeper nodes to prevent users from modifying ACLs. For more information, see [Control access to Apache ZooKeeper nodes in your Amazon MSK cluster](zookeeper-security.md).

# Limitations when using SCRAM secrets
<a name="msk-password-limitations"></a>

Note the following limitations when using SCRAM secrets:
+ Amazon MSK only supports SCRAM-SHA-512 authentication.
+ An Amazon MSK cluster can have up to 1000 users.
+ You must use an AWS KMS key with your Secret. You cannot use a Secret that uses the default Secrets Manager encryption key with Amazon MSK. For information about creating a KMS key, see [Creating symmetric encryption KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/create-keys.html#create-symmetric-cmk).
+ You can't use an asymmetric KMS key with Secrets Manager.
+ You can associate up to 10 secrets with a cluster at a time using the [BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret) operation.
+ The names of secrets associated with an Amazon MSK cluster must have the prefix **AmazonMSK\_**.
+ Secrets associated with an Amazon MSK cluster must be in the same AWS account and AWS Region as the cluster.

# Apache Kafka ACLs
<a name="msk-acls"></a>

Apache Kafka has a pluggable authorizer and ships with an out-of-the-box authorizer implementation. Amazon MSK enables this authorizer in the `server.properties` file on the brokers.

Apache Kafka ACLs have the format "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". If RP doesn't match a specific resource R, then R has no associated ACLs, and no one other than super users is allowed to access R. To change this Apache Kafka behavior, you set the property `allow.everyone.if.no.acl.found` to true.

Amazon MSK sets `allow.everyone.if.no.acl.found` to true by default. This means that with Amazon MSK clusters, if you don't explicitly set ACLs on a resource, all principals can access it. If you enable ACLs on a resource, only the authorized principals can access it. If you want to restrict access to a topic and authorize a client using TLS mutual authentication, add ACLs using the Apache Kafka authorizer CLI. For more information about adding, removing, and listing ACLs, see [Kafka Authorization Command Line Interface](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Authorization+Command+Line+Interface).

Because Amazon MSK configures brokers as super users, the brokers can access all topics. This allows them to replicate messages from the primary partition regardless of how the `allow.everyone.if.no.acl.found` property is set in the cluster's configuration.

**To add or remove read and write access to a topic**

1. Add your brokers to the ACL table to allow them to read from all topics that have ACLs in place. To grant your brokers read access to a topic, run the following command on a client machine that can communicate with the MSK cluster. 

   Replace *Distinguished-Name* with the DNS name of any of your cluster's bootstrap brokers, then replace the string before the first period in this DNS name with an asterisk (`*`). For example, if one of your cluster's bootstrap brokers has the DNS name `b-6.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`, replace *Distinguished-Name* in the following command with `*.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`. For information on how to get the bootstrap brokers, see [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md).

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

1. To grant a client application read access to a topic, run the following command on your client machine. If you use mutual TLS authentication, use the same *Distinguished-Name* you used when you created the private key.

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

   To remove read access, you can run the same command, replacing `--add` with `--remove`.

1. To grant write access to a topic, run the following command on your client machine. If you use mutual TLS authentication, use the same *Distinguished-Name* you used when you created the private key.

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Write --topic Topic-Name
   ```

   To remove write access, you can run the same command, replacing `--add` with `--remove`.