セルフマネージド型の Apache Kafka で Lambda を使用する
このトピックでは、セルフマネージド型の Kafka クラスターで Lambda を使用する方法を説明します。AWS の用語集では、セルフマネージド型クラスターには、非 AWS のホストされた Kafka クラスターが含まれています。例えば、お使いの Kafka クラスターを、Confluent Cloud
この章では、セルフマネージド Apache Kafka クラスターを Lambda 関数のイベントソースとして使用する方法について説明します。セルフマネージド Apache Kafka と Lambda を統合する一般的なプロセスには、次の手順が含まれます。
-
クラスターとネットワークのセットアップ – まず、Lambda がクラスターにアクセスできるように、適切なネットワーク設定でセルフマネージド Apache Kafka クラスターを設定します。
-
イベントソースマッピングのセットアップ – 次に、Apache Kafka クラスターを関数に安全に接続するために Lambda が必要とするイベントソースマッピングリソースを作成します。
-
関数とアクセス許可のセットアップ – 最後に、関数を正しくセットアップし、実行ロールに必要なアクセス許可を付与します。
イベントソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは設定可能です (デフォルトでは 100 メッセージ)。詳細については、「バッチ処理動作」を参照してください。
セルフマネージド Apache Kafka イベントソースマッピングのスループットを最適化するには、プロビジョンドモードを設定します。プロビジョンドモードでは、イベントソースマッピングに割り当てられるイベントポーラーの最小数と最大数を定義できます。これにより、イベントソースマッピングが予期しないメッセージスパイクを処理する能力を向上させることができます。詳細については、「プロビジョンドモード」を参照してください。
警告
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?
Kafka ベースのイベントソースの場合、Lambda はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。詳しくは、「バッチ処理動作」を参照してください。
セルフマネージド型 Kafka をイベントソースとして使用する方法の例については、AWS Compute Blog の Using self-hosted Apache Kafka as an event source for AWS Lambda
トピック
イベントの例
Lambda は、Lambda 関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Kafka トピックと Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。
{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }