セルフマネージド Apache Kafka イベントソース用に Lambda イベントソースマッピングを作成する - AWS Lambda

セルフマネージド Apache Kafka イベントソース用に Lambda イベントソースマッピングを作成する

イベントソースマッピングの作成には、Lambda コンソール、AWS Command Line Interface (CLI)AWS SDK のいずれかを使用できます。

次のコンソールの手順で、Lambda 関数のトリガーとしてセルフマネージド Apache Kafka クラスターを追加します。内部でイベントソースマッピングリソースが作成されます。

前提条件

  • セルフマネージド型 Apache Kafka クラスター。Lambda は、Apache Kafka バージョン 0.10.1.0 以降をサポートしています。

  • セルフマネージド Kafka クラスターが使用する AWS リソースにアクセスするための許可を持つ実行ロール

セルフマネージド型 Kafka クラスターを追加する (コンソール)

セルフマネージド型 Apache Kafka クラスターと Kafka トピックを Lambda 関数のトリガーとして追加するには、次の手順を実行します。

Apache Kafka トリガーを Lambda 関数に追加するには (コンソール)
  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. Lambda 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. [Trigger configuration] (トリガー設定) で次の操作を実行します。

    1. Apache Kafka のトリガータイプを選択します。

    2. [Bootstrap servers] (ブートストラップサーバー) に、クラスター内の Kafka ブローカーのホストおよびポートのペアアドレスを入力し、[Add] (追加) を選択します。クラスター内の各 Kafka ブローカーで上記を繰り返します。

    3. [Topic name] (トピック名) に、クラスター内のレコードの保存に使用する Kafka トピックの名前を入力します。

    4. (オプション) [Batch size] (バッチサイズ) に、単一のバッチで取得できるメッセージの最大数を入力します。

    5. バッチウィンドウ では、Lambda が関数を呼び出すまで費やすレコード収集の最大時間 (秒) を入力します。

    6. (オプション) コンシューマーグループ ID で、参加する Kafka コンシューマーグループの ID を入力します。

    7. (オプション) [開始位置] で、[最新] を選択して最新のレコードからストリームの読み取りを開始するか、[水平トリム] を選択して使用可能な最も以前のレコードから開始するか、または [タイムスタンプ時点] を選択して読み取りを開始するタイムスタンプを指定します。

    8. (オプション) [VPC] で、Kafka クラスターに Amazon VPC を選択します。次に、[VPC subnets] (VPC サブネット) と [VPC security groups] (VPC セキュリティグループ) を選択します。

      VPC 内のユーザーのみがブローカーにアクセスする場合、この設定は必須です。

    9. (オプション) [Authentication] (認証) で [Add] (追加) をクリックしてから、以下を実行します。

      1. クラスター内の Kafka ブローカーのアクセスまたは認証プロトコルを選択します。

        • Kafka ブローカーが SASL/PLAIN 認証を使用する場合は、[BASIC_AUTH] を選択します。

        • ブローカーが SALS/SCRAM 認証を使用する場合は、[SASL_SCRAM] プロトコルのいずれかを選択します。

        • mTLS 認証を設定している場合は、[CLIENT_CERTIFICATE_TLS_AUTH] プロトコルを選択します。

      2. SASL/SCRAM または mTLS 認証の場合は、Kafka クラスターの認証情報が含まれる Secrets Manager シークレットキーを選択します。

    10. (オプション) Kafka ブローカーがプライベート CA によって署名された証明書を使用する場合、[Encryption] (暗号化) には Kafka ブローカーが TLS 暗号化に使用するルート CA 証明書が含まれる Secrets Manager シークレットを選択します。

      この設定は、SASL/SCRAM または SASL/PLAIN の TLS 暗号化、および mTLS 認証に適用されます。

    11. テスト用に無効状態のトリガーを作成する (推奨) には、[Enable trigger] (トリガーを有効にする) を解除します。または、トリガーをすぐに有効にするには、[Enable trigger] (トリガーを有効にする) を選択します。

  5. トリガーを追加するには、[Add] (追加) を選択します。

セルフマネージド型 Kafka クラスターを追加する (AWS CLI)

Lambda 関数のセルフマネージド型 Apache Kafka トリガーを作成および表示するには、次の AWS CLI コマンドの例を使用します。

SASL/SCRAM を使用する

Kafka ユーザーがインターネット経由で Kafka ブローカーにアクセスする場合は、SASL/SCRAM 認証用に作成した Secrets Manager シークレットを指定します。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、my-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングします。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

VPC の使用

Kafka ブローカーにアクセスするのが VPC 内の Kafka ユーザーのみである場合、VPC、サブネット、および VPC セキュリティグループを指定する必要があります。次の例では、create-event-source-mapping AWS CLI コマンドを使用して、my-kafka-function という名前の Lambda 関数を AWSKafkaTopic という名前の Kafka トピックにマッピングします。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

AWS CLI を使用したステータスの表示

次の例では、get-event-source-mapping AWS CLI コマンドを使用して、作成したイベントソースマッピングのステータスを記述します。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7