

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# カスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

このセクションでは、Feature Processor のカスタムデータソース実装の例を示します。カスタムデータソースの詳細については、「[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md)」を参照してください。

セキュリティは、 AWS とお客様の責任を共有します。 AWS は、 のサービスを実行するインフラストラクチャを保護する責任があります AWS クラウド。必要なセキュリティ設定と管理タスクはすべてお客様の責任となります。例えば、データストアへのアクセス認証情報などのシークレットは、カスタムデータソースにハードコーディングしないでください。を使用して AWS Secrets Manager 、これらの認証情報を管理できます。Secrets Manager の詳細については、[「 とは」を参照してください AWS Secrets Manager。](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) AWS Secrets Manager ユーザーガイドの「」を参照してください。以下の例では、Secrets Manager を認証情報として使用します。

**Topics**
+ [Amazon Redshift Clusters (JDBC) のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Snowflake のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Databricks (JDBC) のカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [ストリーミングのカスタムデータソースの例](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift Clusters (JDBC) のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift には、Spark でデータを読み取るために使用できる JDBC ドライバーが用意されています。Amazon Redshift JDBC ドライバーをダウンロードする方法については、「[Amazon Redshift JDBC ドライバーのバージョン 2.1 をダウンロードする](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)」を参照してください。

カスタムの Amazon Redshift データソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md).の `read_data` メソッドを上書きする必要があります。

Amazon Redshift クラスターに接続するには、以下が必要です。
+ Amazon Redshift JDBC URL (`{{jdbc-url}}`)

  Amazon Redshift JDBC URL の取得に関する情報については、「Amazon Redshift データベース開発者ガイド」の「[JDBC URL の取得](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)」を参照してください。
+ Amazon Redshift ユーザー名 (`{{redshift-user}}`) とパスワード (`{{redshift-password}}`)

  Amazon Redshift SQL コマンドを使用してデータベースユーザーを作成、管理する方法については、「Amazon Redshift データベース開発者ガイド」の「[ユーザー](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)」を参照してください。
+ Amazon Redshift テーブル名 (`{{redshift-table-name}}`)

  いくつかの例を使用してテーブルを作成する方法については、「Amazon Redshift データベース開発者ガイド」の「[CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)」を参照してください。
+ (オプション) Secrets Manager を使用する場合は、Amazon Redshift アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`{{secret-redshift-account-info}}`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`{{your-region}}`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から JDBC URL と個人用のアクセストークンを取得し、カスタムデータソースクラス、`DatabricksDataSource` の `read_data` をオーバーライドする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "{{redshift-resource-arn}}"
    
    def read_data(self, spark, params):
        url = "{{jdbc-url}}?user={{redshift-user}}&password={{redshift-password}}"
        aws_iam_role_arn = "{{redshift-command-access-role}}"
        secret_name = "{{secret-redshift-account-info}}"
        region_name = "{{your-region}}"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("{{jdbc-url}}", secrets["jdbcurl"]).replace("{{redshift-user}}", secrets['username']).replace("{{redshift-password}}", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "{{redshift-table-name}}") \
             .option("tempdir", "s3a://{{your-bucket-name}}/{{your-bucket-prefix}}") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

次の例は、`RedshiftDataSource` を `feature_processor` デコレータに接続する方法を示しています。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義して JDBC ドライバーを提供し、`@remote` デコレータに渡す必要があります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake には、`feature_processor` デコレータに使用できる Spark コネクタが用意されています。Spark 用 Snowflake コネクタの詳細については、「Snowflake ドキュメント」の「[Spark 用 Snowflake コネクタ](https://docs.snowflake.com/en/user-guide/spark-connector)」を参照してください。

Snowflake のカスタムデータソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) から `read_data` メソッドをオーバーライドし、Spark コネクタパッケージを Spark クラスパスに追加する必要があります。

Snowflake データソースに接続するには、以下が必要です。
+ Snowflake URL (`{{sf-url}}`)

  Snowflake ウェブインターフェイスにアクセスするための URL については、「Snowflake ドキュメント」の「[アカウント識別子](https://docs.snowflake.com/en/user-guide/admin-account-identifier)」を参照してください。
+ Snowflake データベース (`{{sf-database}}`) 

  Snowflake を使用してデータベース名を取得する方法については、「Snowflakeドキュメント」の「[CURRENT\_DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)」を参照してください。
+ Snowflake データベーススキーマ (`{{sf-schema}}`) 

  Snowflake を使用してスキーマ名を取得する方法については、Snowflake ドキュメントの「[CURRENT\_SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)」を参照してください。
+ Snowflake ウェアハウス (`{{sf-warehouse}}`)

  Snowflake を使用してウェアハウス名を取得する方法については、「Snowflakeドキュメント」の「[CURRENT\_WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)」を参照してください。
+ Snowflake テーブル名 (`{{sf-table-name}}`)
+ (オプション) Secrets Manager を使用する場合は、Snowflake アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`{{secret-snowflake-account-info}}`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`{{your-region}}`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から Snowflake のユーザー名とパスワードを取得し、カスタムデータソースクラス、`SnowflakeDataSource` の `read_data` 関数をオーバーライドする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "{{sf-url}}",
        "sfDatabase" : "{{sf-database}}",
        "sfSchema" : "{{sf-schema}}",
        "sfWarehouse" : "{{sf-warehouse}}",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "{{sf-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-snowflake-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "{{sf-table-name}}") \
                        .load()
```

次の例は、`SnowflakeDataSource` を `feature_processor` デコレータに接続する方法を示しています。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義してパッケージを提供し、それを `@remote` デコレータに渡す必要があります。次の例の Spark パッケージは、`spark-snowflake_2.12` が Feature Processor プロセッサの Scala バージョン、`2.12.0` が使用する Snowflake バージョン、`spark_3.3` が Feature Processor プロセッサの Spark バージョンになります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="{{feature-group-arn>}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) のカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark は、Databricks JDBC ドライバーを使用して Databricks からデータを読み取ることができます。Databricks JDBC ドライバーの詳細については、「Databricks ドキュメント」の「[Databricks ODBC/JDBC ドライバーを設定する](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)」を参照してください。

**注記**  
対応する JDBC ドライバーを Spark クラスパスに含めることで、他のデータベースからデータを読み取ることができます。詳細については、「Spark SQL ガイド」の「[JDBC から他のデータベースへ](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)」を参照してください。

Databricks のカスタムデータソースクラスを作成するには、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) から `read_data` メソッドをオーバーライドし、JDBC jar を Spark クラスパスに追加する必要があります。

Databricks データソースに接続するには、以下が必要です。
+ Databricks URL (`{{databricks-url}}`)

  Databricks URL の詳細については、「Databricks ドキュメント」の「[Databricks ドライバーの接続 URL を構築する](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)」を参照してください。
+ Databricks 個人用アクセストークン (`{{personal-access-token}}`)

  Databricks のアクセストークンの詳細については、「Databricks ドキュメント」の「[Databricks 個人用アクセストークン認証](https://docs.databricks.com/en/dev-tools/auth.html#pat)」を参照してください。
+ データカタログ名 (`{{db-catalog}}`) 

  Databricks カタログ名については、「Databricks ドキュメント」の「[カタログ名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)」を参照してください。
+ スキーマ名 (`{{db-schema}}`)

  Databricks スキーマ名については、「Databricks ドキュメント」の「[スキーマ名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)」を参照してください。
+ テーブル名 (`{{db-table-name}}`)

  Databricks テーブル名については、「Databricks ドキュメント」の「[テーブル名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)」を参照してください。
+ (オプション) Secrets Manager を使用する場合は、Databricks アクセスユーザー名とパスワードを Secrets Manager に保存するシークレット名 (`{{secret-databricks-account-info}}`) が必要です。

  Secrets Manager の詳細については、 AWS Secrets Manager 「 ユーザーガイド」の「 [でシークレットを検索する AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)」を参照してください。
+ AWS リージョン (`{{your-region}}`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

次の例は、Secrets Manager から JDBC URL と個人用アクセストークンを取得し、カスタムデータソースクラス、`DatabricksDataSource` の `read_data` を上書きする方法を示しています。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "{{databricks-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-databricks-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("{{personal-access-token}}", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`{{db-catalog}}`.`{{db-schema}}`.`{{db-table-name}}`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

次の例は、JDBC ドライバー jar、`{{jdbc-jar-file-name}}.jar` を Amazon S3 にアップロードして Spark classpath に追加する方法を示しています。Spark JDBC ドライバー (`{{jdbc-jar-file-name}}.jar`) をダウンロードする方法については、Databricks ウェブサイトの「[Download JDBC Driver](https://www.databricks.com/spark/jdbc-drivers-download)」を参照してください。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"}
)
def transform(input_df):
    return input_df
```

特徴量プロセッサジョブをリモートで実行するには、`SparkConfig` を定義して jar を提供し、`@remote` デコレータに渡す必要があります。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## ストリーミングのカスタムデータソースの例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Amazon Kinesis などのストリーミングデータソースに接続し、Spark Structured Streaming を使って変換を作成してストリーミングデータソースから読み取ることができます。Kinesis コネクタの詳細については、GitHub の「[Kinesis Connector for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis)」を参照してください。Amazon Kinesis の詳細については、「Amazon Kinesis デベロッパーガイド」の「[Amazon Kinesis Data Streams とは](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)」を参照してください。

カスタム Amazon Kinesis データソースクラスを作成するには、`BaseDataSource` クラスを拡張して、[カスタムデータソース](feature-store-feature-processor-data-sources-custom.md) の `read_data` メソッドを上書きする必要があります。

Amazon Kinesis データストリームに接続するには、以下が必要です。
+ Kinesis の ARN (`{{kinesis-resource-arn}}`) 

  Kinesis のデータストリームの ARN の詳細については、「Amazon Kinesis デベロッパーガイド」の「[Kinesis Data Streams の Amazon リソースネーム (ARN)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)」を参照してください。
+ Kinesis データストリーム名 (`{{kinesis-stream-name}}`)
+ AWS リージョン (`{{your-region}}`)

  Python SDK (Boto3) を使用して現在のセッションのリージョン名を取得する方法については、「Boto3 documentation」の「[region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)」を参照してください。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "{{kinesis-resource-arn}}"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "{{kinesis-stream-name}}") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.{{your-region}}.amazonaws.com")
            .load()
```

次の例は、`KinesisDataSource` を `feature_processor` デコレータに接続する方法を説明しています。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "{{feature-group-arn}}"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="{{feature-group-arn}}"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "{{s3a://checkpoint-path}}")
        .start()
    )
    output_stream.awaitTermination()
```

上記のサンプルコードでは、マイクロバッチを特徴量グループにストリーミングする際に、Spark Structured Streaming オプションをいくつか使用しています。オプションの完全なリストについては、「Apache Spark ドキュメント」の[「Structured Streaming プログラミングガイド](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)」を参照してください。
+ `foreachBatch` のシンクモードは、ストリーミングクエリの各マイクロバッチの出力データにオペレーションを適用し、ロジックを記述できる機能です。

  `foreachBatch` の詳細については、「Apache Spark Structured Streaming プログラミングガイド」の「[Using Foreach and ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)」を参照してください。
+ `checkpointLocation` オプションは、ストリーミングアプリケーションのステータスを定期的に保存します。このようなストリーミングログはチェックポイントの場所である `{{s3a://checkpoint-path}}` に保存されます。

  `checkpointLocation` オプションの詳細については、「Apache Spark Structured Streaming プログラミングガイド」の「[Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)」を参照してください。
+ `trigger` 設定は、ストリーミングアプリケーションでマイクロバッチ処理がトリガーされる頻度を定義します。この例では、`trigger(processingTime="1 minute")` で指定された 1 分間のマイクロバッチ間隔で処理時間トリガータイプが使用されています。ストリームソースからバックフィルするには、`trigger(availableNow=True)` で指定された available-now トリガータイプを使用できます。

  `trigger` タイプの完全なリストについては、「Apache Spark Structured Streaming プログラミングガイド」の「[Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)」を参照してください。

**イベントベースのトリガーを使用した継続的なストリーミングと自動再試行**

Feature Processor は SageMaker Training をコンピューティングインフラストラクチャとして使用します。最大ランタイムの制限は 28 日間です。イベントベースのトリガーを使用すると、継続的なストリーミングをこれより長い期間延長し、一時的な障害から回復できます。スケジュールとイベントベースの実行の詳細については、「[Feature Processor パイプラインのスケジュール済みの実行とイベントベースの実行](feature-store-feature-processor-schedule-pipeline.md)」を参照してください。

以下は、ストリーミング Feature Processor パイプラインの継続的な実行を維持するようにイベントベースのトリガーを設定する例です。ここでは、以前の例で定義したストリーミング変換関数を使用します。ターゲットパイプラインは、ソースパイプラインの実行で `STOPPED` イベントまたは `FAILED`イベントが発生した場合にトリガーされるように設定できます。継続的な実行のために、同じパイプラインがソースとターゲットとして使用されることに注意が必要です。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "{{streaming-pipeline}}"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```