Step Functions のアクティビティについて - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Step Functions のアクティビティについて

Step Functions アクティビティを使用すると、実際の作業が Step Functions の外部で実行されているワーカーによって行われるタスクをステートマシンに設定できます。例えば、ワーカープログラムを Amazon Elastic Compute Cloud (Amazon EC2)、Amazon Elastic Container Service (Amazon ECS)、またはモバイルデバイスでも実行できます。

概要:

ではAWS Step Functions、アクティビティは、どこか (アクティビティワーカーと呼ばれる) で実行されているコードをステートマシン内の特定のタスクに関連付ける方法です。Step Functions コンソールを使用して、または CreateActivity を呼び出すことでアクティビティを作成できます。これによって、タスクの状態の Amazon リソースネーム (ARN) が提供されます。この ARN を使用して、アクティビティワーカーでの作業のタスク状態をポーリングします。

注記

アクティビティはバージョニングされず、下位互換性があることが予期されます。アクティビティ定義に下位互換性のない変更を加える必要がある場合は、一意の名前を使用して、Step Functions で新しいアクティビティを作成する必要があります。

アクティビティワーカーは、Amazon EC2 インスタンス、AWS Lambda 関数、モバイルデバイスで実行されているアプリケーションです。任意の場所にホストされている、HTTP 接続を行うことができる任意のアプリケーションです。Step Functions がアクティビティタスクの状態に達すると、ワークフローはアクティビティワーカーのタスクのポーリングを待機します。アクティビティワーカーは、GetActivityTask を使用して Step Functions をポーリングし、関連するアクティビティの ARN を送信します。GetActivityTaskinput (タスクの JSON 入力文字列) およびtaskToken (タスクの一意の識別子) を含むレスポンスを返します。アクティビティワーカーは作業の完了後に SendTaskSuccess または SendTaskFailure を使用して成功または失敗のレポートを提供します。これら 2 つの呼び出しは、GetActivityTask によって提供される taskToken を使用して結果をタスクと関連付けます。

アクティビティタスクに関連する API

Step Functions では、アクティビティの作成と一覧表示、タスクのリクエスト、ワーカーの結果に基づくステートマシンのフロー管理のための API を提供しています。

以下は、アクティビティに関連する Step Functions API です。

注記

GetActivityTask を使用してアクティビティタスクをポーリングすると、一部の実装でレイテンシーが発生する可能性があります。「アクティビティタスクのポーリング時のレイテンシーを回避する」を参照してください。

アクティビティタスクの完了を待機中

タスク定義で TimeoutSeconds を設定して、ステートが待機する時間を設定します。タスクをアクティブおよび待機中にしておくためには、TimeoutSeconds で設定した時間内に SendTaskHeartbeat を使用して、アクティビティワーカーから定期的にハートビートを送信します。長いタイムアウト期間を設定し、能動的にハートビートを送信することで、Step Functions のアクティビティは実行の完了を最大で 1 年間待機できます。

例えば、長いプロセスの結果を待機するワークフローが必要な場合は、以下の操作を実行します。

  1. コンソールを使用して、または CreateActivity を呼び出してアクティビティを作成します。アクティビティ ARN を書き留めます。

  2. ステートマシンの定義でアクティビティのタスク状態の ARN を参照し、TimeoutSeconds を設定します。

  3. GetActivityTask を使用して作業をポーリングするアクティビティワーカーを実装し、そのアクティビティ ARN を参照します。

  4. ステートマシンのタスク定義の HeartbeatSeconds で設定した時間内に定期的に SendTaskHeartbeat を使用して、タスクがタイムアウトしないようにします。

  5. ステートマシンの実行を開始します。

  6. アクティビティワーカーのプロセスを開始します。

実行はアクティビティタスクの状態で一時停止して、アクティビティワーカーのタスクのポーリングを待機します。taskToken がアクティビティワーカーに提供されると、ワークフローはステータス提供のため SendTaskSuccess または SendTaskFailure を待機します。TimeoutSeconds で設定された時間より前に、これらのいずれかまたは SendTaskHeartbeat 呼び出しを実行が受け取らない場合、実行は失敗し、実行履歴に ExecutionTimedOut イベントが含まれることになります。

例: Ruby のアクティビティワーカー

以下のアクティビティワーカーコードの例には、ポーラーおよびアクティビティワーカー用に構成可能な数のスレッドを含む、コンシューマー/プロデューサーパターンが実装されています。ポーラースレッドは、Step Functions のアクティビティタスクを常にロングポーリングします。アクティビティタスクが取得されると、アクティビティスレッドの区切られたブロックキューを通じて渡され、選択されます。

次のコードは、この例の Ruby アクティビティワーカーのメインエントリポイントです。

require_relative '../lib/step_functions/activity' credentials = Aws::SharedCredentials.new region = 'us-west-2' activity_arn = 'ACTIVITY_ARN' activity = StepFunctions::Activity.new( credentials: credentials, region: region, activity_arn: activity_arn, workers_count: 1, pollers_count: 1, heartbeat_delay: 30 ) # Start method block contains your custom implementation to process the input activity.start do |input| { result: :SUCCESS, echo: input['value'] } end

アクティビティ ARN とリージョンを指定する必要があります。コードには、スレッド数やハートビートの遅延など、設定できるデフォルトが含まれています。

Item 説明

require_relative

次の例のアクティビティワーカーコードへの相対パス。

region

AWSアクティビティのリージョン。

workers_count

アクティビティワーカーのスレッドの数。ほとんどの実装では、10~20 のスレッドで十分です。アクティビティの処理に時間がかかればかかるほど、必要なスレッドが多くなります。試算するには、1 秒あたりのプロセスアクティビティの数と、99 パーセンタイルアクティビティ処理レイテンシー (秒単位) を乗算してください。

pollers_count

ポーラーのスレッドの数。ほとんどの実装では、10~20 のスレッドで十分です。

heartbeat_delay

ハートビート間の遅延 (秒)。

input アクティビティの実装ロジック。

次のステップ

アクティビティワーカーを使用するステートマシンの作成の詳細については、以下を参照してください。