

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Apache Kafka 的身份验证和授权 APIs
<a name="kafka_apis_iam"></a>

您可以使用 IAM 对客户端进行身份验证并允许或拒绝 Apache Kafka 操作。或者，您可以使用 TLS 或对客户端 SASL/SCRAM 进行身份验证，使用 Apache Kafka ACLs 来允许或拒绝操作。

有关如何控制谁可以在您的集群上执行 [Amazon MSK 操作](https://docs.aws.amazon.com/msk/1.0/apireference/operations.html)的信息，请参阅[亚马逊 MSK 的身份验证和授权 APIs](security-iam.md)。

**Topics**
+ [IAM 访问控制](iam-access-control.md)
+ [Amazon MSK 的双向 TLS 客户端身份验证](msk-authentication.md)
+ [使用 S AWS ecrets Manager 进行登录凭据身份验证](msk-password.md)
+ [阿帕奇 Kafka ACLs](msk-acls.md)

# IAM 访问控制
<a name="iam-access-control"></a>

Amazon MSK 的 IAM 访问控制让您能够处理 MSK 集群的身份验证和授权。这样就不需要使用一种身份验证机制和另一种授权机制。例如，当客户端尝试写入您的集群时，Amazon MSK 使用 IAM 来检查该客户端是否是经过身份验证的身份，以及是否有权向您的集群生成数据。

IAM 访问控制适用于 Java 和非 Java 客户端，包括用 Python、 JavaScript Go 和.NET 编写的 Kafka 客户端。非 Java 客户端的 IAM 访问控制适用于 Kafka 版本 2.7.1 或更高版本的 MSK 集群。

为了能够进行 IAM 访问控制，Amazon MSK 对 Apache Kafka 源代码进行了少许修改。这些修改不会给您的 Apache Kafka 体验造成明显的影响。Amazon MSK 会记录访问事件，以方便您进行审计。

您可以 APIs 为使用 IAM 访问控制的 MSK 集群调用 Apache Kafka ACL。但是，Apache Kafka 对 IAM 身份的授权 ACLs 没有影响。您必须将 IAM 策略用于 IAM 身份的访问控制。

**重要注意事项**  
对 MSK 集群使用 IAM 访问控制时，请记住以下重要注意事项：  
IAM 访问控制不适用于 Apache ZooKeeper 节点。有关如何控制对这些节点的访问权限的信息，请参阅 [控制对亚马逊 MSK ZooKeeper 集群中 Apache 节点的访问权限](zookeeper-security.md)。
如果您的集群使用 IAM 访问控制，则 `allow.everyone.if.no.acl.found` Apache Kafka 设置无效。
您可以 APIs 为使用 IAM 访问控制的 MSK 集群调用 Apache Kafka ACL。但是，Apache Kafka 对 IAM 身份的授权 ACLs 没有影响。您必须将 IAM 策略用于 IAM 身份的访问控制。

# Amazon MSK 的 IAM 访问控制的工作原理
<a name="how-to-use-iam-access-control"></a>

要对 Amazon MSK 使用 IAM 访问控制，请执行以下步骤，以下主题将详细介绍这些步骤：
+ [创建使用 IAM 访问控制的 Amazon MSK 集群](create-iam-access-control-cluster-in-console.md) 
+ [配置客户端以进行 IAM 访问控制](configure-clients-for-iam-access-control.md)
+ [为 IAM 角色创建授权策略](create-iam-access-control-policies.md)
+ [获取用于 IAM 访问控制的引导代理](get-bootstrap-brokers-for-iam.md)

# 创建使用 IAM 访问控制的 Amazon MSK 集群
<a name="create-iam-access-control-cluster-in-console"></a>

本节介绍如何使用 AWS 管理控制台、API 或创建使用 IAM 访问控制的 Amazon MSK 集群。 AWS CLI 有关如何为现有集群开启 IAM 访问控制的信息，请参阅[更新 Amazon MSK 集群的安全设置](msk-update-security.md)。

**使用创建 AWS 管理控制台 使用 IAM 访问控制的集群**

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 选择**创建集群**。

1. 选择**使用自定义设置创建集群**。

1. 在**身份验证**部分中，选择 **IAM 访问控制**。

1. 完成创建集群的其余工作流程。

**使用 API 或创建 AWS CLI 使用 IAM 访问控制的集群**
+ 要创建启用了 IAM 访问控制的集群，请使用 [CreateCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters.html#CreateCluster)API 或 [create-cluster CLI](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/create-cluster.html) 命令，并传递以下 JSON 作为`ClientAuthentication`参数：。`"ClientAuthentication": { "Sasl": { "Iam": { "Enabled": true } }`

# 配置客户端以进行 IAM 访问控制
<a name="configure-clients-for-iam-access-control"></a>

要让客户端能够与使用 IAM 访问控制的 MSK 集群通信，您可以使用以下任何一种机制：
+ 使用 SASL\$1OAUTHBEARER机制进行非 Java 客户端配置
+ 使用 SASL\$1OAUTHBEARER机制或 AWS\$1MSK\$1IAM 机制配置 Java 客户端

## 使用该 SASL\$1OAUTHBEARER机制配置 IAM
<a name="configure-clients-for-iam-access-control-sasl-oauthbearer"></a>

1. 使用以下 Python Kafka 客户端示例，编辑 client.properties 配置文件。其他语言的配置更改与之类似。

   ```
   from kafka import KafkaProducer
   from kafka.errors import KafkaError
   from kafka.sasl.oauth import AbstractTokenProvider
   import socket
   import time
   from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
   
   class MSKTokenProvider():
       def token(self):
           token, _ = MSKAuthTokenProvider.generate_auth_token('<my AWS 区域>')
           return token
   
   tp = MSKTokenProvider()
   
   producer = KafkaProducer(
       bootstrap_servers='<myBootstrapString>',
       security_protocol='SASL_SSL',
       sasl_mechanism='OAUTHBEARER',
       sasl_oauth_token_provider=tp,
       client_id=socket.gethostname(),
   )
   
   topic = "<my-topic>"
   while True:
       try:
           inp=input(">")
           producer.send(topic, inp.encode())
           producer.flush()
           print("Produced!")
       except Exception:
           print("Failed to send message:", e)
   
   producer.close()
   ```

1. 下载所选配置语言的帮助程序库，然后按照该语言库主页的“*开始使用*”部分中的说明进行操作。
   + JavaScript: [https://github.com/aws/aws-msk-iam-sasl-signer-js \$1getting-star](https://github.com/aws/aws-msk-iam-sasl-signer-js#getting-started) ted
   + Python：[https://github.com/aws/aws-msk-iam-sasl-signer-python \$1get-](https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started) 已启动
   + Go: [https://github.com/aws/aws-msk-iam-sasl-signer-](https://github.com/aws/aws-msk-iam-sasl-signer-go#getting-started) go \$1getting-started
   + .NET: [https://github.com/aws/aws-msk-iam-sasl-signer-net \$1getting-](https://github.com/aws/aws-msk-iam-sasl-signer-net#getting-started) 已启动
   + JAVA：可通过 ja [https://github.com/aws/aws-msk-iam-auth/releases](https://github.com/aws/aws-msk-iam-auth/releases)r 文件获得对 Java 的 SASL\$1OAUTHBEARER支持

## 使用 MSK 自定义 AWS\$1MSK\$1IAM 机制配置 IAM
<a name="configure-clients-for-iam-access-control-msk-iam"></a>

1. 将以下内容添加到 `client.properties` 文件中。*<PATH\$1TO\$1TRUST\$1STORE\$1FILE>*替换为客户端上信任存储文件的完全限定路径。
**注意**  
如果您不想使用特定证书，可以从 `client.properties` 文件中删除 `ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>`。如果您不指定 `ssl.truststore.location` 的值，Java 进程将使用默认证书。

   ```
   ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
   sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
   ```

   要使用您为 AWS 凭证创建的命名配置文件，请将其包含`awsProfileName="your profile name";`在您的客户端配置文件中。有关命名配置文件的信息，请参阅文档中的[命名配置](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) AWS CLI 文件。

1. 下载最新的稳定版 [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth/releases)JAR 文件，并将其放在类路径中。如果您使用 Maven，请添加以下依赖项，并根据需要调整版本号：

   ```
   <dependency>
       <groupId>software.amazon.msk</groupId>
       <artifactId>aws-msk-iam-auth</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

Amazon MSK 客户端插件在 Apache 2.0 许可证下是开源的。

# 为 IAM 角色创建授权策略
<a name="create-iam-access-control-policies"></a>

将授权策略附加到与客户端对应的 IAM 角色。在授权策略中，您可以指定角色允许或拒绝哪些操作。如果您的客户端位于 Amazon EC2 实例上，请将授权策略与该 Amazon EC2 实例的 IAM 角色关联。或者，您也可以将客户端配置为使用命名配置文件，然后将授权策略与该命名配置文件的角色关联。[配置客户端以进行 IAM 访问控制](configure-clients-for-iam-access-control.md) 介绍如何将客户端配置为使用命名配置文件。

有关如何创建 IAM policy 的信息，请参阅[创建 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_create.html)。

以下是名为的集群的授权策略示例 MyTestCluster。要了解 `Action` 和 `Resource` 元素的语义，请参阅 [IAM 授权策略、操作和资源的语义](kafka-actions.md)。

**重要**  
您对 IAM 策略所做的更改会 AWS CLI 立即反映在 IAM APIs 中。但是，策略更改可能需要很长时间才能生效。在大多数情况下，策略更改会在不到一分钟的时间内生效。网络状况有时可能会增加延迟时间。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}
```

------

要了解如何创建包含与常见 Apache Kafka 用例（例如创建和使用数据）对应的操作元素的策略，请参阅[客户端授权策略的常见使用场景](iam-access-control-use-cases.md)。

[对于 Kafka 版本 2.8.0 及更高版本，该**WriteDataIdempotently**权限已被弃用 (KIP-679)。](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)默认情况下，设置了 `enable.idempotence = true`。因此，对于 Kafka 版本 2.8.0 及更高版本，IAM 不提供与 Kafka 相同的功能。 ACLs仅提供 `WriteData` 对某个主题的访问权限，无法 `WriteDataIdempotently` 到该主题。这不会影响将 `WriteData` 提供给**所有**主题的情况。在这种情况下，允许 `WriteDataIdempotently`。这是由于 IAM 逻辑的实现和 Kafka ACLs 的实现方式存在差异。此外，以幂等方式写入一个主题也需要访问 `transactional-ids`。

要解决这个问题，建议使用类似于以下策略的策略：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:WriteDataIdempotently"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/TestTopic",
                "arn:aws:kafka:us-east-1:123456789012:transactional-id/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*"
            ]
        }
    ]
}
```

------

在这种情况下，`WriteData` 允许写入 `TestTopic`，而 `WriteDataIdempotently` 允许对集群进行幂等性写入。该策略还增加了对未来所需的 `transactional-id` 资源的访问权限。

由于 `WriteDataIdempotently` 是集群级权限，因此无法主题级别使用。如果 `WriteDataIdempotently` 仅限于主题级别，则此策略将不起作用。

# 获取用于 IAM 访问控制的引导代理
<a name="get-bootstrap-brokers-for-iam"></a>

请参阅[获取 Amazon MSK 集群的引导代理](msk-get-bootstrap-brokers.md)。

# IAM 授权策略、操作和资源的语义
<a name="kafka-actions"></a>

**注意**  
对于运行 Apache Kafka 3.8 或更高版本的集群，IAM 访问控制支持用于终止 WriteTxnMarkers 交易的 API。对于运行 Kafka 版本低于 3.8 的集群，IAM 访问控制不支持内部集群操作，包括 WriteTxnMarkers。对于这些早期版本，要终止交易，请使用带有相应的 SCRAM 或 mTLS 身份验证， ACLs 而不是 IAM 身份验证。

本部分解释了可以在 IAM 授权策略中使用的操作和资源元素的语义。有关策略示例，请参阅 [为 IAM 角色创建授权策略](create-iam-access-control-policies.md)。

## 授权策略操作
<a name="actions"></a>

下表列出了在使用 Amazon MSK 的 IAM 访问控制时可以在授权策略中包含的操作。当您在授权策略中包含表*操作*列中的操作时，还必须包含*所需操作*列中的相应操作。


| Action | 说明 | 所需的操作 | 所需的资源 | 适用于无服务器集群 | 
| --- | --- | --- | --- | --- | 
| kafka-cluster:Connect | 授予连接和验证集群的权限。 | 无 | cluster | 是 | 
| kafka-cluster:DescribeCluster | 授予描述集群各个方面的权限，相当于 Apache Kafka 的 DESCRIBE CLUSTER ACL。 |  `kafka-cluster:Connect`  | cluster | 是 | 
| kafka-cluster:AlterCluster | 授予更改集群各个方面的权限，相当于 Apache Kafka 的 ALTER CLUSTER ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeCluster`  | cluster | 否 | 
| kafka-cluster:DescribeClusterDynamicConfiguration | 授予描述集群动态配置的权限，相当于 Apache Kafka 的 DESCRIBE\$1CONFIGS CLUSTER ACL。 |  `kafka-cluster:Connect`  | cluster | 否 | 
| kafka-cluster:AlterClusterDynamicConfiguration | 授予更改集群动态配置的权限，相当于 Apache Kafka 的 ALTER\$1CONFIGS CLUSTER ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | cluster | 否 | 
| kafka-cluster:WriteDataIdempotently | 授予在集群上以幂等方式写入数据的权限，相当于 Apache Kafka 的 IDEMPOTENT\$1WRITE CLUSTER ACL。 |  `kafka-cluster:Connect` `kafka-cluster:WriteData`  | cluster | 是 | 
| kafka-cluster:CreateTopic | 授予在集群上创建主题的权限，相当于 Apache Kafka 的创建 CLUSTER/TOPIC ACL。 |  `kafka-cluster:Connect`  | topic | 是 | 
| kafka-cluster:DescribeTopic | 授予描述集群上主题的权限，相当于 Apache Kafka 的 DESCRIBE TOPIC ACL。 |  `kafka-cluster:Connect`  | topic | 是 | 
| kafka-cluster:AlterTopic | 授予更改集群上主题的权限，相当于 Apache Kafka 的 ALTER TOPIC ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | 是 | 
| kafka-cluster:DeleteTopic | 授予删除集群上主题的权限，相当于 Apache Kafka 的 DELETE TOPIC ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | 是 | 
| kafka-cluster:DescribeTopicDynamicConfiguration | 授予描述集群上主题动态配置的权限，相当于 Apache Kafka 的 DESCRIBE\$1CONFIGS TOPIC ACL。 |  `kafka-cluster:Connect`  | topic | 是 | 
| kafka-cluster:AlterTopicDynamicConfiguration | 授予更改集群上主题动态配置的权限，相当于 Apache Kafka 的 ALTER\$1CONFIGS TOPIC ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration`  | topic | 是 | 
| kafka-cluster:ReadData | 授予从集群上主题中读取数据的权限，相当于 Apache Kafka 的 READ TOPIC ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterGroup`  | topic | 是 | 
| kafka-cluster:WriteData | 授予向集群上的主题写入数据的权限，相当于 Apache Kafka 的 WRITE TOPIC ACL |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | 是 | 
| kafka-cluster:DescribeGroup | 授予描述集群上群组的权限，相当于 Apache Kafka 的 DESCRIBE GROUP ACL。 |  `kafka-cluster:Connect`  | 组 | 是 | 
| kafka-cluster:AlterGroup | 授予加入集群上群组的权限，相当于 Apache Kafka 的 READ GROUP ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | 组 | 是 | 
| kafka-cluster:DeleteGroup | 授予删除集群上群组的权限，相当于 Apache Kafka 的 DELETE GROUP ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | 组 | 是 | 
| kafka-cluster:DescribeTransactionalId | 授予描述集群 IDs 上交易的权限，相当于 Apache Kafka 的 DESCRIBE TRANSACTIONAL\$1ID ACL。 |  `kafka-cluster:Connect`  | transactional-id | 是 | 
| kafka-cluster:AlterTransactionalId | 授予在集群 IDs 上更改事务的权限，相当于 Apache Kafka 的 WRITE TRANSACTIONAL\$1ID ACL。 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:WriteData`  | transactional-id | 是 | 

在冒号之后的操作中，您可以任意次数地使用星号 (\$1) 通配符。示例如下。
+ `kafka-cluster:*Topic` 代表 `kafka-cluster:CreateTopic`、`kafka-cluster:DescribeTopic`、`kafka-cluster:AlterTopic` 和 `kafka-cluster:DeleteTopic`。它不包括 `kafka-cluster:DescribeTopicDynamicConfiguration` 或 `kafka-cluster:AlterTopicDynamicConfiguration`。
+ `kafka-cluster:*` 代表所有权限。

## 授权策略资源
<a name="msk-iam-resources"></a>

下表显示了在使用 Amazon MSK 的 IAM 访问控制时可在授权策略中使用的四种资源。您可以使用 [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster)API 或 d [es](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/describe-cluster.html) AWS CLI cribe-cluster 命令从 AWS 管理控制台 或中获取集群 Amazon 资源名称 (ARN)。然后，您可以使用集群 ARN 来构造主题、群组和交易 ID。 ARNs要在授权策略中指定资源，请使用该资源的 ARN。


| 资源 | ARN 格式 | 
| --- | --- | 
| Cluster | arn: aws: kafka::: cluster//regionaccount-idcluster-namecluster-uuid | 
| Topic | arn: aws: kafka::: topic//regionaccount-idcluster-namecluster-uuidtopic-name | 
| Group | arn: aws: kafka::: group//regionaccount-idcluster-namecluster-uuidgroup-name | 
| 事务 ID | arn: aws: kafka::: transactional-id///regionaccount-idcluster-namecluster-uuidtransactional-id | 

您可以在 ARN 中 `:cluster/`、`:topic/`、`:group/` 和 `:transactional-id/` 之后的任意位置，任意次数地使用星号 (\$1) 通配符。以下是如何使用星号 (\$1) 通配符引用多个资源的部分示例：
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*`：任何名为的集群中的所有主题 MyTestCluster，无论集群的 UUID 如何。
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test`：集群中名称以 “\$1test” 结尾的所有主题，其名称为，UUID 为 abcd1234-0123-abcd-56 MyTestCluster 78-1234abcd-1。
+ `arn:aws:kafka:us-east-1:0123456789012:transactional-id/MyTestCluster/*/5555abcd-1111-abcd-1234-abcd1234-1`：所有交易 ID 为 5555abcd-1111-abcd-1234-abcd-1234-1 的交易，适用于您的账户中命名的集群的所有化身。 MyTestCluster 这意味着，如果您创建了一个名为 MyTestCluster的集群，然后将其删除，然后创建另一个同名集群，则可以使用此资源 ARN 在两个集群上表示相同的交易 ID。但是，无法访问已删除的集群。

# 客户端授权策略的常见使用场景
<a name="iam-access-control-use-cases"></a>

下表中的第一列显示了一些常见用例。要授权客户端执行给定用例，请在客户端的授权策略中包含该用例所需的操作，并将 `Effect` 设置为 `Allow`。

有关 Amazon MSK 的 IAM 访问控制之所有操作的信息，请参阅 [IAM 授权策略、操作和资源的语义](kafka-actions.md)。

**注意**  
默认情况下，操作将被拒绝。您必须明确允许要授权客户端执行的每个操作。


****  

| 使用案例 | 所需的操作 | 
| --- | --- | 
| Admin |  `kafka-cluster:*`  | 
| 创建话题 |  `kafka-cluster:Connect` `kafka-cluster:CreateTopic`  | 
| 生成数据 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData`  | 
| 使用数据 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DescribeGroup` `kafka-cluster:AlterGroup` `kafka-cluster:ReadData`  | 
| 以幂等方式生成数据 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:WriteDataIdempotently`  | 
| 以事务方式生成数据 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:AlterTransactionalId`  | 
| 描述集群的配置 |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | 
| 更新集群的配置 |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration` `kafka-cluster:AlterClusterDynamicConfiguration`  | 
| 描述主题的配置 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` | 
| 更新主题的配置 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` `kafka-cluster:AlterTopicDynamicConfiguration`  | 
| 更改主题 |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterTopic`  | 

# Amazon MSK 的双向 TLS 客户端身份验证
<a name="msk-authentication"></a>

可以为从应用程序到 Amazon MSK 代理的连接启用 TLS 客户端身份验证。要使用客户端身份验证，您需要有 AWS 私有 CA。 AWS 私有 CA 可以与您的集群位于 AWS 账户 同一个账户中，也可以位于不同的账户中。有关 AWS 私有 CA s 的信息，请参阅[创建和管理 AWS 私有 CA](https://docs.aws.amazon.com/acm-pca/latest/userguide/create-CA.html)。

Amazon MSK 不支持证书吊销列表 () CRLs。要控制对集群主题的访问权限或屏蔽已泄露的证书，请使用 Apache Kafka ACLs 和 AWS 安全组。有关使用 Apache Kafka 的信息 ACLs，请参阅。[阿帕奇 Kafka ACLs](msk-acls.md)

**Topics**
+ [创建支持客户端身份验证的 Amazon MSK 集群](msk-authentication-cluster.md)
+ [将客户端设置为使用身份验证](msk-authentication-client.md)
+ [使用身份验证生成和使用消息](msk-authentication-messages.md)

# 创建支持客户端身份验证的 Amazon MSK 集群
<a name="msk-authentication-cluster"></a>

此过程向您展示如何使用启用客户端身份验证 AWS 私有 CA。
**注意**  
在使用双向 TLS 控制访问时，我们强烈建议 AWS 私有 CA 对每个 MSK 集群使用独立模式。这样做可以确保由签名的 TLS 证书 PCAs 仅在单个 MSK 集群中进行身份验证。

1. 使用以下内容创建名为 `clientauthinfo.json` 的文件。*Private-CA-ARN*替换为您的 PCA 的 ARN。

   ```
   {
      "Tls": {
          "CertificateAuthorityArnList": ["Private-CA-ARN"]
       }
   }
   ```

1. 创建一个名为 `brokernodegroupinfo.json` 的文件，如[使用创建预配置的 Amazon MSK 集群 AWS CLI](create-cluster-cli.md)中所述。

1. 客户端身份验证还要求您启用客户端和代理之间的传输中加密。使用以下内容创建名为 `encryptioninfo.json` 的文件。*KMS-Key-ARN*替换为您的 KMS 密钥的 ARN。可以将 `ClientBroker` 设置为 `TLS` 或 `TLS_PLAINTEXT`。

   ```
   {
      "EncryptionAtRest": {
          "DataVolumeKMSKeyId": "KMS-Key-ARN"
       },
      "EncryptionInTransit": {
           "InCluster": true,
           "ClientBroker": "TLS"
       }
   }
   ```

   有关加密的更多信息，请参阅[Amazon MSK 加密](msk-encryption.md)。

1. 在 AWS CLI 安装了身份验证和传输中加密的计算机上，运行以下命令来创建集群。保存响应中提供的集群 ARN。

   ```
   aws kafka create-cluster --cluster-name "AuthenticationTest" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --client-authentication file://clientauthinfo.json --kafka-version "{YOUR KAFKA VERSION}" --number-of-broker-nodes 3
   ```

# 将客户端设置为使用身份验证
<a name="msk-authentication-client"></a>

此过程描述了如何设置 Amazon EC2 实例以用作客户端来使用身份验证。

此过程介绍了如何通过创建客户端计算机、创建主题和配置所需的安全设置，使用身份验证来生成和使用消息。

1. 创建用作客户端计算机的 Amazon EC2 实例。为简单起见，请在用于集群的同一 VPC 中创建此实例。有关如何创建此类客户端计算机的示例，请参阅[步骤 3：创建客户端计算机](create-client-machine.md)。

1. 创建主题。有关示例，请参阅[步骤 4：在 Amazon MSK 集群中创建主题](create-topic.md)下的说明。

1. 在已 AWS CLI 安装的计算机上，运行以下命令以获取集群的引导代理。*Cluster-ARN*替换为集群的 ARN。

   ```
   aws kafka get-bootstrap-brokers --cluster-arn Cluster-ARN
   ```

   保存与响应中的 `BootstrapBrokerStringTls` 关联的字符串。

1. 在客户端计算机上，运行以下命令以使用 JVM 信任存储来创建客户端信任存储。如果您的 JVM 路径不同，请相应地调整命令。

   ```
   cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks
   ```

1. 在客户端计算机上，运行以下命令为客户端创建私有密钥。用您选择*Your-Key-Pass*的字符串替换*Distinguished-Name**Example-Alias**Your-Store-Pass*、、和。

   ```
   keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12 -keyalg rsa
   ```

1. 在客户端计算机上，运行以下命令以使用您在上一步中创建的私有密钥创建证书请求。

   ```
   keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. 打开 `client-cert-sign-request` 文件，并确保该文件的开头为 `-----BEGIN CERTIFICATE REQUEST-----` 且结尾为 `-----END CERTIFICATE REQUEST-----`。如果该文件的开头为 `-----BEGIN NEW CERTIFICATE REQUEST-----`，请从文件的开头和结尾处删除单词 `NEW`（及其后面的单个空格）。

1. 在已 AWS CLI 安装证书的计算机上，运行以下命令对证书请求进行签名。*Private-CA-ARN*替换为您的 PCA 的 ARN。如果需要，您可以更改有效性值。在这里，我们以 300 为例。

   ```
   aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"
   ```

   保存响应中提供的证书 ARN。
**注意**  
要检索您的客户端证书，请使用 `acm-pca get-certificate` 命令并指定您的证书 ARN。有关更多信息，请参阅《AWS CLI Command Reference》**中的 [get-certificate](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/acm-pca/get-certificate.html)。

1. 运行以下命令获取为您 AWS 私有 CA 签名的证书。*Certificate-ARN*替换为您从对上一个命令的响应中获得的 ARN。

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN
   ```

1. 从运行上一条命令所获得的 JSON 结果中，复制与 `Certificate` 和 `CertificateChain` 关联的字符串。将这两个字符串粘贴到名为的新文件中 signed-certificate-from-acm。先粘贴与 `Certificate` 关联的字符串，然后粘贴与 `CertificateChain` 关联的字符串。将 `\n` 字符替换为新行。以下是将证书和证书链粘贴到其中之后的文件结构。

   ```
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   ```

1. 在客户端计算机上运行以下命令将此证书添加到您的密钥库中，以便能在与 MSK 代理交流时出示此证书。

   ```
   keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. 使用以下内容创建名为 `client.properties` 的文件。将信任存储和密钥库位置调整为您将 `kafka.client.truststore.jks` 保存到的路径。用你的 Kafka 客户端版本代替*\$1YOUR KAFKA VERSION\$1*占位符。

   ```
   security.protocol=SSL
   ssl.truststore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.truststore.jks
   ssl.keystore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.keystore.jks
   ssl.keystore.password=Your-Store-Pass
   ssl.key.password=Your-Key-Pass
   ```

# 使用身份验证生成和使用消息
<a name="msk-authentication-messages"></a>

此过程介绍了如何使用身份验证生成和使用消息。

1. 运行以下命令以创建主题。名为 `client.properties` 的文件是您在上一过程中创建的文件。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBroker-String --replication-factor 3 --partitions 1 --topic ExampleTopic --command-config client.properties
   ```

1. 运行以下命令以启动控制台生成器。名为 `client.properties` 的文件是您在上一过程中创建的文件。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --producer.config client.properties
   ```

1. 在客户端计算机上的新命令窗口中，运行以下命令以启动控制台使用器。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --consumer.config client.properties
   ```

1. 在生成器窗口中键入消息，并观察消息显示在使用器窗口中。

# 使用 S AWS ecrets Manager 进行登录凭据身份验证
<a name="msk-password"></a>

您可以使用使用 S AWS ecrets Manager 存储和保护的登录凭证来控制对您的 Amazon MSK 集群的访问权限。将用户凭证存储在 Secrets Manager 中可以减少集群身份验证的开销，例如审计、更新和轮换凭证。Secrets Manager 还让您能够跨集群共享用户凭证。

将密钥与 MSK 集群关联后，MSK 会定期同步该凭证数据。

**Topics**
+ [登录凭证身份验证的工作原理](msk-password-howitworks.md)
+ [为 Amazon MSK 集群设置 SASL/SCRAM 身份验证](msk-password-tutorial.md)
+ [使用用户](msk-password-users.md)
+ [使用 SCRAM 密钥时的限制](msk-password-limitations.md)

# 登录凭证身份验证的工作原理
<a name="msk-password-howitworks"></a>

Amazon MSK 的登录凭证身份验证使用的身份验证 SASL/SCRAM （简单身份验证和安全层/加盐质询响应机制）身份验证。要为集群设置登录凭证身份验证，您可以在 [AWS Secrets Manager](https://docs.aws.amazon.com//secretsmanager/?id=docs_gateway) 中创建密钥资源，并将登录凭证与该密钥关联。

SASL/SCRAM 在 [RFC 5802](https://tools.ietf.org/html/rfc5802) 中定义。SCRAM 使用安全哈希算法，不会在客户端和服务器之间传输明文登录凭证。

**注意**  
当您为集群设置 SASL/SCRAM 身份验证时，Amazon MSK 会为客户端和代理之间的所有流量启用 TLS 加密。

# 为 Amazon MSK 集群设置 SASL/SCRAM 身份验证
<a name="msk-password-tutorial"></a>

要在 Secr AWS ets Manager 中设置密钥，请按照 Secrets Man [ager 用户指南中的[创建和检索密](https://docs.aws.amazon.com/secretsmanager/latest/userguide/tutorials_basic.html)AWS 钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)教程进行操作。

在为 Amazon MSK 集群创建密钥时，请注意以下要求：
+ 对于密钥类型，请选择**其他密钥类型（例如 API 密钥）**。
+ 您的密钥名称必须以前缀 **AmazonMSK\$1** 开头。
+ 您必须使用现有的自定义 AWS KMS 密钥或为您的密 AWS KMS 钥创建新的自定义密钥。默认情况下，Secrets Manager 对密 AWS KMS 钥使用默认密钥。
**重要**  
使用默认密钥创建的密 AWS KMS 钥不能用于 Amazon MSK 集群。
+ 您的登录凭证数据必须采用以下格式，才能使用**明文**选项输入键值对。

  ```
  {
    "username": "alice",
    "password": "alice-secret"
  }
  ```
+ 记录密钥的 ARN（Amazon 资源名称）值。
+ 
**重要**  
您不能将 Secrets Manager 密钥与超出 [调整集群的大小：每个标准代理的分区数量](bestpractices.md#partitions-per-broker) 中所述限制的集群关联。
+ 如果您使用创建密钥，请为参数指定密钥 ID 或 ARN。 AWS CLI `kms-key-id`不要指定别名。
+ 要将密钥与您的集群关联，请使用 Amazon MSK 控制台或[ BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret)操作。
**重要**  
当您将密钥与集群关联时，Amazon MSK 会向该密钥附加资源策略，以允许您的集群访问和读取您定义的密钥值。您不应修改此资源策略。这样做可能会阻止您的集群访问密钥。如果对密钥资源策略和/或用于密钥加密的 KMS 密钥进行了任何更改，请确保将密钥重新关联至 MSK 集群。这可以确保集群能继续访问密钥。

  `BatchAssociateScramSecret` 操作的以下 JSON 输入示例将密钥与集群关联：

  ```
  {
    "clusterArn" : "arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",          
    "secretArnList": [
      "arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret"
    ]
  }
  ```

# 使用登录凭证连接到集群
<a name="msk-password-tutorial-connect"></a>

在创建密钥并将其与集群关联后，您便可以将客户端连接到集群。以下过程演示如何将客户端连接到使用 SASL/SCRAM 身份验证的集群。此外还展示了如何通过示例主题生成和使用。

**Topics**
+ [使用 SASL/SCRAM 身份验证将客户端连接到集群](#w2aab9c13c29c17c13c11b9b7)
+ [排除连接问题](#msk-password-tutorial-connect-troubleshooting)

## 使用 SASL/SCRAM 身份验证将客户端连接到集群
<a name="w2aab9c13c29c17c13c11b9b7"></a>

1. 在已 AWS CLI 安装的计算机上运行以下命令。*clusterARN*替换为集群的 ARN。

   ```
   aws kafka get-bootstrap-brokers --cluster-arn clusterARN
   ```

   从该命令的 JSON 结果中，保存与名为 `BootstrapBrokerStringSaslScram` 的字符串关联的值。该值将在后面的步骤中使用。

1. 在您的客户端计算机上，创建一个 JAAS 配置文件，其中包含存储在密钥中的用户凭证。例如，对于用户 **alice**，使用以下内容创建一个名为 `users_jaas.conf` 的文件。

   ```
   KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="alice"
      password="alice-secret";
   };
   ```

1. 使用以下命令将 JAAS 配置文件导出为 `KAFKA_OPTS` 环境参数。

   ```
   export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf
   ```

1. 在 `/tmp` 目录中创建一个名为 `kafka.client.truststore.jks` 的文件。

1. （可选）使用以下命令将 JDK 密钥存储文件从 JVM `cacerts` 文件夹复制到您在上一步中创建的 `kafka.client.truststore.jks` 文件。*JDKFolder*替换为实例上的 JDK 文件夹的名称。例如，您的 JDK 文件夹可能命名为 `java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64`。

   ```
   cp /usr/lib/jvm/JDKFolder/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. 在 Apache Kafka 安装的 `bin` 目录中，创建一个名为 `client_sasl.properties` 的客户端属性文件，其中包含以下内容。此文件可定义 SASL 机制和协议。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=SCRAM-SHA-512
   ```

1. 要创建示例主题，请运行以下命令。*BootstrapBrokerStringSaslScram*替换为您在本主题的步骤 1 中获得的引导代理字符串。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBrokerStringSaslScram --command-config <path-to-client-properties>/client_sasl.properties --replication-factor 3 --partitions 1 --topic ExampleTopicName
   ```

1. 要生成您创建的示例主题，请在客户端计算机上运行以下命令。*BootstrapBrokerStringSaslScram*替换为您在本主题的步骤 1 中检索到的引导代理字符串。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerStringSaslScram --topic ExampleTopicName --producer.config client_sasl.properties
   ```

1. 要使用您创建的主题，在您的客户端计算机上运行以下命令。*BootstrapBrokerStringSaslScram*替换为您在本主题的步骤 1 中获得的引导代理字符串。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --from-beginning --consumer.config client_sasl.properties
   ```

## 排除连接问题
<a name="msk-password-tutorial-connect-troubleshooting"></a>

运行 Kafka 客户端命令时，可能会遇到 Java 堆内存错误，尤其是在使用大型主题或数据集时。出现这些错误的原因是，Kafka 工具作为 Java 应用程序运行，其默认内存设置可能不足以供工作负载使用。

要解决 `Out of Memory Java Heap` 错误，可通过修改 `KAFKA_OPTS` 环境变量包含内存设置来增大 Java 堆大小。

以下示例将最大堆大小设置为 1GB (`-Xmx1G`)。此值可根据可用系统内存和要求进行调整。

```
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf -Xmx1G"
```

要使用大型主题，可考虑使用基于时间或基于偏移量的参数（而不是 `--from-beginning`）来限制内存使用量：

```
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --max-messages 1000 --consumer.config client_sasl.properties
```

# 使用用户
<a name="msk-password-users"></a>

**创建用户：**您在密钥中以键值对的形式创建用户。在 Secrets Manager 控制台中使用**明文**选项时，应按以下格式指定登录凭证数据。

```
{
  "username": "alice",
  "password": "alice-secret"
}
```

**撤销用户访问权限：**要撤销用户访问集群的凭证，建议您先在集群上移除或强制执行 ACL，然后取消与该密钥的关联。这是因为：
+ 移除用户并不能关闭现有连接。
+ 对密钥的更改最多需要 10 分钟才能传播。

有关将 ACL 与 Amazon MSK 结合使用的更多信息，请参阅 [阿帕奇 Kafka ACLs](msk-acls.md)。

对于使用 ZooKeeper 模式的集群，我们建议您限制对 ZooKeeper 节点的访问以防止用户进行修改 ACLs。有关更多信息，请参阅 [控制对亚马逊 MSK ZooKeeper 集群中 Apache 节点的访问权限](zookeeper-security.md)。

# 使用 SCRAM 密钥时的限制
<a name="msk-password-limitations"></a>

使用 SCRAM 密钥时请注意以下限制：
+ Amazon MSK 仅支持 SCRAM-SHA-512 身份验证。
+ 一个 Amazon MSK 集群最多可拥有 1000 个用户。
+ 你必须在你的密钥中 AWS KMS key 使用。您不能将使用默认 Secrets Manager 加密密钥的密钥与 Amazon MSK 一起使用。有关创建 KMS 密钥的信息，请参阅 [Creating symmetric encryption KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/create-keys.html#create-symmetric-cmk)。
+ 您无法在 Secrets Manager 中使用非对称 KMS 密钥。
+ 使用该[ BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret)操作，您一次最多可以将 10 个密钥与一个集群关联。
+ 与 Amazon MSK 集群关联的密钥的名称必须带有前缀 **AmazonMSK\$1**。
+ 与 Amazon MSK 集群关联的密钥必须与集群位于相同的 Amazon Web Services 账户和 AWS 区域中。

# 阿帕奇 Kafka ACLs
<a name="msk-acls"></a>

Apache Kafka 有一个可插拔的授权器，并附带了授权器实现。 out-of-boxAmazon MSK 在代理上的 `server.properties` 文件中启用此授权方。

Apache Kafka ACLs 的格式为 “主体 P 是 [允许/拒绝] 来自主机 H 对任何与 RP 匹配的资源 R 的操作 O”。 ResourcePattern 如果 RP 与特定资源 R 不匹配，则 R 没有关联 ACLs，因此除了超级用户之外，不允许任何其他人访问 R。要更改此 Apache Kafka 行为，请将该属性`allow.everyone.if.no.acl.found`设置为 true。默认情况下，Amazon MSK 会将其设置为 true。这意味着， ACLs 在 Amazon MSK 集群中，如果您没有明确设置资源，则所有委托人都可以访问此资源。如果您在资源 ACLs 上启用，则只有授权的委托人才能访问该资源。如果要限制对主题的访问并使用 TLS 双向身份验证对客户端进行授权，请 ACLs 使用 Apache Kafka 授权器 CLI 进行添加。有关添加、删除和列出的更多信息 ACLs，请参阅 [Kafka 授权命令行界面](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Authorization+Command+Line+Interface)。

由于 Amazon MSK 将代理配置为超级用户，因此代理可访问所有主题。这有助于代理从主分区复制消息，无论是否为集群配置定义了 `allow.everyone.if.no.acl.found` 属性。

**添加或删除对主题的读写访问权**

1. 将您的代理添加到 ACL 表中，以允许他们读取所有已 ACLs 存在的主题。要授予代理对主题的读取访问权限，请在可与 MSK 集群通信的客户端计算机上运行以下命令。

   *Distinguished-Name*替换为集群中任何引导程序代理的 DNS，然后将此可分辨名称中第一个句点之前的字符串替换为星号 () `*`。例如，如果集群的某个引导代理有 DNS`b-6.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`，则在以下命令*Distinguished-Name*中替换为。`*.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`有关如何获取引导代理的信息，请参阅[获取 Amazon MSK 集群的引导代理](msk-get-bootstrap-brokers.md)。

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

1. 要授予客户端应用对主题的读访问权，请在客户端计算机上运行以下命令。如果您使用双向 TLS 身份验证，请使用与创建私钥时相同的*Distinguished-Name*身份验证。

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

   要删除读访问权，您可以运行相同的命令，并将 `--add` 替换为 `--remove`。

1. 要授予对主题的写访问权，请在客户端计算机上运行以下命令。如果您使用双向 TLS 身份验证，请使用与创建私钥时相同的*Distinguished-Name*身份验证。

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Write --topic Topic-Name
   ```

   要删除写访问权，您可以运行相同的命令，并将 `--add` 替换为 `--remove`。