

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 機能の処理
<a name="feature-store-feature-processing"></a>

Amazon SageMaker Feature Store 特徴量処理 は、raw データを機械学習 (ML) 機能に変換できる機能です。バッチデータソースのデータを変換して特徴量グループに取り込むことができる Feature Processor SDK を提供します。特徴量ストアはこの機能を使って、コンピューティング環境のプロビジョニングや、データの読み込みと取り込みを行う Pipelines の作成とメンテナンスなど、基盤となるインフラストラクチャを管理します。これにより、変換関数 (製品ビュー数、トランザクション金額の平均値など)、ソース (この変換を適用する場所)、シンク (計算された特徴量値を書き込む場所) を含む特徴量プロセッサ定義に集中できます。

Feature Processor Pipelines は Pipelines のパイプラインです。Pipelines として、コンソールで SageMaker AI リネージを使用してスケジュール済みの Feature Processor パイプラインを追跡することもできます。SageMaker AI のリネージの詳細については、「[Amazon SageMaker ML 系統追跡](lineage-tracking.md)」を参照してください。これには、スケジュール済みの実行の追跡、リネージュの視覚化による特徴量のデータソースまでの追跡、単一環境での共有特徴量プロセッサの表示が含まれます。コンソールで特徴量ストアを使用する方法の詳細については、「[コンソールからパイプライン実行を表示する](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)」を参照してください。

**Topics**
+ [Feature Store Feature Processor SDK](feature-store-feature-processor-sdk.md)
+ [Feature Store Feature Processor のリモートでの実行](feature-store-feature-processor-execute-remotely.md)
+ [Feature Store Feature Processor パイプラインの作成と実行](feature-store-feature-processor-create-execute-pipeline.md)
+ [Feature Processor パイプラインのスケジュール済みの実行とイベントベースの実行](feature-store-feature-processor-schedule-pipeline.md)
+ [Amazon SageMaker Feature Store Feature Processor パイプラインのモニタリング](feature-store-feature-processor-monitor-pipeline.md)
+ [IAM アクセス許可と実行ロール](feature-store-feature-processor-iam-permissions.md)
+ [Feature Processor の制約、制限、クォータ](feature-store-feature-processor-quotas.md)
+ [データソース](feature-store-feature-processor-data-sources.md)
+ [一般的なユースケース向けの特徴量処理 コードの例](feature-store-feature-processor-examples.md)

# Feature Store Feature Processor SDK
<a name="feature-store-feature-processor-sdk"></a>

変換関数を `@feature_processor` デコレータでデコレートして、Feature Store Feature Processor 定義を宣言します。SageMaker AI SDK for Python (Boto3) は、設定済みの入力データソースからデータを自動的に読み込み、デコレーションションされた変換関数を適用して、変換されたデータをターゲット特徴量グループに取り込みます。デコレーションされた変換関数は `@feature_processor` デコレータの期待するシグネチャに従わなければなりません。`@feature_processor` デコレータの詳細については、「 Read the Docs」の「Amazon SageMaker Feature Store」で「[@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator)」を参照してください。

`@feature_processor` デコレータを使用すると、変換関数は Spark ランタイム環境で実行されます。この環境では、関数に提供される入力引数とその戻り値は Spark DataFrames です。変換関数の入力パラメータの数は、`@feature_processor` デコレータに設定された入力の数と一致する必要があります。

`@feature_processor` デコレータの詳細については、「[Feature Processor Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor)」を参照してください。

次のコードは `@feature_processor` デコレータの使用方法の基本的な例です。より具体的な使用例については、「[一般的なユースケース向けの特徴量処理 コードの例](feature-store-feature-processor-examples.md)」を参照してください。

Feature Processor SDK は、次のコマンドを使用して SageMaker Python SDK とその追加機能からインストールできます。

```
pip install sagemaker[feature-processor]
```

次の例では、`us-east-1` はリソースのリージョン、`111122223333` はリソース所有者のアカウント ID、`your-feature-group-name` は特徴量グループ名です。

以下は基本的な特徴量プロセッサの定義です。`@feature_processor` デコレータは Amazon S3 からの CSV 入力を読み込んで変換関数 (例: `transform`) に提供するように設定し、それを特徴量グループに取り込む準備をします。最後の行ではこの関数が実行されます。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor` パラメータには以下が含まれます。
+ `inputs` (List[str]): Feature Store Feature Processor.で使用されるデータソースのリスト。データソースが特徴量グループであるか、Amazon S3 に保存されている場合は、Feature Store が提供する特徴量プロセッサのデータソース定義を使用できる場合があります。Feature Store が提供するデータソース定義の全リストについては、「Read the Docs」の「Amazon SageMaker Feature Store」で「[Feature Processor Data Source](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source)」を参照してください。ドキュメントをお読みください。
+ `output` (str): デコレーションされた関数の出力を取り込む特徴量グループの ARN。
+ `target_stores` (オプション [List[str]]): 出力に取り込むストアのリスト (例えば、`OnlineStore` または `OfflineStore`)。指定しない場合、データは出力特徴量グループで有効になっているすべてのストアに取り込まれます。
+ `parameters` (Dict[str, Any]): 変換関数に提供されるディクショナリ。
+ `enable_ingestion` (bool): 変換関数の出力を出力特徴量グループに取り込むかどうかを示すフラグ。このフラグは開発段階で役立ちます。指定しない場合、取り込みは有効になります。

オプションのラップ関数パラメータ (関数シグネチャで指定されている場合は引数として提供) には以下が含まれます。
+ `params` (Dict[str, Any]): `@feature_processor` パラメータで定義されているディクショナリ。また、`scheduled_time` パラメータなど、キー `system` で参照できるシステム設定パラメータも含まれています。
+ `spark` (SparkSession): Spark アプリケーション用に初期化された SparkSession インスタンスへの参照。

以下のコードは、`params` パラメータと `spark` パラメータの使用例です。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

`scheduled_time` システムパラメータ (関数の `params` 引数で指定) は、実行それぞれの再試行をサポートする重要な値です。この値は、Feature Processor の実行を一意に識別するのに役立ち、コードの実際の実行時間とは無関係に入力範囲を確保するために、日付範囲ベースの入力 (過去 24 時間分のデータのみをロードするなど) の基準点として使用できます。Feature Processor がスケジュールに従って実行されている場合 (「[Feature Processor パイプラインのスケジュール済みの実行とイベントベースの実行](feature-store-feature-processor-schedule-pipeline.md)」を参照)、その値は実行予定時刻に固定されます。SDK の実行 API を使用して同期実行中に引数をオーバーライドできます。これにより、データのバックフィルや、失敗した過去の実行の再実行などのユースケースに対応できます。Feature Processor が他の方法で実行されている場合、その値は現在の時刻です。

Spark コードの作成については、「[ Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)」を参照してください。

一般的なユースケースのその他のコードサンプルについては、「[一般的なユースケース向けの特徴量処理 コードの例](feature-store-feature-processor-examples.md)」を参照してください。

`@feature_processor` でデコレーションされた変換関数は値を返さないことに注意してください。関数をプログラムでテストするには、`@feature_processor` デコレータを削除するかモンキーパッチを適用して、ラップ関数へのパススルーとして機能させることができます。`@feature_processor` デコレータの詳細については、「[Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html)」を参照してください。

# Feature Store Feature Processor のリモートでの実行
<a name="feature-store-feature-processor-execute-remotely"></a>

ローカルで使用できるものよりも強力なハードウェアを必要とする大規模なデータセットで Feature Processor を実行するには、コードを `@remote` デコレータでデコレートして、ローカルの Python コードを単一ノードまたは複数ノードの分散型 SageMaker トレーニングジョブとして実行できます。コードを SageMaker トレーニングジョブとして実行する方法の詳細については、「[ローカルコードを SageMaker トレーニングジョブとして実行する](train-remote-decorator.md)」を参照してください。

以下は、`@remote` デコレータと `@feature_processor` デコレータの使用例です。

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

この `spark_config` パラメータは、リモートジョブが Spark アプリケーションとして実行されることを示します。この `SparkConfig` インスタンスを使用して Spark 設定を行い、Python ファイル、JAR、ファイルなどの Spark アプリケーションへの依存関係を追加できます。

特徴量処理コードを開発する際の反復処理を高速化するために、`@remote` デコレータに `keep_alive_period_in_seconds` 引数を指定して、設定したリソースをウォームプールで保持し、その後のトレーニングジョブに備えることができます。ウォームプールの詳細については、API リファレンスガイドの「`[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)`」を参照してください。

次のコードはローカル `requirements.txt:` の例です。

```
sagemaker>=2.167.0
```

これにより、`@feature-processor`.の注釈が付けられたメソッドを実行するために必要な、対応する SageMaker SDK バージョンがリモートジョブにインストールされます。

# Feature Store Feature Processor パイプラインの作成と実行
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

Feature Processor SDK は、Feature Processor 定義をフルマネージド SageMaker AI パイプラインに昇格させるための API を提供しています。Pipelines の詳細については、「[Pipelines の概要](pipelines-overview.md)」を参照してください。Feature Processor 定義を SageMaker AI パイプラインに変換するには、`to_pipeline` API と Feature Processor 定義を組み合わせて使用します。Feature Processor 定義の実行をスケジュールしたり、CloudWatch メトリクスで運用面からモニタリングしたり、EventBridge と統合してイベントソースまたはサブスクライバーとして機能させたりできます。Pipelines で作成したパイプラインのモニタリングの詳細については、「[Amazon SageMaker Feature Store Feature Processor パイプラインのモニタリング](feature-store-feature-processor-monitor-pipeline.md)」を参照してください。

Feature Processor パイプラインを表示するには、「[コンソールからパイプライン実行を表示する](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)」を参照してください。

関数にも `@remote` デコレータがデコレートされていれば、その設定は Feature Processor パイプラインに引き継がれます。`@remote` デコレータを使用して、コンピューティングインスタンスのタイプと数、ランタイムの依存関係、ネットワークとセキュリティの設定などの高度な設定を指定できます。

次の例では、`to_pipeline` API と `execute` API を使用します。

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

`to_pipeline` API は意味的にはアップサート操作です。パイプラインが既に存在する場合は更新し、存在しない場合はパイプラインを作成します。

`to_pipeline` API は必要に応じて Feature Processor 定義を含むファイルを参照する Amazon S3 URI を受け入れ、それを Feature Processor パイプラインに関連付けて、SageMaker AI 機械学習リネージ内の変換関数とそのバージョンを追跡します。

アカウント内のすべての Feature Processor パイプラインのリストを取得するには、`list_pipelines` API を使用します。`describe` API へのその後のリクエストでは、Pipelines やスケジュールの詳細などの Feature Processor パイプラインに関連する詳細が返されます。

次の例では、`list_pipelines` API と `describe` API を使用します。

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Feature Processor パイプラインのスケジュール済みの実行とイベントベースの実行
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Amazon SageMaker Feature Store Feature Processing パイプラインの実行は、事前設定されたスケジュールに基づいて、または別の AWS サービスイベントの結果として、自動的かつ非同期的に開始するように設定できます。例えば、Feature Processing パイプラインを毎月 1 日に実行するようにスケジュールを指定したり、2 つのパイプラインを連結して、ソースパイプラインの実行が完了した後にターゲットパイプラインが自動的に実行されるように指定したりできます。

**Topics**
+ [スケジュールベースの実行](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [イベントベースの実行](#feature-store-feature-processor-schedule-pipeline-event-based)

## スケジュールベースの実行
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

Feature Processor SDK は、Amazon EventBridge スケジューラとの統合により、Feature Processor パイプラインを定期的に実行するための [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API を提供します。スケジュールは、Amazon EventBridge でサポートされているのと同じ式で [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression) パラメータを使用して、`at` 式、`rate` 式、`cron` 式を使って指定できます。schedule API は、スケジュールが既に指定されている場合は更新し、指定されていない場合は作成するという点で、意義的には upsert オペレーションです。EventBridge 式と例の詳細については、「EventBridge スケジューラユーザーガイド」の「[Schedule types on EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html)」を参照してください。

次の例では、`at` 式、`rate` 式、`cron` 式を使用して、Feature Processor [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API を使用しています。

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

`schedule` API の日付と時刻の入力のデフォルトタイムゾーンは UTC です。EventBridge Scheduler のスケジュール式の詳細については、「EventBridge Scheduler API Reference」ドキュメントの「[https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)」を参照してください。

スケジュール済み Feature Processor パイプラインを実行すると、変換関数にスケジュール済みの実行時間が与えられ、これをべき等性トークンまたは日付範囲ベースの入力の固定基準点として使用できます。スケジュールを無効 (一時停止) にしたり再び有効にしたりするには、[https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API の `state` パラメータをそれぞれ `‘DISABLED’` または `‘ENABLED’` にして使用します。

Feature Processor の詳細については、「[Feature Processor SDK データソース](feature-store-feature-processor-data-sources-sdk.md)」を参照してください。

## イベントベースの実行
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Feature Processing パイプラインは、 AWS イベントが発生すると自動的に実行されるように設定できます。Feature Processing SDK は、ソースイベントのリストとターゲットパイプラインのリストを受け入れる [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) 関数を提供しています。ソースイベントは、パイプラインと[実行ステータス](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus)イベントを指定する [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) のインスタンスである必要があります。

`put_trigger` 関数は、イベントをルーティングするように Amazon EventBridge ルールとターゲットを設定し、 AWS イベントに応答する EventBridge イベントパターンを指定できます。これらの概念の詳細については、「Amazon EventBridge [ルール](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)」、[「ターゲット](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)」、[「イベントパターン](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)」を参照してください。

トリガーは有効にしたり無効にしたりできます。EventBridge は、`put_trigger` API の `role_arn` パラメータで指定されたロールを使用して、ターゲットパイプラインの実行を開始します。SDK が Amazon SageMaker Studio Classic または Notebook 環境で使用されている場合、デフォルトで実行ロールが使用されます。実行ロールを取得する方法の詳細については、「[実行ロールを取得する](sagemaker-roles.md#sagemaker-roles-get-execution-role)」を参照してください。

次の例では、以下を設定します。
+ `to_pipeline` API を使用する SageMaker AI パイプライン。ターゲットパイプライン名 (`target-pipeline`) と変換関数 (`transform`) を受け取ります。Feature Processor と transform 関数の詳細については、「[Feature Processor SDK データソース](feature-store-feature-processor-data-sources-sdk.md)」を参照してください。
+ `put_trigger` API を使用するトリガー。イベントの `FeatureProcessorPipelineEvent` とターゲットパイプライン名 (`target-pipeline`) を受け取ります。

  `FeatureProcessorPipelineEvent` は、ソースパイプライン (`source-pipeline`) のステータスが `Succeeded` になった場合のトリガーを定義します。Feature Processor Pipeline イベント関数の詳細については、特徴量ストアの「ドキュメントを読む」の「[https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)」を参照してください。

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

イベントベースのトリガーを使用して、Feature Processor パイプラインの継続的な実行と自動再試行を作成する例については、「[イベントベースのトリガーを使用した継続的な実行と自動再試行](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries)」を参照してください。

イベントベースのトリガーを使用して、継続的な*ストリーミング*とイベントベースのトリガーで自動再試行を作成する例については、「[ストリーミングのカスタムデータソースの例](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming)」を参照してください。

# Amazon SageMaker Feature Store Feature Processor パイプラインのモニタリング
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS には、Amazon SageMaker AI リソースとアプリケーションをリアルタイムで監視し、問題が発生したときに報告し、必要に応じて自動アクションを実行するためのモニタリングツールが用意されています。特徴量ストア Feature Processor パイプラインは Pipelines であるため、標準のモニタリングメカニズムと統合を利用できます。実行の失敗などの運用メトリクスは、Amazon CloudWatch メトリクスと Amazon EventBridge イベントを介してモニタリングできます。

Feature Store Feature Processor をモニタリングして運用する方法の詳細については、以下のリソースを参照してください。
+ [Amazon SageMaker AI での AWS リソースのモニタリング](monitoring-overview.md) - SageMaker AI リソースのアクティビティのモニタリングと監査に関する一般的なガイダンス
+ [SageMaker パイプラインメトリクス](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines) - Pipelines が発行する CloudWatch メトリクス
+ [SageMaker パイプライン実行の状態変更](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline) - Pipelines と実行に発行される EventBridge イベント
+ [Amazon SageMaker Pipelines のトラブルシューティング](pipelines-troubleshooting.md) - Pipelines の一般的なデバッグとトラブルシューティングのヒント

Feature Store Feature Processor の実行ログは、Amazon CloudWatch Logs の `/aws/sagemaker/TrainingJobs` ロググループにあります。このロググループでは、ルックアップ規則を使用して実行ログストリームを確認できます。`@feature_processor` で修飾された関数を直接呼び出して作成された実行は、ローカルの実行環境コンソールでログを確認できます。` @remote` で修飾された実行の場合、CloudWatch Logs ストリーム名には関数の名前と実行タイムスタンプが含まれます。Feature Processor パイプライン実行の場合、ステップの CloudWatch Logs ストリームには `feature-processor` の文字列とパイプライン実行 ID が含まれます。

特徴量ストア Feature Processor パイプラインと最近の実行ステータスは、特徴量ストア UI の特定の特徴量グループの Amazon SageMaker Studio Classic で確認できます。Feature Processor パイプラインに関連する特徴量グループが入力または出力として UI に表示されます。また、リネージビューでは、データを生成する Feature Processor パイプラインやデータソースなどのアップストリーム実行にコンテキストを提供し、さらなるデバッグに役立てることができます。Studio Classic でのリネージビューの使用の詳細については、「[コンソールからリネージを表示する](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)」を参照してください。

# IAM アクセス許可と実行ロール
<a name="feature-store-feature-processor-iam-permissions"></a>

Amazon SageMaker Python SDK を使用するには、 を操作するためのアクセス許可が必要です AWS のサービス。Feature Processor の全機能には以下のポリシーが必要です。IAM ロールにアタッチされた [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html) と [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS 管理ポリシーをアタッチできます。IAM ロールにポリシーをアタッチする方法については、「[IAM ロールにポリシーを追加する](feature-store-adding-policies.md)」を参照してください。次の詳細の例を参照してください。

このポリシーが適用されるロールの信頼ポリシーでは、「scheduler.amazonaws.com」、「sagemaker.amazonaws.com」、「glue.amazonaws.com」の原則を許可する必要があります。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Feature Processor の制約、制限、クォータ
<a name="feature-store-feature-processor-quotas"></a>

Amazon SageMaker Feature Store の Feature Processing は、SageMaker AI 機械学習 (ML) のリネージトラッキングに依存しています。Feature Store Feature Processor は、リネージコンテキストを使用して Feature Processing Pipelines とパイプラインのバージョンを表し、追跡します。各 Feature Store Feature Processor は、少なくとも 2 つのリネージコンテキスト (1 つは Feature Processing パイプライン用、もう 1 つはバージョン用) を消費します。Feature Processing パイプラインの入力または出力データソースが変更されると、追加のリネージコンテキストが作成されます。SageMaker AI ML リネージュ制限を更新するには、 に連絡して制限の引き上げ AWS をサポートします。Feature Store Feature Processor が使用するリソースのデフォルト制限は次のとおりです。SageMaker AI ML リネージトラッキングの詳細については、「[Amazon SageMaker ML 系統追跡](lineage-tracking.md)」を参照してください。

SageMaker AI クォータの詳細については、「[Amazon SageMaker AI エンドポイントとクォータ](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)」を参照してください。

リージョンごとのリネージ制限
+ コンテキスト — 500 (ソフト制限)
+ アーティファクト — 6,000 (ソフト制限)
+ アソシエーション — 6,000 (ソフト制限)

リージョンごとのトレーニング制限
+ トレーニングジョブの最長の実行時間 – 432,000 秒
+ トレーニングジョブあたりの最大インスタンス数 – 20
+ 現在のリージョンにおいて、このアカウントで実行できる 1 秒あたりの `CreateTrainingJob` リクエストの最大数 – 1 TPS
+ クラスター再利用のキープアライブ期間 — 3,600 秒

リージョンあたりのパイプラインと同時パイプライン実行の最大数
+ 各アカウントで許可されるパイプラインの最大数 – 500
+ 各アカウントで許可される同時パイプライン実行の最大数 – 20
+ パイプライン実行がタイムアウトする時間 – 672 時間

# データソース
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing は、複数のデータソースをサポートします。 Feature Processor SDK for Python (Boto3) には、Amazon S3 に保存されている特徴量グループまたはオブジェクトからデータをロードするためのコンストラクトが用意されています。また、カスタムデータソースを作成して、他のデータソースからデータをロードすることもできます。特徴量ストアが提供するデータソースの詳細については、「[Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)」を参照してください。

**Topics**
+ [Feature Processor SDK データソース](feature-store-feature-processor-data-sources-sdk.md)
+ [カスタムデータソース](feature-store-feature-processor-data-sources-custom.md)
+ [カスタムデータソースの例](feature-store-feature-processor-data-sources-custom-examples.md)

# Feature Processor SDK データソース
<a name="feature-store-feature-processor-data-sources-sdk"></a>

Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) には、Amazon S3 に保存されている特徴量グループまたはオブジェクトからデータをロードするためのコンストラクトが用意されています。Feature Store が提供するデータソース定義の全リストについては、「[Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)」を参照してください。

Feature Store Python SDK データソース定義の使用方法の例については、「[一般的なユースケース向けの特徴量処理 コードの例](feature-store-feature-processor-examples.md)」参照してください。

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` は Feature Processor の入力データソースとして特徴量グループを指定するために使用されます。データはオフラインストア特徴量グループから読み込むことができます。オンラインストア特徴量グループからデータを読み込もうとすると、検証エラーが発生します。開始オフセットと終了オフセットを指定して、読み込まれるデータを特定の時間範囲に制限できます。例えば、開始オフセットを「14 日」に指定して過去 2 週間のデータのみを読み込むことができ、「7 日」の終了オフセットを指定して、入力を前週のデータに制限することもできます。

## Feature Store が提供するデータソース定義
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Feature Store Python SDK には、Feature Processor のさまざまな入力データソースを指定するために使用できるデータソース定義が含まれています。これには CSV、Parquet、Iceberg テーブルソースが含まれます。Feature Store が提供するデータソース定義の全リストについては、「[Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)」を参照してください。

# カスタムデータソース
<a name="feature-store-feature-processor-data-sources-custom"></a>

このページでは、カスタムデータソースクラスの作成方法を説明し、使用例をいくつか示します。カスタムデータソースを使用すると、SageMaker AI SDK for Python (Boto3) が提供する API を、Amazon SageMaker Feature Store が提供するデータソースを使用する場合と同じように使用できます。

カスタムデータソースを使用して、Feature Processing を利用してデータを変換し、特徴量グループに取り込むには、次のクラスメンバーと関数で `PySparkDataSource` クラスを拡張する必要があります。
+ `data_source_name` (str): データソースの任意の名前。例えば、Amazon Redshift、Snowflake、Glue カタログ ARN などです。
+ `data_source_unique_id` (str): アクセスしている特定のリソースを参照する一意の識別子。例えば、テーブル名、DDB テーブル ARN、Amazon S3 プレフィックスなどです。カスタムデータソースの同じ `data_source_unique_id` の使用はすべて、リネージビュー内の同じデータソースに関連付けられます。リネージには、特徴量処理ワークフローの実行コード、使用されたデータソース、特徴量グループまたはフ特徴量への取り込み方法に関する情報が含まれます。**Studio** で特徴量グループのリネージを表示する方法の詳細については、「[コンソールからリネージを表示する](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)」を参照してください。
+ `read_data` (func): 特徴量プロセッサとの接続に使用される方法。Spark データフレームを返します。例については「[カスタムデータソースの例](feature-store-feature-processor-data-sources-custom-examples.md)」を参照してください。

`data_source_name` と `data_source_unique_id` は両方ともリネージエンティティを一意に識別するために使用されます。以下は `CustomDataSource` という名前のカスタムデータソースクラスの例です。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# カスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

このセクションでは、Feature Processor のカスタムデータソース実装の例を示します。カスタムデータソースの詳細については、「[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md)」を参照してください。

セキュリティは、 AWS とお客様の責任を共有します。 AWS は、 でサービスを実行するインフラストラクチャを保護する責任があります AWS クラウド。必要なセキュリティ設定と管理タスクはすべてお客様の責任となります。例えば、データストアへのアクセス認証情報などのシークレットは、カスタムデータソースにハードコーディングしないでください。を使用して AWS Secrets Manager 、これらの認証情報を管理できます。Secrets Manager の詳細については、 AWS Secrets Manager ユーザーガイドの[「 とは AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)」を参照してください。以下の例では、Secrets Manager を認証情報として使用します。

**Topics**
+ [Amazon Redshift Clusters (JDBC) のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Snowflake のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Databricks (JDBC) のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [ストリーミングのカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift Clusters (JDBC) のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift には、Spark でデータを読み取るために使用できる JDBC ドライバーが用意されています。Amazon Redshift JDBC ドライバーをダウンロードする方法については、「[Amazon Redshift JDBC ドライバーのバージョン 2.1 をダウンロードする](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)」を参照してください。

カスタムの Amazon Redshift データソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md).の `read_data` メソッドを上書きする必要があります。

Amazon Redshift クラスターに接続するには、以下が必要です。
+ Amazon Redshift JDBC URL (`jdbc-url`)

  Amazon Redshift JDBC URL の取得に関する情報については、「Amazon Redshift データベース開発者ガイド」の「[JDBC URL の取得](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)」を参照してください。
+ Amazon Redshift ユーザー名 (`redshift-user`) とパスワード (`redshift-password`)

  Amazon Redshift SQL コマンドを使用してデータベースユーザーを作成、管理する方法については、「Amazon Redshift データベース開発者ガイド」の「[ユーザー](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)」を参照してください。
+ Amazon Redshift テーブル名 (`redshift-table-name`)

  いくつかの例を使用してテーブルを作成する方法については、「Amazon Redshift データベース開発者ガイド」の「[CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)」を参照してください。
+ (オプション) Secrets Manager を使用する場合は、Amazon Redshift アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`secret-redshift-account-info`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`your-region`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から JDBC URL と個人用のアクセストークンを取得し、カスタムデータソースクラス、`DatabricksDataSource` の `read_data` をオーバーライドする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

次の例は、`RedshiftDataSource` を `feature_processor` デコレータに接続する方法を示しています。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義して JDBC ドライバーを提供し、`@remote` デコレータに渡す必要があります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake には、`feature_processor` デコレータに使用できる Spark コネクタが用意されています。Spark 用 Snowflake コネクタの詳細については、「Snowflake ドキュメント」の「[Spark 用 Snowflake コネクタ](https://docs.snowflake.com/en/user-guide/spark-connector)」を参照してください。

Snowflake のカスタムデータソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) から `read_data` メソッドをオーバーライドし、Spark コネクタパッケージを Spark クラスパスに追加する必要があります。

Snowflake データソースに接続するには、以下が必要です。
+ Snowflake URL (`sf-url`)

  Snowflake ウェブインターフェイスにアクセスするための URL については、「Snowflake ドキュメント」の「[アカウント識別子](https://docs.snowflake.com/en/user-guide/admin-account-identifier)」を参照してください。
+ Snowflake データベース (`sf-database`) 

  Snowflake を使用してデータベース名を取得する方法については、「Snowflakeドキュメント」の「[CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)」を参照してください。
+ Snowflake データベーススキーマ (`sf-schema`) 

  Snowflake を使用してスキーマ名を取得する方法については、Snowflake ドキュメントの「[CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)」を参照してください。
+ Snowflake ウェアハウス (`sf-warehouse`)

  Snowflake を使用してウェアハウス名を取得する方法については、「Snowflakeドキュメント」の「[CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)」を参照してください。
+ Snowflake テーブル名 (`sf-table-name`)
+ (オプション) Secrets Manager を使用する場合は、Snowflake アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`secret-snowflake-account-info`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`your-region`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から Snowflake のユーザー名とパスワードを取得し、カスタムデータソースクラス、`SnowflakeDataSource` の `read_data` 関数をオーバーライドする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

次の例は、`SnowflakeDataSource` を `feature_processor` デコレータに接続する方法を示しています。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義してパッケージを提供し、それを `@remote` デコレータに渡す必要があります。次の例の Spark パッケージは、`spark-snowflake_2.12` が Feature Processor プロセッサの Scala バージョン、`2.12.0` が使用する Snowflake バージョン、`spark_3.3` が Feature Processor プロセッサの Spark バージョンになります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark は、Databricks JDBC ドライバーを使用して Databricks からデータを読み取ることができます。Databricks JDBC ドライバーの詳細については、「Databricks ドキュメント」の「[Databricks ODBC/JDBC ドライバーを設定する](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)」を参照してください。

**注記**  
対応する JDBC ドライバーを Spark クラスパスに含めることで、他のデータベースからデータを読み取ることができます。詳細については、「Spark SQL ガイド」の「[JDBC から他のデータベースへ](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)」を参照してください。

Databricks のカスタムデータソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) から `read_data` メソッドをオーバーライドし、JDBC jar を Spark クラスパスに追加する必要があります。

Databricks データソースに接続するには、以下が必要です。
+ Databricks URL (`databricks-url`)

  Databricks URL の詳細については、「Databricks ドキュメント」の「[Databricks ドライバーの接続 URL を構築する](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)」を参照してください。
+ Databricks 個人用アクセストークン (`personal-access-token`)

  Databricks のアクセストークンの詳細については、「Databricks ドキュメント」の「[Databricks 個人用アクセストークン認証](https://docs.databricks.com/en/dev-tools/auth.html#pat)」を参照してください。
+ データカタログ名 (`db-catalog`) 

  Databricks カタログ名については、「Databricks ドキュメント」の「[カタログ名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)」を参照してください。
+ スキーマ名 (`db-schema`)

  Databricks スキーマ名については、「Databricks ドキュメント」の「[スキーマ名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)」を参照してください。
+ テーブル名 (`db-table-name`)

  Databricks テーブル名については、「Databricks ドキュメント」の「[テーブル名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)」を参照してください。
+ (オプション) Secrets Manager を使用する場合は、Databricks アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`secret-databricks-account-info`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`your-region`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から JDBC URL と個人用アクセストークンを取得し、カスタムデータソースクラス、`DatabricksDataSource` の `read_data` を上書きする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

次の例は、JDBC ドライバー jar、`jdbc-jar-file-name.jar` を Amazon S3 にアップロードして Spark classpath に追加する方法を示しています。Spark JDBC ドライバー (`jdbc-jar-file-name.jar`) をダウンロードする方法については、Databricks ウェブサイトの「[Download JDBC Driver](https://www.databricks.com/spark/jdbc-drivers-download)」を参照してください。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義して jar を提供し、`@remote` デコレータに渡す必要があります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## ストリーミングのカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Amazon Kinesis などのストリーミングデータソースに接続し、Spark Structured Streaming を使って変換を作成してストリーミングデータソースから読み取ることができます。Kinesis コネクタの詳細については、GitHub の「[Kinesis Connector for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis)」を参照してください。Amazon Kinesis の詳細については、「Amazon Kinesis デベロッパーガイド」の「[Amazon Kinesis Data Streams とは](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)」を参照してください。

カスタム Amazon Kinesis データソースクラスを作成するには、`BaseDataSource` クラスを拡張して、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) の `read_data` メソッドを上書きする必要があります。

Amazon Kinesis データストリームに接続するには、以下が必要です。
+ Kinesis の ARN (`kinesis-resource-arn`) 

  Kinesis のデータストリームの ARN の詳細については、「Amazon Kinesis デベロッパーガイド」の「[Kinesis Data Streams の Amazon リソースネーム (ARN)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)」を参照してください。
+ Kinesis データストリーム名 (`kinesis-stream-name`)
+ AWS リージョン (`your-region`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

次の例は、`KinesisDataSource` を `feature_processor` デコレータに接続する方法を説明しています。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

上記のサンプルコードでは、マイクロバッチを特徴量グループにストリーミングする際に、Spark Structured Streaming オプションをいくつか使用しています。オプションの完全なリストについては、「Apache Spark ドキュメント」の[「Structured Streaming プログラミングガイド](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)」を参照してください。
+ `foreachBatch` のシンクモードは、ストリーミングクエリの各マイクロバッチの出力データにオペレーションを適用し、ロジックを記述できる機能です。

  `foreachBatch` の詳細については、「Apache Spark Structured Streaming プログラミングガイド」の「[Using Foreach and ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)」を参照してください。
+ `checkpointLocation` オプションは、ストリーミングアプリケーションのステータスを定期的に保存します。このようなストリーミングログはチェックポイントの場所である `s3a://checkpoint-path` に保存されます。

  `checkpointLocation` オプションの詳細については、「Apache Spark Structured Streaming プログラミングガイド」の「[Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)」を参照してください。
+ `trigger` 設定は、ストリーミングアプリケーションでマイクロバッチ処理がトリガーされる頻度を定義します。この例では、`trigger(processingTime="1 minute")` で指定された 1 分間のマイクロバッチ間隔で処理時間トリガータイプが使用されています。ストリームソースからバックフィルするには、`trigger(availableNow=True)` で指定された available-now トリガータイプを使用できます。

  `trigger` タイプの完全なリストについては、「Apache Spark Structured Streaming プログラミングガイド」の「[Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)」を参照してください。

**イベントベースのトリガーを使用した継続的なストリーミングと自動再試行**

Feature Processor は SageMaker Training をコンピューティングインフラストラクチャとして使用します。最大ランタイムの制限は 28 日間です。イベントベースのトリガーを使用すると、継続的なストリーミングをこれより長い期間延長し、一時的な障害から回復できます。スケジュールとイベントベースの実行の詳細については、「[Feature Processor パイプラインのスケジュール済みの実行とイベントベースの実行](feature-store-feature-processor-schedule-pipeline.md)」を参照してください。

以下は、ストリーミング Feature Processor パイプラインの継続的な実行を維持するようにイベントベースのトリガーを設定する例です。ここでは、以前の例で定義したストリーミング変換関数を使用します。ターゲットパイプラインは、ソースパイプラインの実行で `STOPPED` イベントまたは `FAILED`イベントが発生した場合にトリガーされるように設定できます。継続的な実行のために、同じパイプラインがソースとターゲットとして使用されることに注意が必要です。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# 一般的なユースケース向けの特徴量処理 コードの例
<a name="feature-store-feature-processor-examples"></a>

以下の例は、一般的なユースケース向けの特徴量処理コードのサンプルです。特定のユースケースを紹介する詳細なノートブックの例については、「[Amazon SageMaker Feature Store Feature Processing notebook](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb)」を参照してください。

次の例では、`us-east-1` はリソースのリージョン、`111122223333` はリソース所有者のアカウント ID、`your-feature-group-name` は特徴量グループ名です。

以下の例で使用されている `transactions` データセットのスキーマは次のとおりです。

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Joining data from multiple data sources](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Sliding window aggregates](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Tumbling window aggregates](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Promotion from the offline store to online store](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Transformations with the Pandas library](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [イベントベースのトリガーを使用した継続的な実行と自動再試行](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Joining data from multiple data sources
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Sliding window aggregates
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Tumbling window aggregates
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promotion from the offline store to online store
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformations with the Pandas library
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Transformations with the Pandas library**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## イベントベースのトリガーを使用した継続的な実行と自動再試行
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```