

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

本節提供特徵處理器的自訂資料來源實作範例。如需有關自訂資料來源的更多資訊，請參閱[自訂資料來源](feature-store-feature-processor-data-sources-custom.md)。

安全性是 AWS 與 客戶共同的責任。 AWS 負責保護在 中執行服務的基礎設施 AWS 雲端。客戶必須負責所有必要的安全組態和管理任務。例如，機密 (例如資料存放區的存取登入資料) 不應在您的自訂資料來源中進行硬式編碼。您可以使用 AWS Secrets Manager 來管理這些登入資料。如需 Secrets Manager 的資訊，請參閱[什麼是 AWS Secrets Manager？](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) AWS Secrets Manager 使用者指南中的 。以下範例會使用 Secrets Manager 做為您的憑證。

**Topics**
+ [Amazon Redshift 叢集 (JDBC) 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Snowflake 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Databricks (JDBC) 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [串流自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift 叢集 (JDBC) 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift 提供了一個 JDBC 驅動程式，可用於使用 Spark 讀取資料。如需有關如何下載 Amazon Redshift JDBC 驅動程式的資訊，請參閱[下載 Amazon Redshift JDBC 驅動程式 2.1 版](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)。

若要建立自訂的 Amazon Redshift 資料來源類別，您將需要覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

若要與 Amazon Redshift 叢集連線，您需要：
+ Amazon Redshift JDBC URL (`{{jdbc-url}}`)

  如需有關如何取得 Amazon Redshift JDBC URL 的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[取得 JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)。
+ Amazon Redshift 使用者名稱 (`{{redshift-user}}`) 和密碼 (`{{redshift-password}}`)

  如需有關如何使用 Amazon Redshift SQL 命令建立和管理資料庫使用者的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[使用者](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)。
+ Amazon Redshift 資料表名稱 (`{{redshift-table-name}}`)

  如需有關如何使用範例建立資料表的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[建立資料表](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)。
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`{{secret-redshift-account-info}}`)，您將在其中儲存 Secrets Manager 上的 Amazon Redshift 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`{{your-region}}`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 JDBC URL 和個人存取權杖，並覆寫自訂資料來源類別 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "{{redshift-resource-arn}}"
    
    def read_data(self, spark, params):
        url = "{{jdbc-url}}?user={{redshift-user}}&password={{redshift-password}}"
        aws_iam_role_arn = "{{redshift-command-access-role}}"
        secret_name = "{{secret-redshift-account-info}}"
        region_name = "{{your-region}}"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("{{jdbc-url}}", secrets["jdbcurl"]).replace("{{redshift-user}}", secrets['username']).replace("{{redshift-password}}", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "{{redshift-table-name}}") \
             .option("tempdir", "s3a://{{your-bucket-name}}/{{your-bucket-prefix}}") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

以下範例示範如何將 `RedshiftDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供 jdbc 驅動程式。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake 提供了一個 Spark 連接器，可用於您的 `feature_processor` 裝飾器。如需有關 Spark 的 Snowflake 連接器的資訊，請參閱 Snowflake 文件中的 [Spark 的 Snowflake 連接器](https://docs.snowflake.com/en/user-guide/spark-connector)。

若要建立自訂 Snowflake 資料來源類別，您必須覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，並將 Spark 連接器套件新增至 Spark 類別路徑。

若要與 Snowflake 資料來源連線，您需要：
+ Snowflake URL (`{{sf-url}}`)

  如需有關如何存取 Snowflake Web 介面之 URL 的資訊，請參閱 Snowflake 文件中的[帳戶識別符](https://docs.snowflake.com/en/user-guide/admin-account-identifier)。
+ Snowflake 資料庫 (`{{sf-database}}`) 

  如需有關如何使用 Snowflake 取得資料庫名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\_DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)。
+ Snowflake 資料庫結構描述 (`{{sf-schema}}`) 

  如需有關如何使用 Snowflake 取得你的結構描述名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\_SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)。
+ Snowflake 倉儲 (`{{sf-warehouse}}`)

  如需有關如何使用 Snowflake 取得倉儲名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\_WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)。
+ Snowflake 資料表名稱 (`{{sf-table-name}}`)
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`{{secret-snowflake-account-info}}`)，您將在其中儲存 Secrets Manager 上的 Snowflake 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`{{your-region}}`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 Snowflake 使用者名稱和密碼，並覆寫自訂資料來源類別 `SnowflakeDataSource` 的 `read_data` 函式。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "{{sf-url}}",
        "sfDatabase" : "{{sf-database}}",
        "sfSchema" : "{{sf-schema}}",
        "sfWarehouse" : "{{sf-warehouse}}",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "{{sf-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-snowflake-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "{{sf-table-name}}") \
                        .load()
```

以下範例示範如何將 `SnowflakeDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供套件。在下面的範例中的 Spark 套件是這樣的：`spark-snowflake_2.12` 是特徵處理器 Scala 版本，`2.12.0` 是您希望使用的 Snowflake 版本，`spark_3.3` 是特徵處理器 Spark 版本。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="{{feature-group-arn>}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark 可以透過使用 Databricks JDBC 驅動程式從 Databricks 讀取資料。如需有關 Databricks JDBC 驅動程式的更多資訊，請參閱 Databricks 文件中的[設定 Databricks ODBC 和 JDBC 驅動程式](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)。

**注意**  
您可以透過包括 Spark 類路徑相應的 JDBC 驅動程式從任何其他資料庫讀取資料。如需詳細資訊，請參閱 Spark SQL 指南中的 [JDBC 至其他資料庫](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

若要建立自訂 Databricks 資料來源類別，您必須覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，並將 JDBC jar 新增至 Spark classpath。

若要與 Databricks 資料來源連線，您需要：
+ Databricks URL (`{{databricks-url}}`)

  如需有關 Databricks URL 的更多資訊，請參閱 Databricks 文件中的[建置 Databricks 驅動程式的連線 URL](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)。
+ Databricks 個人存取權杖 (`{{personal-access-token}}`)

  如需有關 Databricks 存取權杖的更多資訊，請參閱 Databricks 文件中的 [Databricks 個人存取權杖驗證](https://docs.databricks.com/en/dev-tools/auth.html#pat)。
+ 資料型錄名稱 (`{{db-catalog}}`) 

  如需有關 Databricks 目錄名稱的資訊，請參閱 Databricks 文件中的[目錄名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)。
+ 結構描述名稱 (`{{db-schema}}`)

  如需有關 Databricks 結構描述名稱的資訊，請參閱 Databricks 文件中的[結構描述名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)。
+ 資料表名稱 (`{{db-table-name}}`)

  如需有關 Databricks 資料表名稱的資訊，請參閱 Databricks 文件中的[資料表名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)。
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`{{secret-databricks-account-info}}`)，您將在其中儲存 Secrets Manager 上的 Databricks 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`{{your-region}}`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 JDBC URL 和個人存取權杖，並覆寫自訂資料來源類別 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "{{databricks-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-databricks-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("{{personal-access-token}}", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`{{db-catalog}}`.`{{db-schema}}`.`{{db-table-name}}`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

下面的範例說明如何上傳 JDBC 驅動程式 jar 檔案`{{jdbc-jar-file-name}}.jar` 至 Amazon S3，以將其新增到 Spark classpath。如需有關從 Databricks 下載 Spark JDBC 驅動程式 (`{{jdbc-jar-file-name}}.jar`) 的更多資訊，請參閱 Databricks 網站中的[下載 JDBC 驅動程式](https://www.databricks.com/spark/jdbc-drivers-download)。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供 jar。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## 串流自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

您可以連線至 Amazon Kinesis 等串流資料來源，並使用 Spark 結構化串流撰寫轉換，以便從串流資料來源讀取。如需有關 Kinesis 連接器的資訊，請參閱 GitHub 中的[適用於 Spark 結構化串流的 Kinesis 連接器](https://github.com/roncemer/spark-sql-kinesis)。如需有關 Amazon Kinesis 的更多資訊，請參閱 Amazon Kinesis 開發人員指南中的[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)。

若要建立自訂的 Amazon Kinesis 資料來源類別，您將需要擴展 `BaseDataSource` 類別並覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

若要連接 Amazon Kinesis Data Streams，您需要：
+ Kinesis ARN (`{{kinesis-resource-arn}}`) 

  如需有關 Kinesis 資料串流 ARN 的資訊，請參閱 Amazon Kinesis 開發人員指南中的 [Kinesis Data Streams 的 Amazon Resource Name (ARN)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)。
+ Kinesis 資料串流名稱 (`{{kinesis-stream-name}}`)
+ AWS 區域 (`{{your-region}}`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "{{kinesis-resource-arn}}"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "{{kinesis-stream-name}}") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.{{your-region}}.amazonaws.com")
            .load()
```

下面的範例示範了如何將 `KinesisDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "{{feature-group-arn}}"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="{{feature-group-arn}}"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "{{s3a://checkpoint-path}}")
        .start()
    )
    output_stream.awaitTermination()
```

在上面範例程式碼中，我們使用一些 Spark 結構化串流選項，同時將微批次串流傳輸到您的特徵群組中。如需選項的完整清單，請參閱 Apache Spark 文件中的[結構化串流程式設計指南](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)。
+ `foreachBatch` 接收器模式是一項功能，可讓您在串流查詢的每個微批次的輸出資料上套用操作並寫入邏輯。

  如需有關 `foreachBatch` 的資訊，請參閱 Apache Spark 結構化串流程式設計指南中的[使用 Foreach 和 ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)。
+ `checkpointLocation` 選項會定期儲存串流應用程式的狀態。串流日誌會儲存在檢查點位置 `{{s3a://checkpoint-path}}`。

  如需有關 `checkpointLocation` 選項的資訊，請參閱 Apache Spark 結構化串流程式設計指南中的[使用檢查點從失敗中復原](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。
+ `trigger` 設定定義在串流應用程式中觸發微批次處理的頻率。在此範例中，處理時間觸發條件類型會以一分鐘的微批次間隔 (由 `trigger(processingTime="1 minute")` 指定) 使用。若要從串流來源回填，您可以透過 `trigger(availableNow=True)` 指定的現在可用的觸發條件類型。

  如需 `trigger` 類型的完整清單，請參閱 Apache Spark 結構化串流程式設計指南中的[觸發程式](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)。

**使用事件型觸發程式連續串流和自動重試**

特徵處理器使用 SageMaker 訓練作為運算基礎設施，其執行期上限為 28 天。您可以使用事件型觸發程式，將連續串流延長一段時間，並從暫時性失敗中復原。如需有關排程和事件型執行的更多資訊，請參閱[以排程和事件為基礎執行特徵處理器管道](feature-store-feature-processor-schedule-pipeline.md)。

以下是設定事件型觸發程式以保持串流特徵處理器管道持續執行的範例。這會使用前面範例中定義的串流轉換函式。您可以將目標管道配置為在來源管道執行發生 `STOPPED` 或 `FAILED` 事件時觸發。請注意，使用相同的管道作為來源和目標，以便持續執行。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "{{streaming-pipeline}}"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```