Lambda での Amazon MSK イベントソースの設定 - AWS Lambda

Lambda での Amazon MSK イベントソースの設定

Amazon MSK クラスターを Lambda 関数のイベントソースとして使用するには、2 つのリソースを接続するイベントソースマッピングを作成します。このページでは、Amazon MSK 用にイベントソースマッピングを作成する方法について説明します。

このページの説明は、MSK クラスターおよびそのクラスターが存在する Amazon 仮想プライベートクラウド (VPC) を正しく設定済みであることを前提としています。クラスターまたは VPC をセットアップする必要がある場合は、「Amazon MSK クラスターおよび Amazon VPC ネットワークの Lambda 向け設定」を参照してください。

イベントソースとしての Amazon MSK クラスターの使用

Apache Kafka クラスターまたは Amazon MSK クラスターを Lambda 関数のトリガーとして追加すると、クラスターはイベントソースとして使用されます。

Lambda は、ユーザーが指定した開始位置に基づいて、CreateEventSourceMapping リクエストで Topics として指定した Kafka トピックからイベントデータを読み取ります。処理が成功すると、Kafka トピックは Kafka クラスターにコミットされます。

Lambda は、Kafka トピックの各パーティションのメッセージを順番に読み込みます。1 つの Lambda ペイロードに、複数のパーティションからのメッセージを含めることができます。利用可能なレコードが増えると、Lambda は CreateEventSourceMapping リクエストで指定した BatchSize 値に基づいて、関数がトピックに追いつくまでバッチ単位でレコードの処理を継続します。

Lambda は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。すべての再試行が失敗したレコードを、障害発生時の送信先に送信して、後で処理することができます。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。

Amazon MSK イベントソース向けイベントソースマッピング作成

イベントソースマッピングの作成には、Lambda コンソール、AWS Command Line Interface (CLI)AWS SDK のいずれかを使用できます。

注記

イベントソースマッピングを作成すると、Lambda は MSK クラスターを含むプライベートサブネットにハイパープレーン ENI を作成し、これにより Lambda からの安全な接続を確立します。このハイパープレーン ENI は Lambda 関数ではなく、MSK クラスターのサブネットおよびセキュリティグループ設定を使用します。

次のコンソールの手順で、Lambda 関数のトリガーとして Amazon MSK クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。

Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには
  1. Lambda コンソールの [関数] ページを開きます。

  2. Amazon MSK トリガーを追加する Lambda 関数の名前を選択します。

  3. [関数の概要] で [トリガーを追加] をクリックします。

  4. [トリガー設定][MSK] を選択します。

  5. Kafka クラスターの詳細を指定するには、次の手順を実行してください。

    1. [MSK cluster] (MSK クラスター) で、クラスターを選択します。

    2. [トピック名]に、メッセージを使用する Kafka トピックの名前を入力します。

    3. [コンシューマーグループ ID] に、参加する Kafka コンシューマーグループの ID を入力します (該当する場合)。詳細については、「カスタマイズ可能なコンシューマーグループ ID」を参照してください。

  6. [クラスター認証] で必要な設定を行います。クラスター認証の詳細については、「クラスターの認証方法の設定」を参照してください。

    • 接続の確立時に Lambda で MSK クラスターに対する認証を実行させたい場合、[認証の使用] をオンに切り替えます。認証の使用をお勧めします。

    • 認証を使用する場合、[認証方法] で使用する認証方法を選択します。

    • 認証を使用する場合、[Secrets Manager キー] で、クラスターへのアクセスに必要な認証情報を含む Secrets Manager キーを選択します。

  7. [イベントポーラー設定] で、必要な設定を行います。

    • [トリガーを有効にする] を選択し、作成直後にトリガーが有効化されるようにします。

    • イベントソースマッピングに [プロビジョニングモードの設定] を行うかどうかを選択します。詳細については、「イベントポーラーのスケーリングモード」を参照してください。

      • プロビジョニングモードを設定する場合、[最小イベントポーラー] の値または [最大イベントポーラー] の値を入力するか、もしくはこれらの値両方を入力します。

    • [開始位置] で、Lambda がストリームからの読み取りを開始する方法を選択します。詳細については、「ポーリングとストリームの開始位置」を参照してください。

  8. [バッチ処理] で必要な設定を行います。バッチ処理の詳細については、「バッチ処理動作」を参照してください。

    1. [Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。

    2. [バッチウィンドウ] に、Lambda が関数呼び出し前にレコード収集に費やすことのできる最大時間 (秒) を入力します。

  9. [フィルタリング]で必要な設定を行います。のフィルタリングについての詳細は「Amazon MSK イベントソースでのイベントフィルタリングの使用」を参照してください。

    • [フィルター条件] にフィルター条件定義を追加して、イベントを処理するかどうかを決定します。

  10. [障害処理] で必要な設定を行います。障害処理の詳細については、「Amazon MSK イベントソースの破棄されたバッチのキャプチャ」を参照してください。

    • [障害発生時の宛先] には、障害発生時の送信先の ARN を指定します。

  11. [タグ] には、このイベントソースマッピングに関連付けるタグを入力します。

  12. トリガーを追加するには、[Add] (追加) を選択します。

create-event-source-mapping コマンドと一緒に AWS CLI を使用してイベントソースマッピングを作成することもできます。次の例では、LATEST メッセージを開始点として Lambda 関数の my-msk-functionAWSKafkaTopic トピックにマッピングするイベントソースマッピングを作成します。このコマンドは SourceAccessConfiguration オブジェクトも使用して、クラスターに接続するときに SASL/SCRAM 認証を使用するように Lambda に指示します。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

クラスターが mTLS 認証を使用する場合、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のキー ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。これは次のコマンドで示されます。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

クラスターが IAM 認証を使用する場合、SourceAccessConfiguration オブジェクトは不要です。これは次のコマンドで示されます。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function

クラスターの認証方法の設定

Lambda は、Amazon MSK クラスターへのアクセス、レコードの取得、その他のタスクの実行を行うための許可を必要とします。Amazon MSK は、MSK クラスターに対する認証方法をいくつかサポートしています。

非認証アクセス

インターネット経由でクラスターにアクセスするクライアントがない場合は、非認証アクセスを使用できます。

SASL/SCRAM 認証

Lambda は、SHA-512 ハッシュ関数および Transport Layer Security (TLS) 暗号化を使用する Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。Lambda がクラスターに接続できるようにするには、認証情報 (ユーザー名およびパスワード) を Secrets Manager のシークレットに保存し、イベントソースマッピングの設定時にこのシークレットを参照します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「Secrets Manager を使用したサインイン認証」を参照してください。

注記

Amazon MSK は SASL/PLAIN 認証をサポートしていません。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間に双方向認証を提供します。サーバーがクライアントを認証できるよう、クライアントはサーバーに証明書を送信します。また、クライアントがサーバーを認証できるよう、サーバーもクライアントに証明書を送信します。

Amazon MSK と Lambda の統合の場合、MSK クラスターはサーバーとして機能し、Lambda はクライアントとして機能します。

  • Lambda が MSK クラスターを認証するためには、Secrets Manager でクライアント証明書をシークレットとして設定し、イベントソースマッピング設定でこの証明書を参照します。クライアント証明書は、サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。

  • MSK クラスターは Lambda にもサーバー証明書を送信します。サーバー証明書は、AWS トラストストア内の認証局 (CA) によって署名される必要があります。

Amazon MSK は自己署名サーバー証明書をサポートしていません。Amazon MSK のすべてのブローカーは、デフォルトで Lambda が信頼する Amazon Trust Services CA によって署名されたパブリック証明書を使用します。

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

Lambda は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Amazon MSK 用の mTLS およびクライアント証明書の生成手順に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「Amazon MSK の相互 TLS クライアント認証」を参照してください。

IAM 認証

AWS Identity and Access Management (IAM) を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。IAM 認証を使用すると、Lambda は関数の実行ロールのアクセス許可に依存してクラスターへの接続、レコードの取得、その他の必要なアクションを実行します。必要なアクセス許可を含むサンプルポリシーについては、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「IAM ロールの認可ポリシーを作成する」を参照してください。

MSK クラスターで IAM 認証がアクティブで、シークレットを指定しない場合、Lambda はデフォルトで自動的に IAM 認証を使用します。

Amazon MSK での IAM 認証の詳細については、「IAM アクセスコントロール」を参照してください。

Lambda でのブートストラップブローカーの選択方法

Lambda は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、ブートストラップブローカーを選択します。mTLS または SASL/SCRAM のシークレットを指定すると、Lambda は自動的にその認証方法を選択します。シークレットを指定しない場合、Lambda は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、Lambda によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。

  • mTLS (mTLS 用のシークレットを提供)

  • SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)

  • SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)

  • 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)

  • プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

注記

Lambda から最も安全なブローカータイプへの接続ができない場合でも、Lambda は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを Lambda に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

カスタマイズ可能なコンシューマーグループ ID

Kafka をイベントソースとして設定する際、コンシューマーグループ IDを指定できます。このコンシューマーグループ ID は、Lambda 関数を結合したい Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Kafka レコード処理設定を他のコンシューマーから Lambda にシームレスに移行できます。

Kafka は、コンシューマーグループ内のすべてのコンシューマーにメッセージを配信します。他のアクティブなコンシューマーが存在するコンシューマーグループ ID を指定した場合、Lambda は Kafka トピックからメッセージの一部のみを受信します。Lambda にトピック内のすべてのメッセージを処理させたい場合、そのコンシューマーグループ内の他のコンシューマーをすべてオフにしてください。

さらに、ユーザーがコンシューマーグループ ID を指定し、Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、Lambda は、イベントソースマッピングの StartingPosition を無視します。代わりに、Lambda はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Kafka が既存のコンシューマーグループを見つけられない場合、Lambda は指定された StartingPosition を使用してイベントソースを設定します。

指定するコンシューマーグループ ID は、すべての Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定して Kafka イベントソースマッピングを作成した後は、この値を更新することはできません。

ポーリングとストリームの開始位置

StartingPosition パラメータは、ストリームからのメッセージの読み取りを開始するタイミングを Lambda に指示します。選択できるオプションは 3 つあります。

  • 最新 – Lambda は Kafka トピックの最新レコードの直後から読み取りを開始します。

  • 水平線トリミング – Lambda は Kafka トピック内のトリミングされていない最新レコードから読み取りを開始します。これはトピック内の最も古いレコードになります。

  • タイムスタンプで – Lambda は、タイムスタンプで定義された位置 (秒単位のUnix 時間) から読み取りを開始します 。StartingPositionTimestamp パラメータを使用してタイムスタンプを指定します。

イベントソースマッピングの作成時、または更新が最終的に一貫性を持っている場合にポーリングをストリーミングしてください。

  • イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。

  • イベントソースマッピングの更新時、ストリーミングからのイベントのポーリングの停止と再開に最大 90 秒かかる場合があります。

この挙動のため、LATEST をストリーミング開始位置として指定した場合、作成または更新時にイベントソースマッピングがいくつかのイベントを見逃す可能性があります。イベントが見逃されないようにするには、TRIM_HORIZON または AT_TIMESTAMP を指定してください。

イベントポーラーのスケーリングモード

Kafka イベントソースマッピングのイベントポーラースケーリングについて、2 つのモードのいずれかを選択できます。

オンデマンドモード (デフォルト)

初めて Amazon MSK イベントソースを作成すると、Lambda は Kafka トピック内のすべてのパーティションを処理するために、デフォルトの数のイベントポーラーを割り当てます。Lambda は、メッセージ負荷に基づいてイベントポーラーの数を自動的にスケールアップまたはスケールダウンします。

Lambda は、1 分間隔でトピック内のすべてのパーティションのオフセットラグを評価します。オフセットラグが大きすぎる場合、パーティションは Lambda で処理できる速度よりも速くメッセージを受信します。必要に応じて、Lambda はトピックのイベントポーラーを追加または削除します。イベントポーラーを追加または削除するこの自動スケーリングプロセスは、評価から 3 分以内に実行されます。

ターゲットの Lambda 関数がスロットリングされると、Lambda はイベントポーラーの数を減らします。このアクションにより、イベントポーラーが関数から取得するメッセージ、および関数に送信するメッセージの数が減ることで、関数に対する負荷が軽減されます。

プロビジョンドモード

イベントソースマッピングのスループットを微調整する必要があるワークロードでは、プロビジョンドモードを使用できます。プロビジョンドモードでは、プロビジョニングされたイベントポーラーの最小数と最大数を定義します。これらのプロビジョニングされたイベントポーラーは、イベントソースマッピング専用であり、応答性の高い自動スケーリングによって予期しないメッセージスパイクを処理できます。パフォーマンス要件が厳しい Kafka ワークロードには、プロビジョンドモードを使用することをお勧めします。

Lambda のイベントポーラーは最大 5 MBps のスループットを処理できるコンピューティングユニットです。例えば、イベントソースの平均ペイロードが 1 MB で、平均関数所要時間が 1 秒であるとします。ペイロードで変換 (フィルタリングなど) が発生しない場合、1 つのポーラーが 5 MBps のスループットと 5 つの同時 Lambda 呼び出しをサポートできます。プロビジョニングモードを使用すると、追加コストが発生します。料金の見積もりについては、「AWS Lambda 料金」を参照してください。

注記

プロビジョンドモードを使用する場合は、AWS PrivateLink VPC エンドポイントを作成したり、関連するアクセス許可をネットワーク設定の一部として付与したりする必要はありません。

プロビジョンドモードでは、イベントポーラーの最小数 (MinimumPollers) の許容値の範囲は 1~200 です。イベントポーラーの最大数 (MaximumPollers) の許容値の範囲は 1~2,000 です。MaximumPollersMinimumPollers 以上である必要があります。また、パーティション内での順序付き処理を維持するために、Lambda は MaximumPollers をトピック内のパーティション数に制限します。

イベントポーラーの最小数および最大数に適切な値を選択する方の詳細については、「プロビジョンドモードを使用する際のベストプラクティスと考慮事項」を参照してください。

Amazon MSK イベントソースマッピングのプロビジョンドモードを設定するには、コンソールまたは Lambda API を使用できます。

既存の Amazon MSK イベントソースマッピングに対してプロビジョンドモードを設定する手順 (コンソール)
  1. Lambda コンソールの [関数] ページを開きます。

  2. プロビジョンドモードを設定する Amazon MSK イベントソースマッピングを持つ関数を選択します。

  3. [設定] タブを選択し、[トリガー] を選択します。

  4. プロビジョンドモードを設定する Amazon MSK イベントソースマッピングを選択し、[編集] を選択します。

  5. [イベントソースマッピング設定] で、[プロビジョンドモードの設定] を選択します。

    • [最小イベントポーラー数] に、1~200 の値を入力します。値を指定しない場合、Lambda はデフォルト値 1 を選択します。

    • [最大イベントポーラー数] に、1~2,000 の値を入力します。この値は、[最小イベントポーラー数] の値以上である必要があります。値を指定しない場合、Lambda はデフォルト値 200 を選択します。

  6. [保存] を選択します。

EventSourceMappingConfigurationProvisionedPollerConfig オブジェクトを使用して、プログラムでプロビジョンドモードを設定できます。例えば、次の UpdateEventSourceMapping CLI コマンドは、MinimumPollers 値を 5、MaximumPollers 値を 100 に設定します。

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'

プロビジョニングモードを設定すると、ProvisionedPollers メトリクスをモニタリングすることで、ワークロードのイベントポーラーの使用状況を確認できます。詳細については、「イベントソースマッピングメトリクス」を参照してください。

プロビジョンドモードを無効にしてデフォルト (オンデマンド) モードに戻すには、次の UpdateEventSourceMapping CLI コマンドを使用できます。

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'

プロビジョンドモードを使用する際のベストプラクティスと考慮事項

イベントソースマッピングの最小および最大イベントポーラー数の最適な設定は、アプリケーションのパフォーマンス要件によって異なります。パフォーマンスプロファイルのベースラインを確立する際には、最初はデフォルトの最小イベントポーラー数から始めることをお勧めします。観測されたメッセージ処理パターンと望ましいパフォーマンスプロファイルに基づいて設定を調整します。

トラフィックの急増が発生しやすく、厳格なパフォーマンス要件があるワークロードの場合、メッセージの突然の急増を処理できるように、最小イベントポーラーを増やします。必要な最小イベントポーラー数を決定するには、ワークロードの 1 秒あたりのメッセージ数と平均ペイロードサイズを考慮し、1 つのイベントポーラーのスループットキャパシティ (最大 5 MBps) を参考にします。

パーティション内での順序付き処理を維持するために、Lambda は最大イベントポーラー数をトピック内のパーティション数に制限します。さらに、イベントソースマッピングがスケーリングできる最大イベントポーラー数は、関数の同時実行設定によって異なります。

プロビジョンドモードを有効にする際には、ネットワーク設定を更新して AWS PrivateLink VPC エンドポイントと関連するアクセス許可を削除します。

クロスアカウントのイベントソースマッピングの作成

マルチ VPC プライベート接続を使用して、Lambda 関数を別の AWS アカウントのプロビジョニングされた MSK クラスターに接続できます。マルチ VPC 接続は AWS PrivateLink を使用して、すべてのトラフィックを AWS ネットワーク内に保持します。

注記

サーバーレス MSK クラスターにはクロスアカウントイベントソースマッピングを作成できません。

クロスアカウントイベントソースマッピングを作成するには、まず MSK クラスターのマルチ VPC 接続を設定する必要があります。イベントソースマッピングを作成するときは、以下の例に示すように、クラスター ARN の代わりにマネージド VPC 接続 ARN を使用します。CreateEventSourceMapping オペレーションは、MSK クラスターが使用する認証タイプによっても異なります。

例 — IAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが IAM ロールベースの認証を使用する場合、SourceAccessConfiguration オブジェクトは必要ありません。例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — SASL/SCRAM 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが SASL/SCRAM 認証を使用する場合は、SASL_SCRAM_512_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。

SASL/SCRAM 認証でクロスアカウントの Amazon MSK イベントソースマッピングにシークレットを使用する方法は 2 つあります。

  • Lambda 関数アカウントにシークレットを作成し、クラスターシークレットと同期します。2 つのシークレットを同期させるローテーションを作成します。このオプションでは、関数アカウントからシークレットを制御できます。

  • MSK クラスターに関連付けられているシークレットを使用してください。このシークレットは、Lambda 関数アカウントへのクロスアカウントアクセスを許可する必要があります。詳細については、「別のアカウントのユーザーの AWS Secrets Manager シークレットに対するアクセス許可」を参照してください。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — mTLS 認証を使用するクラスターのクロスアカウントイベントソースマッピングを作成します。

クラスターが mTLS 認証を使用する場合は、CLIENT_CERTIFICATE_TLS_AUTH および Secrets Manager のシークレット ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。シークレットは、クラスターアカウントまたは Lambda 関数アカウントに保存できます。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

すべての Amazon MSK イベントソースの設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、次のテーブルで示されるように、Amazon MSK に適用されるのは一部のパラメータのみです。

[Parameter] (パラメータ) 必須 デフォルト メモ

AmazonManagedKafkaEventSourceConfig

N

ConsumerGroupID フィールドを含み、デフォルトでは一意の値になっています。

作成時にのみ設定可能

BatchSize

N

100

最大: 10,000

DestinationConfig

N

該当なし

Amazon MSK イベントソースの破棄されたバッチのキャプチャ

有効

N

EventSourceArn

Y

該当なし

作成時にのみ設定可能

FilterCriteria

N

該当なし

Lambda が関数に送信するイベントを制御する

FunctionName

Y

該当なし

KMSKeyArn

N

該当なし

フィルター条件の暗号化

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

ProvisionedPollersConfig

N

MinimumPollers: 指定しない場合のデフォルト値は 1

MaximumPollers: 指定しない場合のデフォルト値は 200

プロビジョンドモード

SourceAccessConfigurations

N

認証情報なし

イベントソース用の、SASL/SCRAM あるいは CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) の認証情報

StartingPosition

Y

該当なし

AT_TIMESTAMP、TRIM_HORIZON、または LATEST

作成時にのみ設定可能

StartingPositionTimestamp

N

該当なし

StartingPosition が AT_TIMESTAMP に設定されている場合にのみ必要

[タグ]

N

該当なし

イベントソースマッピングでのタグの使用

トピック

Y

該当なし

カフカのトピック名

作成時にのみ設定可能