

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemples de sources de données personnalisées
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Cette section fournit des exemples d'implémentations de sources de données personnalisées pour les intégrateurs de fonctionnalités. Pour plus d'informations sur les sources de données personnalisées, consultez [Sources de données personnalisées](feature-store-feature-processor-data-sources-custom.md).

La sécurité est une responsabilité partagée AWS entre nos clients. AWS est chargé de protéger l'infrastructure qui gère les services dans le AWS Cloud. Les clients sont responsables de toutes leurs tâches de configuration et de gestion de sécurité nécessaires. Par exemple, des secrets tels que les informations d'identification d'accès aux magasins de données ne doivent pas être codés en dur dans vos sources de données personnalisées. Vous pouvez les utiliser AWS Secrets Manager pour gérer ces informations d'identification. Pour plus d'informations sur Secrets Manager, consultez [Qu'est-ce que c'est AWS Secrets Manager ?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) dans le guide de AWS Secrets Manager l'utilisateur. Les exemples suivants utilisent Secrets Manager pour vos informations d’identification.

**Topics**
+ [Exemples de source de données personnalisée de clusters Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Exemples de sources de données personnalisées Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Exemples de sources de données personnalisées Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Exemples de sources de données personnalisées de streaming](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Exemples de source de données personnalisée de clusters Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift propose un pilote JDBC qui peut être utilisé pour lire des données avec Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Amazon Redshift, consultez [Téléchargement du pilote JDBC Amazon Redshift version 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Pour créer la classe de sources de données Amazon Redshift personnalisée, vous devez remplacer la méthode `read_data` à partir des [Sources de données personnalisées](feature-store-feature-processor-data-sources-custom.md). 

Pour vous connecter à un cluster Amazon Redshift, vous avez besoin des éléments suivants :
+ URL JDBC Amazon Redshift (`{{jdbc-url}}`)

  Pour obtenir des informations sur l'obtention de votre URL JDBC Amazon Redshift, consultez [Obtention de l'URL JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) dans le Guide du développeur de base de données Amazon Redshift.
+ Nom d'utilisateur (`{{redshift-user}}`) et mot de passe (`{{redshift-password}}`) Amazon Redshift

  Pour obtenir des informations sur la manière de créer et de gérer des utilisateurs de base de données à l'aide des commandes SQL Amazon Redshift, consultez [Utilisateurs](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) dans le Guide du développeur de base de données Amazon Redshift.
+ Nom de table Amazon Redshift (`{{redshift-table-name}}`)

  Pour obtenir des informations sur la manière de créer une table à partir de quelques exemples, consultez [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) dans le Guide du développeur de base de données Amazon Redshift.
+ (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (`{{secret-redshift-account-info}}`) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Amazon Redshift dans Secrets Manager.

  Pour plus d'informations sur Secrets Manager, consultez la section [Rechercher des secrets AWS Secrets Manager dans](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) le Guide de AWS Secrets Manager l'utilisateur. 
+ Région AWS (`{{your-region}}`)

  Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) dans la documentation de Boto3.

L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer `read_data` pour votre classe de sources de données personnalisée, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "{{redshift-resource-arn}}"
    
    def read_data(self, spark, params):
        url = "{{jdbc-url}}?user={{redshift-user}}&password={{redshift-password}}"
        aws_iam_role_arn = "{{redshift-command-access-role}}"
        secret_name = "{{secret-redshift-account-info}}"
        region_name = "{{your-region}}"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("{{jdbc-url}}", secrets["jdbcurl"]).replace("{{redshift-user}}", secrets['username']).replace("{{redshift-password}}", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "{{redshift-table-name}}") \
             .option("tempdir", "s3a://{{your-bucket-name}}/{{your-bucket-prefix}}") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

L'exemple suivant montre comment connecter `RedshiftDataSource` à votre décorateur `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Pour exécuter la tâche de l’intégrateur de caractéristiques à distance, vous devez fournir le pilote jdbc en définissant `SparkConfig` et le transmettre au décorateur `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemples de sources de données personnalisées Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake fournit un connecteur Spark qui peut être utilisé pour votre décorateur `feature_processor`. Pour obtenir des informations sur le connecteur Snowflake pour Spark, consultez [Connecteur Snowflake pour Spark](https://docs.snowflake.com/en/user-guide/spark-connector) dans la documentation Snowflake.

Pour créer la classe de sources de données Snowflake personnalisée, vous devez remplacer la méthode `read_data` à partir des [Sources de données personnalisées](feature-store-feature-processor-data-sources-custom.md) et ajouter les packages du connecteur Spark au chemin de classe Spark. 

Pour vous connecter à une source de données Snowflake, vous avez besoin des éléments suivants :
+ URL Snowflake (`{{sf-url}}`)

   URLs Pour plus d'informations sur l'accès aux interfaces Web de Snowflake, consultez la section [Identifiants de compte](https://docs.snowflake.com/en/user-guide/admin-account-identifier) dans la documentation de Snowflake.
+ Base de données Snowflake (`{{sf-database}}`) 

  Pour obtenir des informations sur l'obtention du nom de votre base de données à l'aide de Snowflake, consultez [CURRENT\_DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) dans la documentation de Snowflake.
+ Schéma de base de données Snowflake (`{{sf-schema}}`) 

  Pour en savoir plus sur l'obtention du nom de votre schéma à l'aide de Snowflake, consultez [CURRENT\_SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) dans la documentation de Snowflake.
+ Entrepôt Snowflake (`{{sf-warehouse}}`)

  Pour obtenir des informations sur l'obtention du nom de votre entrepôt à l'aide de Snowflake, consultez [CURRENT\_WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) dans la documentation de Snowflake.
+ Nom de table Snowflake (`{{sf-table-name}}`)
+ (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (`{{secret-snowflake-account-info}}`) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Snowflake dans Secrets Manager. 

  Pour plus d'informations sur Secrets Manager, consultez la section [Rechercher des secrets AWS Secrets Manager dans](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) le Guide de AWS Secrets Manager l'utilisateur. 
+ Région AWS (`{{your-region}}`)

  Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) dans la documentation de Boto3.

L'exemple suivant montre comment récupérer le nom d'utilisateur et le mot de passe Snowflake depuis Secrets Manager et comment remplacer la fonction `read_data` pour votre classe de sources de données personnalisée `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "{{sf-url}}",
        "sfDatabase" : "{{sf-database}}",
        "sfSchema" : "{{sf-schema}}",
        "sfWarehouse" : "{{sf-warehouse}}",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "{{sf-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-snowflake-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "{{sf-table-name}}") \
                        .load()
```

L'exemple suivant montre comment connecter `SnowflakeDataSource` à votre décorateur `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Pour exécuter la tâche de l'intégrateur de fonctionnalités à distance, vous devez fournir les packages en définissant `SparkConfig` et les transmettre au décorateur `@remote`. Dans l’exemple suivant, les packages Spark sont tels que `spark-snowflake_2.12` correspond à la version Scala de l’intégrateur de caractéristiques, `2.12.0` à la version de Snowflake que vous souhaitez utiliser et `spark_3.3` à la version Spark de l’intégrateur de caractéristiques. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="{{feature-group-arn>}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemples de sources de données personnalisées Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark peut lire les données de Databricks à l'aide du pilote JDBC Databricks. Pour obtenir des informations sur le pilote JDBC Databricks, consultez [Configuration des pilotes ODBC et JDBC Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) (langue française non garantie) dans la documentation de Databricks.

**Note**  
Vous pouvez lire les données de n'importe quelle autre base de données en incluant le pilote JDBC correspondant dans le chemin de classe Spark. Pour plus d'informations, consultez [JDBC vers d'autres bases de données](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) (langue française non garantie) dans le Guide de Spark SQL.

Pour créer la classe de sources de données Databricks personnalisée, vous devez remplacer la méthode `read_data` à partir des [Sources de données personnalisées](feature-store-feature-processor-data-sources-custom.md) et ajouter le fichier JAR JDBC au chemin de classe Spark. 

Pour vous connecter à une source de données Databricks, vous avez besoin des éléments suivants :
+ URL Databricks (`{{databricks-url}}`)

  Pour obtenir des informations sur votre URL Databricks, consultez [Création de l'URL de connexion pour le pilote Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) (langue française non garantie) dans la documentation de Databricks.
+ Jeton d'accès personnel Databricks (`{{personal-access-token}}`)

  Pour obtenir des informations sur votre jeton d'accès Databricks, consultez [Authentification par jeton d'accès personnel Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) (langue française non garantie) dans la documentation de Databricks.
+ Nom de catalogue de données (`{{db-catalog}}`) 

  Pour obtenir des informations sur le nom de votre catalogue Databricks, consultez [Nom de catalogue](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) (langue française non garantie) dans la documentation de Databricks.
+ Nom de schéma (`{{db-schema}}`)

  Pour obtenir des informations sur le nom de votre schéma Databricks, consultez [Nom de schéma](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) (langue française non garantie) dans la documentation de Databricks.
+ Nom de table (`{{db-table-name}}`)

  Pour obtenir des informations sur le nom de votre table Databricks, consultez [Nom de table](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) (langue française non garantie) dans la documentation de Databricks.
+ (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (`{{secret-databricks-account-info}}`) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Databricks dans Secrets Manager. 

  Pour plus d'informations sur Secrets Manager, consultez la section [Rechercher des secrets AWS Secrets Manager dans](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) le Guide de AWS Secrets Manager l'utilisateur. 
+ Région AWS (`{{your-region}}`)

  Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) dans la documentation de Boto3.

L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer `read_data` pour votre classe de sources de données personnalisée `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "{{databricks-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-databricks-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("{{personal-access-token}}", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`{{db-catalog}}`.`{{db-schema}}`.`{{db-table-name}}`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

L'exemple suivant montre comment charger le fichier JAR de pilote JDBC, `{{jdbc-jar-file-name}}.jar`, sur Amazon S3 afin de l'ajouter au chemin de classe Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Spark (`{{jdbc-jar-file-name}}.jar`) depuis Databricks, consultez [Téléchargement du pilote JDBC](https://www.databricks.com/spark/jdbc-drivers-download) (langue française non garantie) sur le site Web de Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"}
)
def transform(input_df):
    return input_df
```

Pour exécuter la tâche de l’intégrateur de caractéristiques à distance, vous devez fournir les fichiers JAR en définissant `SparkConfig` et les transmettre au décorateur `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemples de sources de données personnalisées de streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Vous pouvez vous connecter à des sources de données de streaming telles qu’Amazon Kinesis, et créer des transformations avec Spark Structured Streaming pour lire à partir de sources de données de streaming. Pour plus d'informations sur le connecteur Kinesis, consultez la section Connecteur [Kinesis pour Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in. GitHub Pour en savoir plus sur Amazon Kinesis, consultez [Présentation d’Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) dans le Guide du développeur Amazon Kinesis.

Pour créer la classe de sources de données Amazon Kinesis personnalisée, vous devez étendre la classe `BaseDataSource` et remplacer la méthode `read_data` à partir des [Sources de données personnalisées](feature-store-feature-processor-data-sources-custom.md).

Pour vous connecter à un flux de données Amazon Kinesis, vous avez besoin des éléments suivants :
+ ARN Kinesis (`{{kinesis-resource-arn}}`) 

  Pour plus d'informations sur le flux de données Kinesis ARNs, consultez [Amazon Resource Names (ARNs) for Kinesis Data Streams dans le manuel Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) Developer Guide.
+ Nom du flux de données Kinesis (`{{kinesis-stream-name}}`)
+ Région AWS (`{{your-region}}`)

  Pour en savoir plus sur l’obtention du nom de région de votre session en cours à l’aide du kit SDK pour Python (Boto3), consultez [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) dans la documentation de Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "{{kinesis-resource-arn}}"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "{{kinesis-stream-name}}") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.{{your-region}}.amazonaws.com")
            .load()
```

L’exemple suivant montre comment connecter `KinesisDataSource` à votre décorateur `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "{{feature-group-arn}}"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="{{feature-group-arn}}"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "{{s3a://checkpoint-path}}")
        .start()
    )
    output_stream.awaitTermination()
```

Dans l’exemple de code ci-dessus, nous utilisons quelques options de Spark Structured Streaming pour diffuser des micro-lots dans votre groupe de caractéristiques. Pour obtenir la liste complète des options, consultez [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) dans la documentation d’Apache Spark. 
+ Le mode récepteur `foreachBatch` est une fonctionnalité qui vous permet d’appliquer des opérations et d’écrire la logique sur les données de sortie de chaque micro-lot d’une requête de streaming. 

  Pour plus d'informations`foreachBatch`, consultez la section [Utilisation de Foreach et](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) le guide ForeachBatch de programmation de streaming structuré d'Apache Spark. 
+ L’option `checkpointLocation` enregistre régulièrement l’état de l’application de streaming. Le journal de streaming est enregistré à l’emplacement `{{s3a://checkpoint-path}}` du point de contrôle.

  Pour obtenir des informations sur l’option `checkpointLocation`, consultez [Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) dans le Guide de programmation de streaming structuré d’Apache Spark. 
+ Le paramètre `trigger` définit la fréquence à laquelle le traitement par micro-lots est déclenché dans une application de streaming. Dans cet exemple, le type de déclencheur du temps de traitement est utilisé avec des intervalles de micro-lots d’une minute, spécifiés par `trigger(processingTime="1 minute")`. Pour effectuer un remplissage à partir d’une source de flux, vous pouvez utiliser le type de déclencheur available-now, spécifié par `trigger(availableNow=True)`.

  Pour obtenir la liste complète des types `trigger`, consultez [Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) dans le Guide de programmation de streaming structuré d’Apache Spark.

**Streaming continu et nouvelles tentatives automatiques à l’aide de déclencheurs basés sur des événements**

Le Feature Processor utilise la SageMaker formation comme infrastructure de calcul et sa durée d'exécution maximale est de 28 jours. Vous pouvez utiliser des déclencheurs basés sur des événements pour prolonger votre streaming continu sur une plus longue période et récupérer d’échecs temporaires. Pour plus d’informations sur les exécutions basées sur la planification et les événements, consultez [Exécutions planifiées et basées sur des événements pour les pipelines d’intégrateur de caractéristiques](feature-store-feature-processor-schedule-pipeline.md).

Voici un exemple de configuration d’un déclencheur basé sur les événements pour que le pipeline d’intégrateur de caractéristiques de streaming continue à fonctionner en continu. Cela utilise la fonction de transformation de streaming définie dans l’exemple précédent. Un pipeline cible peut être configuré pour être déclenché lorsqu’un événement `STOPPED` ou `FAILED` se produit pour l’exécution d’un pipeline source. Notez que le même pipeline est utilisé comme source et cible afin qu’il fonctionne en continu.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "{{streaming-pipeline}}"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```