Amazon MSK イベントソース向け Lambda イベントソースマッピング作成
イベントソースマッピングの作成には、Lambda コンソール、AWS Command Line Interface (CLI)、AWS SDK
注記
イベントソースマッピングを作成すると、Lambda は MSK クラスターを含むプライベートサブネットにハイパープレーン ENI を作成し、これにより Lambda からの安全な接続を確立します。このハイパープレーン ENI は Lambda 関数ではなく、MSK クラスターのサブネットおよびセキュリティグループ設定を使用します。
次のコンソールの手順で、Lambda 関数のトリガーとして Amazon MSK クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。
Amazon MSK トリガーをLambda 関数 (コンソール) に追加するには
-
Lambda コンソールの [関数]
ページを開きます。 -
Amazon MSK トリガーを追加する Lambda 関数の名前を選択します。
-
[関数の概要] で [トリガーを追加] をクリックします。
-
[トリガー設定] で [MSK] を選択します。
-
Kafka クラスターの詳細を指定するには、次の手順を実行してください。
-
[MSK cluster] (MSK クラスター) で、クラスターを選択します。
-
[トピック名]に、メッセージを使用する Kafka トピックの名前を入力します。
-
[コンシューマーグループ ID] に、参加する Kafka コンシューマーグループの ID を入力します (該当する場合)。詳細については、「Lambda でのカスタマイズ可能なコンシューマーグループ ID」を参照してください。
-
-
[クラスター認証] で必要な設定を行います。クラスター認証の詳細については、「Lambda でのクラスターの認証方法の設定」を参照してください。
-
接続の確立時に Lambda で MSK クラスターに対する認証を実行させたい場合、[認証の使用] をオンに切り替えます。認証の使用をお勧めします。
-
認証を使用する場合、[認証方法] で使用する認証方法を選択します。
-
認証を使用する場合、[Secrets Manager キー] で、クラスターへのアクセスに必要な認証情報を含む Secrets Manager キーを選択します。
-
-
[イベントポーラー設定] で、必要な設定を行います。
-
[トリガーを有効にする] を選択し、作成直後にトリガーが有効化されるようにします。
-
イベントソースマッピングに [プロビジョニングモードの設定] を行うかどうかを選択します。詳細については、「Lambda でのイベントポーラーのスケーリングモード」を参照してください。
-
プロビジョニングモードを設定する場合、[最小イベントポーラー] の値または [最大イベントポーラー] の値を入力するか、もしくはこれらの値両方を入力します。
-
-
[開始位置] で、Lambda がストリームからの読み取りを開始する方法を選択します。詳細については、「Lambda でのポーリングとストリームの開始位置」を参照してください。
-
-
[バッチ処理] で必要な設定を行います。バッチ処理の詳細については、「バッチ処理動作」を参照してください。
-
[Batch size] (バッチサイズ) で、単一バッチで取得されるメッセージの最大数を設定します。
-
[バッチウィンドウ] に、Lambda が関数呼び出し前にレコード収集に費やすことのできる最大時間 (秒) を入力します。
-
-
[フィルタリング]で必要な設定を行います。のフィルタリングについての詳細は「Amazon MSK イベントソースでのイベントフィルタリングの使用」を参照してください。
-
[フィルター条件] にフィルター条件定義を追加して、イベントを処理するかどうかを決定します。
-
-
[障害処理] で必要な設定を行います。障害処理の詳細については、「Amazon MSK イベントソースの破棄されたバッチのキャプチャ」を参照してください。
-
[障害発生時の宛先] には、障害発生時の送信先の ARN を指定します。
-
-
[タグ] には、このイベントソースマッピングに関連付けるタグを入力します。
-
トリガーを追加するには、[Add] (追加) を選択します。
create-event-source-mappingLATEST
メッセージを開始点として Lambda 関数の my-msk-function
を AWSKafkaTopic
トピックにマッピングするイベントソースマッピングを作成します。このコマンドは SourceAccessConfiguration オブジェクトも使用して、クラスターに接続するときに SASL/SCRAM 認証を使用するように Lambda に指示します。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
クラスターが mTLS 認証を使用する場合、CLIENT_CERTIFICATE_TLS_AUTH
および Secrets Manager のキー ARN を指定する SourceAccessConfiguration オブジェクトを含める必要があります。これは次のコマンドで示されます。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
クラスターが IAM 認証を使用する場合、SourceAccessConfiguration オブジェクトは不要です。これは次のコマンドで示されます。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function