

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon EventBridge Pipes ソース
<a name="eb-pipes-event-source"></a>

EventBridge Pipes は、さまざまなソースからイベントデータを受け取り、そのデータにオプションのフィルターとエンリッチメントを適用して、送信先に送信します。

ソースが EventBridge Pipes に送信されるイベントに順序を強制する場合、その順序は送信先までのプロセス全体を通して維持されます。

EventBridge Pipes のソースとして、次の AWS サービスを指定できます。
+ [Amazon DynamoDB ストリーム](eb-pipes-dynamodb.md)
+ [Amazon Kinesis Streams](eb-pipes-kinesis.md)
+ [Amazon MQ ブローカー](eb-pipes-mq.md)
+ [Amazon MSK ストリーム](eb-pipes-msk.md)
+ [Amazon SQS キュー](eb-pipes-sqs.md)
+ [Apache Kafka ストリーム](eb-pipes-kafka.md)

  Apache Kafka ストリームをパイプソースとして指定するときは、自分で管理する Apache Kafka ストリーム、または次のようなサードパーティープロバイダーによって管理される Apache Kafka ストリームを指定できます。
  + [https://www.confluent.io/](https://www.confluent.io/)
  + [https://www.cloudkarafka.com/](https://www.cloudkarafka.com/)
  + [https://redpanda.com/](https://redpanda.com/)

# EventBridge Pipes のソースとしての Amazon DynamoDB Stream
<a name="eb-pipes-dynamodb"></a>

EventBridge Pipes を使用して DynamoDB Streams のレコードを受け取ることができます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、ターゲットに送信して処理できます。Amazon DynamoDB Streams に固有の設定があり、パイプをセットアップするときに選択できます。EventBridge Pipes は、データを送信先に送信するときに、データストリームのレコードの順序を維持します。

**重要**  
パイプのソースである DynamoDB ストリームを無効にすると、ストリームを再度有効にしても、そのパイプは使用できなくなります。この理由は、以下のとおりです。  
ソースが無効になっているパイプを停止、開始、または更新することはできません。
作成後のパイプを新しいソースで更新することはできません。DynamoDB ストリームを再度有効にすると、そのストリームには新しい Amazon リソースネーム (ARN) が割り当てられ、パイプとの関連付けはなくなります。
DynamoDB ストリームを再度有効にする場合は、ストリームの新しい ARN を使用して新しいパイプを作成する必要があります。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

```
[
  {
    "eventID": "1",
    "eventVersion": "1.0",
    "dynamodb": {
      "Keys": {
        "Id": {
          "N": "101"
        }
      },
      "NewImage": {
        "Message": {
          "S": "New item!"
        },
        "Id": {
          "N": "101"
        }
      },
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "SequenceNumber": "111",
      "SizeBytes": 26
    },
    "awsRegion": "us-west-2",
    "eventName": "INSERT",
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
    "eventSource": "aws:dynamodb"
  },
  {
    "eventID": "2",
    "eventVersion": "1.0",
    "dynamodb": {
      "OldImage": {
        "Message": {
          "S": "New item!"
        },
        "Id": {
          "N": "101"
        }
      },
      "SequenceNumber": "222",
      "Keys": {
        "Id": {
          "N": "101"
        }
      },
      "SizeBytes": 59,
      "NewImage": {
        "Message": {
          "S": "This item has changed"
        },
        "Id": {
          "N": "101"
        }
      },
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "awsRegion": "us-west-2",
    "eventName": "MODIFY",
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
    "eventSource": "aws:dynamodb"
  }
]
```

## ポーリングストリームとバッチストリーム
<a name="pipes-ddb-polling"></a>

EventBridge は、レコードの DynamoDB Streams にあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、EventBridge はイベントを処理して結果を待機します。処理が成功すると、EventBridge は、さらに多くのレコードを受け取るまでポーリングを再開します。

デフォルトで、EventBridge はレコードが使用可能になると同時にパイプを呼び出します。EventBridge がソースから読み取るバッチにレコードが 1 つしかない場合、1 つのイベントだけを処理します。少数のレコードを処理しないようにするには、バッチ処理ウィンドウを設定して、最大 5 分間レコードをバッファリングするようにパイプに指示できます。イベントを処理する前に、EventBridge は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでソースからのレコードの読み取りを継続します。

**重要**  
パイプは、ストリームレコードを DynamoDB から Amazon SQS に少なくとも 1 回配信します。レコードがドロップされないようにするため、再試行ポリシーの最大有効期間を DynamoDB ストリームの保持期間よりも短く設定することをお勧めします。通常、DynamoDB ストリームはイベントを 24 時間保持します。

また、各シャードから複数のバッチを並行して処理することで、並行性を高めることもできます。EventBridge は、各シャードで最大 10 個のバッチを同時に処理できます。シャードあたりの同時バッチの数を増やしても、EventBridge はパーティションキーレベルでの順序立った処理を確実に行います。

`ParallelizationFactor` 設定を使用することで、複数のパイプの同時実行により、Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。EventBridge がシャードからポーリングする同時バッチの数は、1 (デフォルト)～10 の並列化係数で指定できます。例えば、`ParallelizationFactor` を 2 に設定すると、最大 200 個の EventBridge Pipe の同時実行により、100 個の Kinesis データシャードを処理できます。これにより、データボリュームが揮発性で `IteratorAge` が高いときに処理のスループットをスケールアップすることができます。Kinesis 集約を使用している場合、並列化係数は機能しません。

## ポーリングとストリームの開始位置
<a name="pipes-ddb-stream-start-position"></a>

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。
+ パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。
+ ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、`LATEST` をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` として指定します。

## バッチ項目の失敗の報告
<a name="pipes-ddb-batch-failures"></a>

EventBridge がソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に チェックポイントが設定されます。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「[部分的なバッチ処理失敗](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure)」を参照してください。

### 成功条件と失敗の条件
<a name="pipes-ddb-batch-failures-conditions"></a>

次のいずれかを返すと、EventBridge は完全な成功として処理します:
+ 空の `batchItemFailure` リスト
+ null の `batchItemFailure` リスト
+ 空の `EventResponse`
+ null の `EventResponse`

次のいずれかを返すと、EventBridge は完全な失敗として処理します:
+ 空の文字列 `itemIdentifier`
+ ヌル `itemIdentifier`
+ 不正なキー名を持つ `itemIdentifier`

EventBridge は、再試行戦略に基づいて失敗を再試行します。

# Amazon Kinesis ストリームを EventBridge Pipes のソースとして使用します。
<a name="eb-pipes-kinesis"></a>

EventBridge Pipes を使用して Kinesis データストリームでレコードを受信できます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。パイプをセットアップするときに選択できる Kinesis 固有の設定があります。EventBridge Pipes は、データを送信先に送信するときに、データストリームのレコードの順序を維持します。

Kinesis データストリームは、[シャード](https://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html#shard)のセットです。各シャードには、一連のデータレコードが含まれます。**コンシューマー**は、Kinesis データストリームからのデータを処理するアプリケーションです。EventBridge Pipe を共有スループットコンシューマー (標準イテレーター) にマップすることも、[拡張ファンアウト](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)を使用する専用スループットコンシューマーにマップすることもできます。

標準イテレーターの場合、EventBridge は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。このパイプでは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成できます。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。Kinesis は、HTTP/2 経由で EventBridge にレコードをプッシュします。Kinesis Data Streams の詳細については、「[Amazon Kinesis Data Streams からのデータの読み取り](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html)」を参照してください。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

```
[
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "approximateArrivalTimestamp": 1545084650.987,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  },
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
    "approximateArrivalTimestamp": 1545084711.166,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  }
]
```

## ポーリングストリームとバッチストリーム
<a name="pipes-ak-polling"></a>

EventBridge は、Kinesis ストリーム内のシャードをポーリングして、1 秒に 1 回の基本レートでレコードを探します。レコードが利用可能になると、EventBridge はイベントを処理して結果を待機します。処理が成功すると、EventBridge は、さらに多くのレコードを受け取るまでポーリングを再開します。

デフォルトで、EventBridge はレコードが使用可能になると同時にパイプを呼び出します。EventBridge がソースから読み取るバッチにレコードが 1 つしかない場合、1 つのイベントだけを処理します。少数のレコードを処理しないようにするには、バッチ処理ウィンドウを設定して、最大 5 分間レコードをバッファリングするようにパイプに指示できます。イベントを処理する前に、EventBridge は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでソースからのレコードの読み取りを継続します。

また、各シャードから複数のバッチを並行して処理することで、並行性を高めることもできます。EventBridge は、各シャードで最大 10 個のバッチを同時に処理できます。シャードあたりの同時バッチの数を増やしても、EventBridge はパーティションキーレベルでの順序立った処理を確実に行います。

`ParallelizationFactor` 設定を使用することで、複数のパイプの同時実行により、Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。EventBridge がシャードからポーリングする同時バッチの数は、1 (デフォルト)～10 の並列化係数で指定できます。例えば、`ParallelizationFactor` を 2 に設定すると、最大 200 個の EventBridge Pipe の同時実行により、100 個の Kinesis データシャードを処理できます。これにより、データボリュームが揮発性で `IteratorAge` が高いときに処理のスループットをスケールアップすることができます。Kinesis 集約を使用している場合、並列化係数は機能しません。

## ポーリングとストリームの開始位置
<a name="pipes-ak-stream-start-position"></a>

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。
+ パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。
+ ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、`LATEST` をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` または `AT_TIMESTAMP` として指定します。

## バッチ項目の失敗の報告
<a name="pipes-ak-batch-failures"></a>

EventBridge がソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に チェックポイントが設定されます。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「[部分的なバッチ処理失敗](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure)」を参照してください。

### 成功条件と失敗の条件
<a name="pipes-ak-batch-failures-conditions"></a>

次のいずれかを返すと、EventBridge は完全な成功として処理します:
+ 空の `batchItemFailure` リスト
+ null の `batchItemFailure` リスト
+ 空の `EventResponse`
+ null の `EventResponse`

次のいずれかを返すと、EventBridge は完全な失敗として処理します:
+ 空の文字列 `itemIdentifier`
+ ヌル `itemIdentifier`
+ 不正なキー名を持つ `itemIdentifier`

EventBridge は、再試行戦略に基づいて失敗を再試行します。

# EventBridge Pipes のソースとしての Amazon MQ メッセージブローカー。
<a name="eb-pipes-mq"></a>

EventBridge Pipes を使用して、Amazon MQ メッセージブローカーからレコードを受け取ることができます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。Amazon MQ に固有の設定があり、パイプをセットアップするときに選択できます。EventBridge Pipes は、データを送信先に送信するときに、メッセージブローカーのレコードの順序を維持します。

Amazon MQ は、[Apache ActiveMQ](https://activemq.apache.org/) および [RabbitMQ](https://www.rabbitmq.com/) 用のマネージドメッセージブローカーサービスです。メッセージブローカーを使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューをイベント送信先とする正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。ブローカーをインストールすると、さまざまなネットワークトポロジやその他のインフラストラクチャのニーズがインスタンスに提供されます。

Amazon MQ ソースには、次の設定制限があります。
+ **クロスアカウント** – EventBridge はクロスアカウント処理をサポートしていません。EventBridge を使用して、別の AWS アカウントにある Amazon MQ メッセージブローカーからのレコードを処理することはできません。
+ **認証** - ActiveMQ では、[ActiveMQ SimpleAuthenticationPlugin](https://activemq.apache.org/security#simple-authentication-plugin) のみサポートされています。RabbitMQ の場合、[PLAIN](https://www.rabbitmq.com/access-control.html#mechanisms) 認証メカニズムのみサポートされています。認証情報を管理するには、 AWS Secrets Managerを使用してください。ActiveMQ 認証の詳細については、「Amazon MQ デベロッパーガイド」の「[ Integrating ActiveMQ brokers with LDAP](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/security-authentication-authorization.html)」を参照してください。
+ **接続クォータ** - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、「Amazon MQ デベロッパーガイド」の「**\$1Amazon MQ のクォータ**」 の「[ブローカー](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-limits.html#broker-limits)」のセクションを参照してください。
+ **接続** - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、 パイプが VPC にアクセスしてメッセージを受信する必要があります。
+ **イベント送信先** - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、パイプと対話するとき、内部的にトピックとして、かつ外部的にキューとして動作します。詳細については、Apache ActiveMQ ウェブサイトの [Virtual Destinations](https://activemq.apache.org/virtual-destinations) および RabbitMQ ウェブサイトの [Virtual Hosts](https://www.rabbitmq.com/vhosts.html)を参照してください。
+ **ネットワークトポロジ** - ActiveMQ の場合、パイプに対して、１つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、パイプごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、「Amazon MQ デベロッパーガイド」の「[Active MQ ブローカーアーキテクチャ](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-broker-architecture.html)」および「[Rabbit MQ ブローカーアーキテクチャ](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/rabbitmq-broker-architecture.html)」を参照してください。
+ **プロトコル** — サポートされるプロトコルは、使用する Amazon MQ の統合のタイプによって異なります。
  + ActiveMQ 統合の場合、EventBridge は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。メッセージの使用は、他のプロトコルではサポートされていません。EventBridge は、JMS プロトコル内の `TextMessage` および `BytesMessage`オペレーションのみをサポートします。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの [ OpenWire ](https://activemq.apache.org/openwire.html) を参照してください。
  + RabbitMQ 統合の場合、EventBridge は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの [AMQP 0-9-1 Complete Reference Guide](https://www.rabbitmq.com/amqp-0-9-1-reference.html) を参照してください。

EventBridge は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、「Amazon MQ デベロッパーガイド」の「[Amazon MQ リリースノート](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-release-notes.html)」を参照してください。

**注記**  
デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイしていないブローカーの場合、EventBridge はウィンドウが終了するまでメッセージを処理しません。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

**ActiveMQ**

```
[
  {
    "eventSource": "aws:amq",
    "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",    
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-west-2.amazonaws.com.rproxy.govskope.ca-37557-1234520418293-4:1:1:1:1",
    "messageType": "jms/text-message",
    "data": "QUJDOkFBQUE=",
    "connectionId": "myJMSCoID",
    "redelivered": false,
    "destination": {
      "physicalname": "testQueue"
    },
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958,
    "brokerOutTime": 1598827811959
  },
  {
    "eventSource": "aws:amq",
    "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",        
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-west-2.amazonaws.com.rproxy.govskope.ca-37557-1234520418293-4:1:1:1:1",
    "messageType": "jms/bytes-message",
    "data": "3DTOOW7crj51prgVLQaGQ82S48k=",
    "connectionId": "myJMSCoID1",
    "persistent": false,
    "destination": {
      "physicalname": "testQueue"
    },
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958,
    "brokerOutTime": 1598827811959
  }
]
```

**RabbitMQ**

```
[
  {
    "eventSource": "aws:rmq",
    "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
    "eventSourceKey": "pizzaQueue::/",
    "basicProperties": {
      "contentType": "text/plain",
      "contentEncoding": null,
      "headers": {
        "header1": {
          "bytes": [
            118,
            97,
            108,
            117,
            101,
            49
          ]
        },
        "header2": {
          "bytes": [
            118,
            97,
            108,
            117,
            101,
            50
          ]
        },
        "numberInHeader": 10
      },
      "deliveryMode": 1,
      "priority": 34,
      "correlationId": null,
      "replyTo": null,
      "expiration": "60000",
      "messageId": null,
      "timestamp": "Jan 1, 1970, 12:33:41 AM",
      "type": null,
      "userId": "AIDACKCEVSQ6C2EXAMPLE",
      "appId": null,
      "clusterId": null,
      "bodySize": 80
    },
    "redelivered": false,
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
  }
]
```

## コンシューマーグループ
<a name="pipes-mq-consumer-group"></a>

Amazon MQ と対話するため、EventBridge は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、パイプ UUID と同じ ID で作成されます。

Amazon MQ トソースの場合、EventBridge はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。EventBridge は、以下のいずれかが発生すると、メッセージをプルします。
+ 処理されたレコードのペイロードサイズは最大 6 MB に達します。
+ バッチウィンドウの有効期限が切れる。
+ レコード数がバッチサイズ全体に達します。

EventBridge はバッチを単一のペイロードに変換してから、関数を呼び出します。メッセージは永続化も逆シリアル化もされません。その代わりに、コンシューマーグループはそれらをバイトの BLOB として取得します。次に、base64 でエンコードして JSON ペイロードにします。パイプがバッチ内のいずれかのメッセージに対してエラーを返すとEventBridge は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

## ネットワーク構成
<a name="pipes-mq-vpc-config"></a>

デフォルトではAmazon MQ、 ブローカーは `PubliclyAccessible` フラグを false に設定して作成されます。ブローカーにパブリック IP アドレスが与えられるのは、`PubliclyAccessible` が true に設定されている場合のみです。パイプでフルアクセスする場合、ブローカーはパブリックエンドポイントを使用するか、VPC へアクセスを提供する必要があります。

Amazon MQ ブローカーがパブリックにアクセス可能ではない場合、EventBridge は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。
+ Amazon MQ ブローカーの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス·(アウトバウンド)·を使用できます。パブリックサブネットの場合、これはマネージド [NAT ゲートウェイ](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)である必要があります。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。
+ EventBridge Pipes は を介したイベント配信もサポートしているため[AWS PrivateLink](https://aws.amazon.com/privatelink/)、パブリックインターネットを経由することなく、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。

  VPC エンドポイントを設定するには、「*AWS PrivateLink ユーザーガイド*」の「[VPC エンドポイントの作成](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws)」を参照してください。サービス名では、`com.amazonaws.region.pipes-data` を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。
+ インバウンドルール – ソース用に指定されたセキュリティグループに対して Amazon MQ ブローカーポート上のすべてのトラフィックを許可します。
+ アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Amazon MQ ブローカーポート上のすべてのトラフィックを許可します。

  ブローカーポートには以下が含まれます。
  + プレーンテキスト用 9092
  + TLS 用 9094
  + SASL 用 9096
  + IAM 用 9098

**注記**  
Amazon VPC の設定は、[Amazon MQ API](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/resources.html) を使用して検出できます。セットアップ中に設定する必要はありません。

# Amazon Managed Streaming for Apache Kafka のトピックを EventBridge Pipes のソースとして使用します。
<a name="eb-pipes-msk"></a>

EventBridge Pipes を使用して、[Amazon MSK Serverless クラスタータイプを含む Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) [https://aws.amazon.com/msk/features/msk-serverless/](https://aws.amazon.com/msk/features/msk-serverless/)トピックからレコードを受信できます。オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。Amazon MSK に固有の設定があり、パイプをセットアップするときに選択できます。EventBridge Pipes は、データを送信先に送信するときに、メッセージブローカーのレコードの順序を維持します。

Amazon MSK は、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できるようにするフルマネージドサービスです。Amazon MSK は、Apache Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。Amazon MSK では、複数のアベイラビリティーゾーンと AWS Identity and Access Management (IAM) によるセキュリティのためにアプリケーションを設定できます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、ソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。EventBridge は、ソースからの新しいメッセージを内部的にポーリングした後、ターゲットを同期的に呼び出します。EventBridge はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafkaa ベースのソースの場合、EventBridge はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。

EventBridge は、パーティションごとにメッセージを順番に読み込みます。EventBridge は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。パイプのターゲットがバッチ内のいずれかのメッセージに対してエラーを返すとEventBridge は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

EventBridge は、ターゲットを呼び出すとき、イベント内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

```
[
  {
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",    
    "eventSourceKey": "mytopic-0",
    "topic": "mytopic",
    "partition": "0",
    "offset": 15,
    "timestamp": 1545084650987,
    "timestampType": "CREATE_TIME",
    "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", 
    "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [
      {
        "headerKey": [
          104,
          101,
          97,
          100,
          101,
          114,
          86,
          97,
          108,
          117,
          101
        ]
      }
    ]
  }
]
```

## ポーリングとストリームの開始位置
<a name="pipes-msk-stream-start-position"></a>

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。
+ パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。
+ ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、`LATEST` をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` として指定します。

## MSK クラスター認証
<a name="pipes-msk-cluster-permissions"></a>

EventBridge には、Amazon MSK クラスターにアクセスする、レコードを取得する、およびその他タスクを実行するための許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。どの認証方法がいつ使用されるかについての詳細は、「[EventBridge でのブートストラップブローカーの選択方法](#pipes-msk-bootstrap-brokers)」を参照してください。

**Topics**
+ [非認証アクセス](#pipes-msk-permissions-none)
+ [SASL/SCRAM 認証](#pipes-msk-permissions-add-secret)
+ [IAM ロールベースの認証](#pipes-msk-permissions-iam-policy)
+ [相互 TLS 認証](#pipes-msk-permissions-mTLS)
+ [mTLS シークレットの設定](#smaa-auth-secret)
+ [EventBridge でのブートストラップブローカーの選択方法](#pipes-msk-bootstrap-brokers)

### 非認証アクセス
<a name="pipes-msk-permissions-none"></a>

開発には非認証アクセスのみを使用することをお勧めします。非認証アクセスは、クラスターの IAM ロールベース認証が無効になっている場合にのみ機能します。

### SASL/SCRAM 認証
<a name="pipes-msk-permissions-add-secret"></a>

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。EventBridge がクラスターに接続するには、認証情報 (サインイン認証情報) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*」の「[AWS Secrets Managerを使用したユーザーネームとパスワードの認証](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

### IAM ロールベースの認証
<a name="pipes-msk-permissions-iam-policy"></a>

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。MSK クラスターで IAM 認証がアクティブ化されており、認証用のシークレットを指定しない場合、EventBridg はデフォルトで自動的に IAM 認証を使用します。IAM ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「*Amazon Managed Streaming for Apache Kafka Developer Guide*」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「[IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)」(IAM アクセスコントロール) を参照してください。

EventBridge が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、パイプの実行ロールに以下の許可を追加します。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeClusterDynamicConfiguration"
            ],
            "Resource": [
            "arn:aws:kafka:us-east-1:123456789012:cluster/cluster-name/cluster-uuid",
    "arn:aws:kafka:us-east-1:123456789012:topic/cluster-name/cluster-uuid/topic-name",
    "arn:aws:kafka:us-east-1:123456789012:group/cluster-name/cluster-uuid/consumer-group-id"
            ]
        }
    ]
}
```

------

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「*Amazon Managed Streaming for Apache Kafka Developer Guide*」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「[Amazon MSK Kafka actions](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#kafka-actions)」(Amazon MSK Kafka アクション) を参照してください。

### 相互 TLS 認証
<a name="pipes-msk-permissions-mTLS"></a>

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、EventBridge がクライアントとして機能します。MSK クラスターのブローカーで EventBridge を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。MSK クラスターは、EventBridge でブローカーを認証するために EventBridge にサーバー証明書を送信します。サーバー証明書は、 AWS 信頼ストアにある CA によって署名される必要があります。

Amazon MSK は自己署名のサーバー証明書をサポートしません。これは、Amazon MSK のすべてのブローカーが、EventBridge がデフォルトで信頼する [Amazon Trust Services CA](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html) によって署名された[パブリック証明書](https://www.amazontrust.com/repository/)を使用するためです。



Amazon MSK のための mTLS に関する詳細については、「*Amazon Managed Streaming for Apache Kafka Developer Guide*」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「[Mutual TLS Authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)」(相互 TLS 認証) を参照してください。

### mTLS シークレットの設定
<a name="smaa-auth-secret"></a>

CLIENT\$1CERTICATE\$1TLS\$1AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

**注記**  
EventBridge は、[PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 形式にする必要があります。

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

暗号化されたプライベートキーには、以下の構造を使用します。

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

### EventBridge でのブートストラップブローカーの選択方法
<a name="pipes-msk-bootstrap-brokers"></a>

EventBridge は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、[ブートストラップブローカー](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)を選択します。mTLS または SASL/SCRAM のシークレットを指定すると、EventBridge は自動的にその認証方法を選択します。シークレットを指定しない場合、EventBridge は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、EventBridge によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。
+ mTLS (mTLS 用のシークレットを提供)
+ SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)
+ SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)
+ 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)
+ プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

**注記**  
EventBridge から最も安全なブローカータイプへの接続ができない場合でも、 は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを EventBridge に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

## ネットワーク構成
<a name="pipes-msk-vpc-config"></a>

EventBridge は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。
+ Amazon MSK クラスターの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス·(アウトバウンド)·を使用できます。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPC エンドポイントを使用する必要があります (以下で説明)。
+ EventBridge Pipes は を介したイベント配信もサポートしているため[AWS PrivateLink](https://aws.amazon.com/privatelink/)、パブリックインターネットを経由することなく、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネット内の Kafka クラスターからの配信をサポートすることもできます。

  VPC エンドポイントを設定するには、「*AWS PrivateLink ユーザーガイド*」の「[VPC エンドポイントの作成](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws)」を参照してください。サービス名では、`com.amazonaws.region.pipes-data` を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。
+ インバウンドルール – ソース用に指定されたセキュリティグループに対して Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。
+ アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。

  ブローカーポートには以下が含まれます。
  + プレーンテキスト用 9092
  + TLS 用 9094
  + SASL 用 9096
  + IAM 用 9098

**注記**  
Amazon VPC の設定は、[Amazon MSK API](https://docs.aws.amazon.com/msk/1.0/apireference/resources.html) を使用して検出できます。セットアップ中に設定する必要はありません。

## カスタマイズ可能なコンシューマーグループ ID
<a name="pipes-msk-consumer-group"></a>

Apache Kafka をソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、パイプを結合したい Apache Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Apache Kafka レコード処理設定を他のコンシューマーから EventBridge に移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Apache Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、EventBridge は Apache Kafka トピックのメッセージをすべて受け取るわけではありません。EventBridge にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Apache Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、EventBridge は、パイプの `StartingPosition` パラメーターを無視します。代わりに、EventBridge はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Apache Kafka が既存のコンシューマーグループを見つけられない場合、EventBridge は指定された `StartingPosition` を使用してソースを設定します。

指定するコンシューマーグループ ID は、すべての Apache Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定してパイプを作成した後は、この値を更新することはできません。

## Amazon MSK ソースの Auto Scaling
<a name="pipes-msk-ops-scaling"></a>

初めて Amazon MSK ソースを作成するときは、EventBridge が Apache Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、EventBridge は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

EventBridge は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは EventBridge で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、EventBridge はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットのがオーバーロードすると、EventBridge はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得しパイプに送信するメッセージの数が減り、パイプへのワークロードが軽減されます。

# EventBridge Pipes のソースとして Apache Kafka ストリームを使用しています。
<a name="eb-pipes-kafka"></a>

Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。ユーザーは、[Amazon Managed Streaming for Apache Kafka](eb-pipes-msk.md) (Amazon MSK)、またはセルフマネージドの Apache Kafka クラスターを使用できます。AWS の用語では、*セルフマネージド*クラスターは、AWS によってホストされていない Apache Kafka クラスターを指します。これには、自分で管理するクラスターと、[https://www.confluent.io/](https://www.confluent.io/)、[https://www.cloudkarafka.com/](https://www.cloudkarafka.com/)、[https://redpanda.com/](https://redpanda.com/) などのサードパーティープロバイダーがホストするクラスターの両方が含まれます。

クラスターの他の AWS のホスティングオプションの詳細については、*AWS Big Data Blog* の「[AWS 上で Apache Kafka を実行するためのベストプラクティス](https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/)」を参照してください。

ソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。EventBridge は、ソースからの新しいメッセージを内部的にポーリングした後、ターゲットを同期的に呼び出します。EventBridge はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafkaa ベースのソースの場合、EventBridge はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。

EventBridge は、パイプを呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Apache Kafka トピックと Apache Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

```
[
  {
    "eventSource": "SelfManagedKafka",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "eventSourceKey": "mytopic-0",
    "topic": "mytopic",
    "partition": 0,
    "offset": 15,
    "timestamp": 1545084650987,
    "timestampType": "CREATE_TIME",
    "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", 
    "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [
      {
        "headerKey": [
          104,
          101,
          97,
          100,
          101,
          114,
          86,
          97,
          108,
          117,
          101
        ]
      }
    ]
  }
]
```

## Apache Kafka クラスター認証
<a name="pipes-smaa-authentication"></a>

EventBridge Pipes は、セルフマネージド Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Apache Kafka クラスターを設定しておいてください。Apache Kafka セキュリティの詳細については、Apache Kafka ドキュメントの「[Security](http://kafka.apache.org/documentation.html#security)」(セキュリティ) セクションを参照してください。

### VPC アクセス
<a name="pipes-smaa-auth-vpc"></a>

VPC 内の Apache Kafka ユーザーのみが Apache Kafka ブローカーにアクセスできるセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ソースで Amazon Virtual Private Cloud (Amazon VPC) を設定する必要があります。

### SASL/SCRAM 認証
<a name="pipes-smaa-auth-sasl"></a>

EventBridge Pipes は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。EventBridge Pipes は、暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、「[RFC 5802](https://tools.ietf.org/html/rfc5802)」を参照してください。

EventBridge Pipes は TLS 暗号化による SASL/PLAIN 認証をサポートしています。SASL/PLAIN 認証では、EventBridge Pipes が認証情報をクリアテキスト (暗号化されていない状態) としてサーバーに送信します。

SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Manager に保存します。

### 相互 TLS 認証
<a name="pipes-smaa-auth-mtls"></a>

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka の場合、Lambda はクライアントとして機能します。クライアント証明書を (Secrets Manager のシークレットとして) 設定することにより、Apache Kafka ブローカーで EventBridge Pipes を認証できるようにします。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。

Apache Kafka クラスターは、EventBridge Pipes にサーバー証明書を送信し、EventBridge Pipes で Apache Kafka ブローカーを認証します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、EventBridge Pipes 信頼トストア内の CA で署名する必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。EventBridge Pipes はルート証明書を使用して Apache Kafka ブローカーを検証します。

mTLS の詳細については、「[Introducing mutual TLS authentication for Amazon MSK as an source](https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source)」(ソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

### クライアント証明書シークレットの設定
<a name="pipes-smaa-auth-secret"></a>

CLIENT\$1CERTICATE\$1TLS\$1AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

**注記**  
EventBridge Pipes は、[PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートしています。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した [PKCS \$18](https://datatracker.ietf.org/doc/html/rfc5208) 形式にする必要があります。

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

暗号化されたプライベートキーには、以下の構造を使用します。

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

### サーバルート CA 証明書シークレットの設定
<a name="pipes-smaa-auth-ca-cert"></a>

このシークレットは、Apache Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Apache Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

```
{
     "certificate": "-----BEGIN CERTIFICATE-----       
  MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
  EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
  HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
  ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
  -----END CERTIFICATE-----"
```

## ネットワーク構成
<a name="pipes-kafka-vpc-config"></a>

プライベート VPC 接続を使用するセルフマネージド Apache Kafka 環境を使用している場合、EventBridge は Apache Kafka ブローカーに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。
+ Apache Kafka クラスターの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス (アウトバウンド) を使用できます。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPC エンドポイントを使用する必要があります (以下で説明)。
+ EventBridge Pipes は [AWS PrivateLink](https://aws.amazon.com/privatelink/) を介したイベント配信もサポートしているため、パブリックインターネットを経由することなく、Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースからパイプターゲットにイベントを送信できます。パイプを使用して、インターネットゲートウェイのデプロイ、ファイアウォールルールの設定、プロキシサーバーの設定を必要とせずに、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド型 Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネット内の Kafka クラスターからの配信をサポートすることもできます。

  VPC エンドポイントを設定するには、「*AWS PrivateLink ユーザーガイド*」の「[VPC エンドポイントの作成](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws)」を参照してください。サービス名では、`com.amazonaws.region.pipes-data` を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。
+ インバウンドルール – ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。
+ アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

  ブローカーポートには以下が含まれます。
  + プレーンテキスト用 9092
  + TLS 用 9094
  + SASL 用 9096
  + IAM 用 9098

## Apache Kafka ソースを使用したコンシューマーの自動スケーリング
<a name="pipes-kafka-ops-scaling"></a>

初めて Apache Kafka ソースを作成するときは、EventBridge が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、EventBridge は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

EventBridge は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは EventBridge で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、EventBridge はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

実行するためのベストプラクティターゲットのがオーバーロードすると、EventBridge はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

# Amazon Simple Queue Service を EventBridge Pipes のソースとして使用する
<a name="eb-pipes-sqs"></a>

EventBridge Pipes を使用して Amazon SQS キューからのレコードを受け取ることができます。その後、必要に応じてこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先に送信できます。

Amazon Simple Queue Service (Amazon SQS) キュー内のメッセージを処理するときは、パイプを使用します。EventBridge Pipes は、[標準のキュー](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html)および[ファーストイン、ファーストアウト (FIFO) ](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)キューをサポートしています。Amazon SQS を使用すると、タスクをキューに送信して非同期的に処理することで、アプリケーションの 1 つのコンポーネントからタスクを任せることができます。

EventBridge はキューをポーリングし、パイプを、キューメッセージを含むイベントと共に同期的に呼び出します。EventBridge はメッセージをバッチで読み取り、バッチごとに、一度にパイプを呼び出します。パイプが正常にバッチを処理すると、EventBridge はキューからそのメッセージを削除します。

デフォルトでは、EventBridge はキュー内の最大 10 個のメッセージを一度にポーリングし、そのバッチをパイプに送信します。少数のレコードでパイプを呼び出さないようにするには、バッチウィンドウを設定して、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。パイプを呼び出す前に、EventBridge は次のいずれかが発生するまで Amazon SQS 標準キューからのメッセージをポーリングし続けます。
+ バッチウィンドウの有効期限が切れる。
+ 呼び出しペイロードのサイズがクォータに達する。
+ 最大バッチサイズに達する。

**注記**  
バッチウィンドウを使用していて、Amazon SQS キューのトラフィックが少ない場合、EventBridge はパイプを呼び出す前に最大 20 秒間待機することがあります。これは、バッチウィンドウを 20 秒未満に設定した場合であっても同様です。FIFO キューの場合、レコードには、重複除外と順序付けに関連する追加属性が含まれます。

EventBridge がバッチを読み取る際、メッセージはキューに留まりますが、キューの[可視性タイムアウト](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)の間は非表示です。パイプが正常にバッチを処理すると、EventBridge はそのメッセージをキューから削除します。デフォルトで、パイプがバッチを処理しているときにエラーが発生すると、そのバッチ内のすべてのメッセージが再びキューに表示されます。このため、パイプコードは、意図しない副次的影響を及ぼすことなく同じメッセージを複数回処理できるようにする必要があります。バッチアイテムの失敗をパイプのレスポンスに含めることで、この再処理動作を変更できます。以下の例では、2 つのメッセージのバッチのイベントを示しています。

**イベントの例**

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「[Amazon EventBridge Pipes フィルタリング](eb-pipes-event-filtering.md)」を参照してください。

**スタンダードキュー**

```
[
  {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1545082649183",
      "SenderId": "AIDAIENQZJOLO23YVJ4VO",
      "ApproximateFirstReceiveTimestamp": "1545082649185"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
    "awsRegion": "us-east-2"
  },
  {
    "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
    "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1545082650636",
      "SenderId": "AIDAIENQZJOLO23YVJ4VO",
      "ApproximateFirstReceiveTimestamp": "1545082650649"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
    "awsRegion": "us-east-2"
  }
]
```

**FIFO キュー**

```
[
  {
    "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
    "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1573251510774",
      "SequenceNumber": "18849496460467696128",
      "MessageGroupId": "1",
      "SenderId": "AIDAIO23YVJENQZJOL4VO",
      "MessageDeduplicationId": "1",
      "ApproximateFirstReceiveTimestamp": "1573251510774"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
    "awsRegion": "us-east-2"
  }
]
```

## スケーリングと処理
<a name="pipes-sqs-scaling"></a>

標準キューの場合、EventBridge は[ロングポーリング](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html)を使用して、キューがアクティブになるまでキューをポーリングします。メッセージが利用可能な場合、EventBridge は最大 5 個のバッチを読み込み、それらをパイプに送信します。メッセージがまだ利用可能な場合、EventBridge はバッチを読み込むプロセスの数を 1 分あたり最大 300 インスタンスまで増やします。パイプによって同時に処理できるバッチの最大数は 1,000 です。

FIFO キューの場合、EventBridge は、受信した順序でメッセージをパイプに送信します。FIFO キューにメッセージを送信する場合、[メッセージグループ ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) を指定します。Amazon SQS では、同じグループ内のメッセージを EventBridge に順番に配信することが容易になります。EventBridge は受信したメッセージをグループにソートし、グループに対して一度に 1 つのバッチのみを送信します。パイプがエラーを返す場合、そのパイプは、EventBridge が同じグループから追加のメッセージを受信する前に、対象メッセージですべての再試行を試みます。

## EventBridge Pipes で使用するキューの設定
<a name="pipes-sqs-configure-queue"></a>

[Amazon SQS キューを作成して](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-create-queue.html)、パイプのソースとして機能できるようにします。次に、パイプがイベントの各バッチを処理できるよう、またスケールアップ時に EventBridge がスロットリングエラーに反応して再試行できるように、時間を見越してキューで設定します。

パイプがレコードの各バッチを処理するために十分な時間を取るため、ソースキューの可視性タイムアウトは、パイプのエンリッチメントとターゲットコンポーネントのランタイムを組み合わせた時間の少なくとも 6 倍に設定してください。追加の時間は、パイプが前のバッチの処理中にスロットリングされた場合に、EventBridge が再試行することを可能にします。

パイプがメッセージの処理に何回も失敗する場合、Amazon SQS はこのメッセージを[デッドレターキュー](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)に送信できます。パイプがエラーを返すと、EventBridge はそのメッセージをキューで保持します。可視性タイムアウトが発生すると、EventBridge はメッセージをもう一度受信します。多数の受信後に 2 番目のキューにメッセージを送信するには、ソースキューにデッドレターキューを設定します。

**注記**  
パイプではなく、ソースキューのデッドレターキューを設定するようにしてください。パイプで設定したデッドレターキューは、ソースキューではなく、パイプの非同期呼び出しキューに使用されます。

パイプからエラーが返された場合や、同時実行数の最大値に達しているために関数を呼び出せない場合は、追加の試行で処理が成功する場合があります。メッセージをデッドレターキューに送信する前にメッセージが処理される確率を高めるには、送信元キューのリドライブポリシーの `maxReceiveCount` を **5** 以上に設定します。

## バッチ項目の失敗の報告
<a name="pipes-sqs-batch-failures"></a>

EventBridge がソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に チェックポイントが設定されます。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「[部分的なバッチ処理失敗](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure)」を参照してください。

### 成功条件と失敗の条件
<a name="pipes-sqs-batch-failures-conditions"></a>

次のいずれかを返すと、EventBridge は完全な成功として処理します:
+ 空の `batchItemFailure` リスト
+ null の `batchItemFailure` リスト
+ 空の `EventResponse`
+ null の `EventResponse`

次のいずれかを返すと、EventBridge は完全な失敗として処理します:
+ 空の文字列 `itemIdentifier`
+ ヌル `itemIdentifier`
+ 不正なキー名を持つ `itemIdentifier`

EventBridge は、再試行戦略に基づいて失敗を再試行します。