Apache Kafka
Apache Kafka (Kafka) アクションは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、Confluent Cloud
注記
このトピックでは、Apache Kafka プラットフォームおよび関連概念について精通していることを前提としています。Apache Kafka の詳細については、「Apache Kafka
要件
このルールアクションには、以下の要件があります。
-
ec2:CreateNetworkInterface、ec2:DescribeNetworkInterfaces、ec2:CreateNetworkInterfacePermission、ec2:DeleteNetworkInterface、ec2:DescribeSubnets、ec2:DescribeVpcs、ec2:DescribeVpcAttributeおよびec2:DescribeSecurityGroupsのオペレーションを実行するために AWS IoT が引き受けることができる IAM ロール。このロールは、Kafka ブローカーに到達するために、Amazon Virtual Private Cloud への伸縮自在なネットワークインターフェイスを作成および管理します。詳細については、「AWS IoT ルールに必要なアクセスを付与する」を参照してください。AWS IoT コンソールで、AWS IoT Core がこのルールアクションを実行できるようにするロールを選択または作成できます。
ネットワークインターフェイスの詳細については、Amazon EC2 ユーザーガイドの「Elastic Network Interface」を参照してください。
指定したロールにアタッチされるポリシーは次の例のようになります。
-
AWS Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、AWS IoT Core が
secretsmanager:GetSecretValueオペレーションとsecretsmanager:DescribeSecretオペレーションを実行するために引き受けることができる IAM ロールを作成する必要があります。指定したロールにアタッチされるポリシーは次の例のようになります。
-
Amazon Virtual Private Cloud (Amazon VPC) 内で Apache Kafka クラスターを実行できます。AWS IoT からパブリックな Kafka クラスターにメッセージを転送するには、Amazon VPC の送信先を作成し、サブネットで NAT ゲートウェイを使用する必要があります。AWS IoT ルールエンジンは、VPC 送信先にリストされている各サブネットにネットワークインターフェイスを作成し、VPC に直接トラフィックをルーティングします。VPC 送信先を作成すると、AWS IoT ルールエンジンによって VPC ルールアクションが自動的に作成されます。VPC ルールアクションの詳細については、Virtual private cloud (仮想プライベートクラウド )(VPC) 送信先 を参照してください。
-
保管中のデータを暗号化するために、カスタマーマネージド AWS KMS key (KMS キー) を使用する場合は、サービスが発信者に代わって KMS キーを使用する許可を持っている必要があります。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドの Amazon MSK 暗号化を参照してください。
パラメータ
このアクションで AWS IoT ルールを作成するときは、次の情報を指定する必要があります。
- destinationArn
-
VPC 送信先の Amazon リソースネーム (ARN)。VPC 送信先の作成方法の詳細については、「Virtual private cloud (仮想プライベートクラウド )(VPC) 送信先」を参照してください。
- トピック
-
Kafka のブローカーに送信されるメッセージの Kafka のトピック。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- キー (オプション)
-
Kafka のメッセージキー
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- ヘッダー (オプション)
-
指定した Kafka ヘッダーのリスト。各ヘッダーは、Kafka アクションを作成するときに指定できるキーと値のペア (1 つのキーと 1 つの値) です。これらのヘッダーを使用して、メッセージペイロードを変更せずに IoT クライアントからダウンストリーム Kafka クラスターにデータをルーティングできます。
このフィールドは、置換テンプレートを使用して置換できます。インラインルールの関数を Kafka Action のヘッダーで代替テンプレートとして渡す方法については、「例」を参照してください。詳細については、「置換テンプレート」を参照してください。
注記
バイナリ形式のヘッダーはサポートされていません。
- パーティション (オプション)
-
Kafka のメッセージパーティション。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- clientProperties
-
Apache Kafka プロデューサークライアントのプロパティを定義するオブジェクト。
- acks (オプション)
-
リクエストが完了したとみなされる前に、プロデューサーがサーバーに受信することを求める確認応答の数。
値として 0 を指定すると、プロデューサーはサーバーからの確認応答を待機しなくなります。サーバーがメッセージを受信しない場合、プロデューサーはメッセージの送信を再試行しません。
有効な値は、
-1、0、1、allです。デフォルト値は1です。 - bootstrap.servers
-
Kafka クラスターへの初期接続を確立するために使用されるホストとポートのペア (
host1:port1、host2:port2など) のリスト。 - compression.type (optional)
-
プロデューサーによって生成されるすべてのデータの圧縮タイプ。
有効な値:
none、gzip、snappy、lz4、zstd。デフォルト値はnoneです。 - security.protocol
-
Kafka ブローカーにアタッチするために使用されるセキュリティプロトコル。
有効な値:
SSL、SASL_SSL。デフォルト値はSSLです。 - key.serializer
-
ProducerRecordで提供するキーオブジェクトをバイトに変換する方法を指定します。有効な値:
StringSerializer。 - value.serializer
-
ProducerRecordで提供する値オブジェクトをバイトに変換する方法を指定します。有効な値:
ByteBufferSerializer。 - ssl.truststore
-
base64 形式のトラストストアファイル、または AWS Secrets Manager 内のトラストストアファイルの場所。トラストストアが Amazon 認証機関 (CA) によって信頼されている場合は、この値は必須ではありません。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合、
get_secretSQL 関数を使用してこのフィールドの値を取得できます。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。トラストストアがファイル形式の場合は、SecretBinaryパラメータを使用します。トラストストアが文字列の形式である場合は、SecretStringパラメータを使用します。この値の最大サイズは 65 KB です。
- ssl.truststore.password
-
信頼ストアのパスワード。この値は、トラストストアのパスワードを作成した場合にのみ必要です。
- ssl.keystore
-
キーストアファイル。
security.protocolの値としてSSLを指定する場合、この値は必須です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretBinaryパラメータを使用します。 - ssl.keystore.password
-
キーストアファイルのストアパスワード。
ssl.keystoreの値を指定している場合、この値は必須です。このフィールドの値はプレーンテキストにすることができます。このフィールドは、代替テンプレートもサポートします。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretStringパラメータを使用します。 - ssl.key.password
-
キーストアファイル内のプライベートキーのパスワード。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretStringパラメータを使用します。 - sasl.mechanism
-
Kafka のブローカーに接続するために使用されるセキュリティメカニズム。この値は、
security.protocolのSASL_SSLを指定する場合に必要です。有効な値:
PLAIN、SCRAM-SHA-512、GSSAPI。注記
SCRAM-SHA-512は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。 - sasl.plain.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのPLAINを指定する場合に必要です。 - sasl.plain.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのPLAINを指定する場合に必要です。 - sasl.scram.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのSCRAM-SHA-512を指定する場合に必要です。 - sasl.scram.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのSCRAM-SHA-512を指定する場合に必要です。 - sasl.kerberos.keytab
-
Secrets Manager の Kerberos 認証用のキータブファイル。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretBinaryパラメータを使用します。 - sasl.kerberos.service.name
-
Apache Kafka が実行される Kerberos プリンシパル名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.krb5.kdc
-
Apache Kafka プロデューサークライアントが接続するキー配布センター (KDC) のホスト名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.krb5.realm
-
Apache Kafka プロデューサークライアントが接続する領域。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.principal
-
Kerberos 対応サービスにアクセスするためのチケットを Kerberos が割り当てることができる一意の Kerberos ID。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。
例
次の JSON 例では、AWS IoT ルールで Apache Kafka アクションを定義します。次の例では、sourceIp () インライン関数を Kafka Action ヘッダーの代替テンプレートとして渡します。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Kerberos セットアップに関する重要な注意事項
-
キー配布センター (KDC) は、ターゲット VPC 内のプライベートドメインネームシステム (DNS) を介して解決可能である必要があります。考えられる方法の 1 つは、KDC DNS エントリをプライベートホストゾーンに追加することです。このアプローチの詳細については、「プライベートホストゾーンの使用」を参照してください。
-
各 VPC で DNS 解決が有効になっている必要があります。詳細については、「Using DNS with Your VPC」を参照してください。
-
VPC 送信先のネットワークインターフェイスセキュリティグループとインスタンスレベルのセキュリティグループは、次のポートで VPC 内からのトラフィックを許可する必要があります。
-
ブートストラップブローカーのリスナーポート上の TCP トラフィック (通常は 9092 ですが、9000~9100 の範囲内である必要があります)
-
KDC のポート 88 の TCP および UDP トラフィック
-
-
SCRAM-SHA-512は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。