セルフマネージド Apache Kafka イベントソース用に Lambda イベントソースマッピングを作成する
イベントソースマッピングの作成には、Lambda コンソール、AWS Command Line Interface (CLI)、AWS SDK
次のコンソールの手順で、Lambda 関数のトリガーとしてセルフマネージド Apache Kafka クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。
前提条件
-
セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.1.0 以降をサポートしています。
-
セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール。
セルフマネージド型 Kafka クラスターを追加する (コンソール)
セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。
Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)
-
Lambda コンソールの [Functions] (関数) ページ
を開きます。 -
Lambda 関数の名前を選択します。
-
[機能の概要] で、[トリガーを追加] を選択します。
-
[Trigger configuration] (トリガー設定) で次の操作を実行します。
-
Apache Kafka のトリガータイプを選択します。
-
[Bootstrap servers] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[Add] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。
-
[Topic name] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。
-
(オプション) [Batch size] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。
-
バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。
-
(オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。
-
(オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。
-
(オプション) [VPC] で、Kafka クラスターに Amazon VPC を選択します。次に、[VPC subnets] (VPC サブネット) と [VPC security groups] (VPC セキュリティグループ) を選択します。
VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。
-
(オプション) [Authentication] (認証) で [Add] (追加) をクリックしてから、以下を実行します。
-
クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。
-
Kafka ブローカーが SASL/PLAIN 認証を使用する場合は、[BASIC_AUTH] を選択します。
-
ブローカーが SALS/SCRAM 認証を使用する場合は、[SASL_SCRAM] プロトコルのいずれかを選択します。
-
mTLS 認証を設定している場合は、[CLIENT_CERTIFICATE_TLS_AUTH] プロトコルを選択します。
-
-
SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。
-
-
(オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、[Encryption] (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。
この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。
-
テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。
-
-
トリガーを追加するには、[Add] (追加) を選択します。
セルフマネージド型 Kafka クラスターを追加する (AWS CLI)
Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。
SASL/SCRAM を使用する
Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、create-event-source-mappingmy-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングします。
aws lambda create-event-source-mapping \ --topicsAWSKafkaTopic\ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName\ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
VPC の使用
Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、create-event-source-mappingmy-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングします。
aws lambda create-event-source-mapping \ --topicsAWSKafkaTopic\ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
AWS CLI を使用したステータスの表示
次の例では、get-event-source-mapping
aws lambda get-event-source-mapping --uuiddh38738e-992b-343a-1077-3478934hjkfd7