

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Exemplos de fontes de dados personalizadas
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Esta seção fornece exemplos de implantações de fontes de dados personalizadas para Processadores de atributos. Para obter mais informações sobre fontes de dados personalizadas, consulte [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md).

A segurança é uma responsabilidade compartilhada AWS entre nossos clientes. AWS é responsável por proteger a infraestrutura que executa os serviços no Nuvem AWS. Os clientes são responsáveis por todas as tarefas necessárias de configuração e gerenciamento de segurança. Por exemplo, segredos como credenciais de acesso aos armazenamentos de dados não devem ser codificados em suas fontes de dados personalizadas. Você pode usar AWS Secrets Manager para gerenciar essas credenciais. Para obter informações sobre o Secrets Manager, consulte [O que é AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) no guia do AWS Secrets Manager usuário. Os exemplos a seguir usarão o Secrets Manager para suas credenciais.

**Topics**
+ [Exemplos de fontes de dados personalizadas dos Clusters do Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Exemplos de fontes de dados personalizadas do Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Exemplos de fontes de dados personalizadas do Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Exemplos de fontes de dados personalizadas de streaming](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Exemplos de fontes de dados personalizadas dos Clusters do Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

O Amazon Redshift oferece um driver JDBC que pode ser usado para ler dados com o Spark. Para obter informações sobre como baixar o driver JDBC do Amazon Redshift, consulte [Baixar o driver JDBC do Amazon Redshift, versão 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Para criar a classe de fonte de dados personalizada do Amazon Redshift, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md). 

Para se conectar a um cluster do Amazon Redshift, você precisa de:
+ URL do JDBC do Amazon Redshift (`{{jdbc-url}}`)

  Para obter informações sobre como obter o URL do JDBC do Amazon Redshift, consulte [Obter o URL do JDBC no Guia do desenvolvedor de banco de dados do Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html).
+ Nome de usuário (`{{redshift-user}}`) e senha (`{{redshift-password}}`) do Amazon Redshift

  Para obter informações sobre como criar e gerenciar usuários de banco de dados usando os comandos SQL do Amazon Redshift, consulte [Usuários](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) no Guia do desenvolvedor de banco de dados do Amazon Redshift.
+ Nome da tabela do Amazon Redshift (`{{redshift-table-name}}`)

  Para obter informações sobre como criar uma tabela com alguns exemplos, consulte [CRIAR TABELA](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) no Guia do desenvolvedor de banco de dados do Amazon Redshift.
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`{{secret-redshift-account-info}}`) onde você armazena seu nome de usuário e senha de acesso ao Amazon Redshift no Secrets Manager.

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`{{your-region}}`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o `read_data` pela sua classe de fonte de dados personalizada, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "{{redshift-resource-arn}}"
    
    def read_data(self, spark, params):
        url = "{{jdbc-url}}?user={{redshift-user}}&password={{redshift-password}}"
        aws_iam_role_arn = "{{redshift-command-access-role}}"
        secret_name = "{{secret-redshift-account-info}}"
        region_name = "{{your-region}}"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("{{jdbc-url}}", secrets["jdbcurl"]).replace("{{redshift-user}}", secrets['username']).replace("{{redshift-password}}", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "{{redshift-table-name}}") \
             .option("tempdir", "s3a://{{your-bucket-name}}/{{your-bucket-prefix}}") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

O exemplo a seguir mostra como conectar o `RedshiftDataSource` ao decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer o driver jdbc definindo o `SparkConfig` e passando-o para o decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas do Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

O Snowflake fornece um conector Spark que pode ser usado para seu decorador `feature_processor`. Para obter informações sobre o conector Snowflake para Spark, consulte [Conector Snowflake para Spark](https://docs.snowflake.com/en/user-guide/spark-connector) na documentação do Snowflake.

Para criar a classe de fonte de dados personalizada do Snowflake, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md) e adicionar os pacotes de conectores do Spark ao classpath do Spark. 

Para se conectar a uma fonte de dados do Snowflake, você precisa:
+ URL do Snowflake (`{{sf-url}}`)

  Para obter informações sobre URLs como acessar as interfaces web do Snowflake, consulte [Identificadores de conta](https://docs.snowflake.com/en/user-guide/admin-account-identifier) na documentação do Snowflake.
+ Banco de dados do Snowflake (`{{sf-database}}`) 

  Para obter informações sobre como obter o nome do seu banco de dados usando o Snowflake, consulte [CURRENT\_DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) na documentação do Snowflake.
+ Esquema do banco de dados do Snowflake (`{{sf-schema}}`) 

  Para obter informações sobre como obter o nome do seu esquema usando o Snowflake, consulte [CURRENT\_SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) na documentação do Snowflake.
+ Warehouse do Snowflake (`{{sf-warehouse}}`)

  Para obter informações sobre como obter o nome do seu warehouse usando o Snowflake, consulte [CURRENT\_WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) na documentação do Snowflake.
+ Nome da tabela do Snowflake (`{{sf-table-name}}`)
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`{{secret-snowflake-account-info}}`) onde você armazena seu nome de usuário e senha de acesso ao Snowflake no Secrets Manager. 

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`{{your-region}}`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o nome de usuário e senha do Snowflake no Secrets Manager e substituir a função `read_data` pela sua classe de fonte de dados personalizada `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "{{sf-url}}",
        "sfDatabase" : "{{sf-database}}",
        "sfSchema" : "{{sf-schema}}",
        "sfWarehouse" : "{{sf-warehouse}}",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "{{sf-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-snowflake-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "{{sf-table-name}}") \
                        .load()
```

O exemplo a seguir mostra como conectar o `SnowflakeDataSource` ao decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os pacotes definindo o `SparkConfig` e passando-os para o decorador `@remote`. Os pacotes Spark no exemplo a seguir mostra que `spark-snowflake_2.12` é a versão Scala do Processador de atributos, `2.12.0` é a versão do Snowflake que você deseja usar e `spark_3.3` é a versão do Spark do Processador de atributos. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="{{feature-group-arn>}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas do Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

O Spark pode ler dados do Databricks usando o driver JDBC do Databricks. Para obter informações sobre o driver JDBC do Databricks, consulte [Configurar os drivers ODBC e JDBC do Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) na documentação do Databricks.

**nota**  
Você pode ler dados de qualquer outro banco de dados incluindo o driver JDBC correspondente no classpath do Spark. Para obter mais informações, consulte [JDBC para outros bancos de dados](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) no Guia do Spark SQL.

Para criar a classe de fonte de dados personalizada do Databricks, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md) e adicionar os arquivos jar do JDBC ao classpath do Spark. 

Para se conectar a uma fonte de dados do Databricks, você precisa:
+ URL do Databricks (`{{databricks-url}}`)

  Para obter informações sobre o URL do Databricks, consulte [Criar URL da conexão para o driver do Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) na documentação do Databricks.
+ Token de acesso pessoal do Databricks (`{{personal-access-token}}`)

  Para obter informações sobre seu token de acesso ao Databricks, consulte [Autenticação do token de acesso pessoal do Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) na documentação do Databricks.
+ Nome do catálogo de dados (`{{db-catalog}}`) 

  Para obter informações sobre o nome do catálogo do Databricks, consulte [Nome de catálogo](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) na documentação do Databricks.
+ Nome do esquema (`{{db-schema}}`)

  Para obter informações sobre o nome do esquema do Databricks, consulte [Nome do esquema](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) na documentação do Databricks.
+ Nome da tabela (`{{db-table-name}}`)

  Para obter informações sobre o nome da tabela do Databricks, consulte [Nome da tabela](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) na documentação do Databricks.
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`{{secret-databricks-account-info}}`) onde você armazena seu nome de usuário e senha de acesso ao Databricks no Secrets Manager. 

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`{{your-region}}`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o `read_data` pela sua classe de fonte de dados personalizada, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "{{databricks-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-databricks-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("{{personal-access-token}}", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`{{db-catalog}}`.`{{db-schema}}`.`{{db-table-name}}`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

O exemplo a seguir mostra como fazer o upload dos arquivos jar do driver JDBC, `{{jdbc-jar-file-name}}.jar`, para o Amazon S3 para adicioná-lo ao classpath do Spark. Para obter informações sobre como baixar o driver JDBC (`{{jdbc-jar-file-name}}.jar`) do Spark a partir do Databricks, consulte [Baixar o driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) no site do Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os arquivos jar definindo o `SparkConfig` e passando-os para o decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas de streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Você pode se conectar a fontes de dados de streaming, como o Amazon Kinesis, e criar transformações com o Spark Structured Streaming para ler a partir de fontes de dados de streaming. Para obter informações sobre o conector Kinesis, consulte Conector [Kinesis para streaming estruturado do Spark](https://github.com/roncemer/spark-sql-kinesis) em. GitHub Para obter mais informações sobre o Amazon Kinesis, consulte [O que é o Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) no Guia do desenvolvedor do Amazon Kinesis.

Para criar a classe de fonte de dados personalizada do Amazon Kinesis, você precisará estender a classe `BaseDataSource` e sobrescrever o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md).

Para se conectar a um Amazon Kinesis Data Streams, você precisa:
+ ARN do Kinesis (`{{kinesis-resource-arn}}`) 

  Para obter informações sobre o stream de dados do Kinesis ARNs, consulte [Amazon Resource Names (ARNs) para Kinesis Data Streams no Guia do desenvolvedor do Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format).
+ Nome do fluxo de dados do Kinesis (`{{kinesis-stream-name}}`)
+ Região da AWS (`{{your-region}}`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "{{kinesis-resource-arn}}"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "{{kinesis-stream-name}}") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.{{your-region}}.amazonaws.com")
            .load()
```

O exemplo a seguir demonstra como conectar o `KinesisDataSource` ao decorador `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "{{feature-group-arn}}"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="{{feature-group-arn}}"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "{{s3a://checkpoint-path}}")
        .start()
    )
    output_stream.awaitTermination()
```

No código de exemplo acima, usamos algumas opções do Spark Structured Streaming ao transmitir microlotes para seu grupo de atributos. Para ver uma lista completa de opções, consulte o [Guia de programação de streaming estruturado](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) na documentação do Apache Spark. 
+ O modo sink `foreachBatch` é um atributo que permite aplicar operações e escrever lógica nos dados de saída de cada microlote de uma consulta de streaming. 

  Para obter informações sobre isso`foreachBatch`, consulte [Usando o Foreach e ForeachBatch no Guia](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) de programação de streaming estruturado do Apache Spark. 
+ A opção `checkpointLocation` salva periodicamente o estado da aplicação de streaming. O log de streaming é salvo no local `{{s3a://checkpoint-path}}` do ponto de verificação.

  Para obter informações sobre a opção `checkpointLocation`, consulte [Recuperando-se de falhas com pontos de verificação](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) no Guia de programação do Apache Spark Structured Streaming. 
+ A configuração `trigger` define com que frequência o processamento em microlote é acionado em uma aplicação de streaming. No exemplo, o tipo de gatilho de tempo de processamento é usado com intervalos de microlote de um minuto, especificados por `trigger(processingTime="1 minute")`. Para preencher a partir de uma fonte de fluxo, você pode usar o tipo de gatilho disponível agora, especificado por `trigger(availableNow=True)`.

  Para ver uma lista completa dos tipos de `trigger`, consulte [Gatilhos](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) no Guia de programação do Apache Spark Structured Streaming.

**Streaming contínuo e novas tentativas automáticas usando gatilhos baseados em eventos**

O Feature Processor usa o SageMaker treinamento como infraestrutura computacional e tem um limite máximo de tempo de execução de 28 dias. Você pode usar gatilhos baseados em eventos para estender seu streaming contínuo por um longo período de tempo e se recuperar de falhas transitórias. Para obter mais informações sobre execuções baseadas em programações e eventos, consulte [Execuções programadas e baseadas em eventos para pipelines do Processador de atributos](feature-store-feature-processor-schedule-pipeline.md).

Veja a seguir um exemplo de configuração de um gatilho baseado em eventos para manter o pipeline de streaming do Processador de atributos funcionando continuamente. Ele usa a função de transformação de streaming definida no exemplo anterior. Um pipeline de destino pode ser configurado para ser acionado quando ocorre um evento `STOPPED` ou `FAILED` para a execução de um pipeline de origem. Observe que o mesmo pipeline é usado como origem e destino para que seja executado continuamente.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "{{streaming-pipeline}}"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```