

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Apache Kafka
<a name="apache-kafka-rule-action"></a>

La acción de Apache Kafka (Kafka) envía mensajes directamente a [Amazon Managed Streaming para Apache Kafka](https://docs.aws.amazon.com//msk/latest/developerguide/what-is-msk.html) (Amazon MSK), a clústeres de Apache Kafka autoadministrados por proveedores externos, como [Confluent Cloud](https://www.confluent.io/) o a clústeres de Apache Kafka autoadministrados. Con la acción de reglas de Kafka, puede enrutar sus datos de IoT a los clústeres de Kafka. Esto le permite crear canalizaciones de datos de alto rendimiento para diversos fines, como el análisis de flujos, la integración de datos, la visualización y las aplicaciones empresariales esenciales.

**nota**  
En este tema se presupone estar familiarizado con la plataforma Apache Kafka y los conceptos relacionados. Para obtener más información sobre Apache Kafka, consulte [Apache Kafka](https://kafka.apache.org/). No se admite [MSK sin servidor](https://docs.aws.amazon.com//msk/latest/developerguide/serverless.html). Los clústeres de MSK sin servidor solo se pueden crear mediante la autenticación de IAM, que la acción de la regla de Apache Kafka no admite actualmente. Para obtener más información sobre cómo configurar AWS IoT Core con Confluent, consulte [Aprovechar Confluent y AWS resolver los desafíos de la administración de datos y dispositivos de IoT](https://aws.amazon.com/blogs/apn/leveraging-confluent-and-aws-to-solve-iot-device-and-data-management-challenges/).

## Requisitos
<a name="apache-kafka-rule-action-requirements"></a>

Esta regla tiene los siguientes requisitos:
+ Una función de IAM que AWS IoT puede asumir para realizar las `ec2:CreateNetworkInterface` operaciones`ec2:DescribeNetworkInterfaces`,,`ec2:CreateNetworkInterfacePermission`, `ec2:DeleteNetworkInterface` `ec2:DescribeSubnets``ec2:DescribeVpcs`, `ec2:DescribeVpcAttribute` y. `ec2:DescribeSecurityGroups` Este rol crea y administra interfaces de red elásticas para su Amazon Virtual Private Cloud para comunicarse con su agente de Kafka. Para obtener más información, consulte [Otorgar a una AWS IoT regla el acceso que requiere](iot-create-role.md).

  En la AWS IoT consola, puede elegir o crear un rol que permita AWS IoT Core realizar esta acción de regla. 

  A fin de obtener más información sobre las interfaces de red, consulte [Interfaces de red elástica](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-eni.html) en la *Guía del usuario de Amazon EC2*.

  La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "ec2:CreateNetworkInterface",
              "ec2:DescribeNetworkInterfaces",
              "ec2:CreateNetworkInterfacePermission",
              "ec2:DeleteNetworkInterface",
              "ec2:DescribeSubnets",
              "ec2:DescribeVpcs",
              "ec2:DescribeVpcAttribute",
              "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+ Si lo utiliza AWS Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, debe crear un rol de IAM que AWS IoT Core pueda asumir para realizar las operaciones `secretsmanager:GetSecretValue` y`secretsmanager:DescribeSecret`.

  La política asociada al rol especificado debería verse de manera similar al siguiente ejemplo.  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue",
                  "secretsmanager:DescribeSecret"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*",
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*"
              ]
          }
      ]
  }
  ```
+ Puede ejecutar sus clústeres de Apache Kafka dentro de Amazon Virtual Private Cloud (Amazon VPC). Debe crear un destino de nube privada virtual (VPC) de Apache Kafka y utilizar una puerta de enlace NAT en las subredes para reenviar los mensajes desde AWS IoT un clúster público de Kafka. El motor de AWS IoT reglas crea una interfaz de red en cada una de las subredes enumeradas en el destino para enrutar el tráfico directamente a la VPC. Cuando llegue a su destino, el motor de AWS IoT reglas crea automáticamente una acción de regla de VPC. Para obtener más información sobre las acciones de las reglas, consulte [Destinos de Apache Kafka Virtual Private Cloud (VPC)](kafka-vpc-destination.md).
+ Si utilizas una clave KMS gestionada AWS KMS key por el cliente para cifrar los datos en reposo, el servicio debe tener permiso para utilizar la clave KMS en nombre de la persona que llama. Para obtener más información, consulte el [cifrado de Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html) en la *guía para desarrolladores Amazon Managed Streaming for Apache Kafka.*.

## Parameters
<a name="apache-kafka-rule-action-parameters"></a>

Al crear una AWS IoT regla con esta acción, debe especificar la siguiente información:

destinationArn  
El nombre del recurso de Amazon (ARN) del destino de la Nube Privada Virtual (VPC) de Apache Kafka. Para obtener información sobre la creación de un destino, consulte. [Destinos de Apache Kafka Virtual Private Cloud (VPC)](kafka-vpc-destination.md)

tema  
Tema de Kafka para que los mensajes se envíen al agente de Kafka.  
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte [Plantillas de sustitución](iot-substitution-templates.md). 

clave (opcional)  
Clave de mensajes de Kafka.  
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte [Plantillas de sustitución](iot-substitution-templates.md). 

encabezados (opcional)  
La lista de cabeceras de Kafka que usted especifique. Cada encabezado es un par clave-valor que puede especificar al crear una acción de Kafka. Puede usar estos encabezados para enrutar los datos de los clientes de IoT a los clústeres de Kafka descendentes sin modificar la carga útil de los mensajes.  
Puede sustituir este campo mediante una plantilla de sustitución. [Para saber cómo pasar una función de regla en línea como plantilla de sustitución en el encabezado de la acción Kafka, consulte los ejemplos.](#apache-kafka-rule-action-examples) Para obtener más información, consulte [Plantillas de sustitución](iot-substitution-templates.md).  
Los encabezados en formato binario no son compatibles.

partición (opcional)  
Partición de mensajes de Kafka.  
Puede sustituir este campo mediante una plantilla de sustitución. Para obtener más información, consulte [Plantillas de sustitución](iot-substitution-templates.md).

clientProperties  
Un objeto que define las propiedades del cliente productor de Apache Kafka.    
acks (opcional)  
El número de reconocimientos que el productor requiere que el servidor haya recibido antes de considerar completa una solicitud.  
Si especifica 0 como valor, el productor no esperará ningún acuse de recibo del servidor. Si el servidor no recibe el mensaje, el productor no volverá a intentar enviarlo.  
Valores válidos: `-1`, `0`, `1`, `all`. El valor predeterminado es `1`.  
bootstrap.servers  
Una lista de pares de host y puerto (por ejemplo `host1:port1`, `host2:port2`) que se utilizan para establecer la conexión inicial con el clúster de Kafka.  
compression.type (opcional)  
El tipo de compresión de todos los datos generados por el productor.  
Valores válidos: `none`, `gzip`, `snappy`, `lz4`, `zstd`. El valor predeterminado es `none`.  
security.protocol  
El protocolo de seguridad utilizado para conectarse a su agente de Kafka.  
Valores válidos: `SSL`, `SASL_SSL`. El valor predeterminado es `SSL`.  
key.serializer  
Especifica cómo convertir los objetos clave que usted proporciona con el `ProducerRecord` en bytes.  
Valor válido: `StringSerializer`.  
value.serializer  
Especifica cómo convertir los objetos de valor que proporcione con el `ProducerRecord` en bytes.  
Valor válido: `ByteBufferSerializer`.  
ssl.truststore  
El archivo truststore en formato base64 o la ubicación del archivo truststore en [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/) Este valor no es necesario si su almacén de confianza cuenta con la confianza de las autoridades de certificación (CA) de Amazon.  
Este campo admite plantillas de sustitución. Si usa Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka, puede usar la función SQL `get_secret` para recuperar el valor de este campo. Para obtener más información sobre las plantillas de sustitución, consulte [Plantillas de sustitución](iot-substitution-templates.md). Para obtener más información sobre la función `get_secret`, consulte [get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret). Si el almacén de confianza tiene la forma de un archivo, utilice el parámetro `SecretBinary`. Si el almacén de confianza tiene la forma de una cadena, utilice el parámetro `SecretString`.  
El valor máximo de este recuento es 65 KB.  
ssl.truststore.password  
La contraseña del almacén de confianza. Este valor solo es obligatorio si ha creado una contraseña para el almacén de confianza.  
ssl.keystore  
El archivo del almacén de claves. Este valor es obligatorio cuando se especifica `SSL` como valor para `security.protocol`.  
Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función SQL `get_secret`. Para obtener más información sobre las plantillas de sustitución, consulte [Plantillas de sustitución](iot-substitution-templates.md). Para obtener más información sobre la función `get_secret`, consulte [get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret). Utilice el parámetro `SecretBinary`.  
ssl.keystore.password  
La contraseña del almacén para el archivo keystore. Este valor es necesario si especifica un valor para `ssl.keystore`.  
El valor de este campo puede ser texto sin formato. Este campo también admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función SQL `get_secret`. Para obtener más información sobre las plantillas de sustitución, consulte [Plantillas de sustitución](iot-substitution-templates.md). Para obtener más información sobre la función `get_secret`, consulte [get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret). Utilice el parámetro `SecretString`.  
ssl.key.password  
La contraseña de la clave privada del archivo del almacén de claves.  
Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función SQL `get_secret`. Para obtener más información sobre las plantillas de sustitución, consulte [Plantillas de sustitución](iot-substitution-templates.md). Para obtener más información sobre la función `get_secret`, consulte [get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret). Utilice el parámetro `SecretString`.  
sasl.mechanism  
El mecanismo de seguridad utilizado para conectarse a su agente de Kafka. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol`.  
Valores válidos: `PLAIN`, `SCRAM-SHA-512`, `GSSAPI`.  
`SCRAM-SHA-512`es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west  
sasl.plain.username  
El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `PLAIN` para `sasl.mechanism`.  
sasl.plain.password  
La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `PLAIN` para `sasl.mechanism`.  
sasl.scram.username  
El nombre de usuario utilizado para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `SCRAM-SHA-512` para `sasl.mechanism`.  
sasl.scram.password  
La contraseña utilizada para recuperar la cadena secreta de Secrets Manager. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `SCRAM-SHA-512` para `sasl.mechanism`.  
sasl.kerberos.keytab  
El archivo keytab para la autenticación de Kerberos en Secrets Manager. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `GSSAPI` para `sasl.mechanism`.  
Este campo admite plantillas de sustitución. Utilice Secrets Manager para almacenar las credenciales necesarias para conectarse a su agente de Kafka. Para recuperar el valor de este campo, utilice la función SQL `get_secret`. Para obtener más información sobre las plantillas de sustitución, consulte [Plantillas de sustitución](iot-substitution-templates.md). Para obtener más información sobre la función `get_secret`, consulte [get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret). Utilice el parámetro `SecretBinary`.  
sasl.kerberos.service.name  
El nombre principal de Kerberos con el que se ejecuta Apache Kafka. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `GSSAPI` para `sasl.mechanism`.  
sasl.kerberos.krb5.kdc  
El nombre de host del centro de distribución de claves (KDC) al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `GSSAPI` para `sasl.mechanism`.  
sasl.kerberos.krb5.realm  
El ámbito al que se conecta su cliente productor de Apache Kafka. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `GSSAPI` para `sasl.mechanism`.  
sasl.kerberos.principal  
La identidad única de Kerberos a la que Kerberos puede asignar tickets para acceder a los servicios compatibles con Kerberos. Este valor es obligatorio cuando se especifica `SASL_SSL` para `security.protocol` y `GSSAPI` para `sasl.mechanism`.

## Ejemplos
<a name="apache-kafka-rule-action-examples"></a>

El siguiente ejemplo de JSON define una acción de Apache Kafka en una regla. AWS IoT El siguiente ejemplo pasa la función en línea [ sourceIP ()](iot-sql-functions.md#iot-function-sourceip) como [plantilla de sustitución](https://docs.aws.amazon.com//iot/latest/developerguide/iot-substitution-templates.html) en el encabezado Kafka Action.

```
{
	"topicRulePayload": {
		"sql": "SELECT * FROM 'some/topic'",
		"ruleDisabled": false,
		"awsIotSqlVersion": "2016-03-23",
		"actions": [
			{
				"kafka": {
					"destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN",
					"topic": "TopicName",
					"clientProperties": {
						"bootstrap.servers": "kafka.com:9092",
						"security.protocol": "SASL_SSL",
						"ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"ssl.truststore.password": "kafka password",
						"sasl.mechanism": "GSSAPI",
						"sasl.kerberos.service.name": "kafka",
						"sasl.kerberos.krb5.kdc": "kerberosdns.com",
						"sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"sasl.kerberos.krb5.realm": "KERBEROSREALM",
						"sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com"
					},
					"headers": [
						{
							"key": "static_header_key",
							"value": "static_header_value"
						},
						{
							"key": "substitutable_header_key",
							"value": "${value_from_payload}"
						},
						{
							"key": "source_ip",
							"value": "${sourceIp()}"
						}
					]
				}
			}
		]
	}
}
```

**Notas importantes sobre la configuración de Kerberos**
+ Su centro de distribución de claves (KDC) debe poder resolverse mediante un sistema de nombres de dominio (DNS) privado dentro de la VPC de destino. Un enfoque posible consiste en añadir la entrada DNS del KDC a una zona alojada privada. Para más información sobre este enfoque, consulte [Trabajar con zonas alojadas privadas](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/hosted-zones-private.html).
+ Cada VPC debe tener habilitada la resolución DNS. Para obtener más información, consulte [Utilización de DNS con su VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html).
+ Los grupos de seguridad de la interfaz de red y los grupos de seguridad a nivel de instancia del destino de la VPC deben permitir el tráfico desde dentro de la VPC en los siguientes puertos.
  + El tráfico TCP en el puerto oyente intermediario bootstrap (normalmente 9092, pero debe estar dentro del rango de 9000 a 9100)
  + Tráfico TCP y UDP en el puerto 88 del KDC
+ `SCRAM-SHA-512`es el único mecanismo de seguridad compatible en las regiones cn-north-1, cn-northwest-1, -1 y -1. us-gov-east us-gov-west