

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Beispiele für benutzerdefinierte Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Dieser Abschnitt enthält Beispiele für Implementierungen benutzerdefinierter Datenquellen für Feature-Prozessoren. Weitere Informationen zu gemeinsamen Datenquellen finden Sie unter [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md).

Sicherheit ist eine gemeinsame Verantwortung zwischen AWS und unseren Kunden. AWS ist verantwortlich für den Schutz der Infrastruktur, auf der die Dienste in der ausgeführt AWS Cloud werden. Kunden sind für alle erforderlichen Sicherheitskonfigurations- und Verwaltungsaufgaben verantwortlich. Beispielsweise sollten Geheimnisse wie Zugangsdaten für Datenspeicher in Ihren benutzerdefinierten Datenquellen nicht fest codiert sein. Sie können diese Anmeldeinformationen AWS Secrets Manager zur Verwaltung verwenden. Informationen zu Secrets Manager finden Sie unter [Was ist AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) im AWS Secrets Manager Benutzerhandbuch. In den folgenden Beispielen wird Secrets Manager für Ihre Anmeldeinformationen verwendet.

**Topics**
+ [Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Beispiele für benutzerdefinierte Snowflake-Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Beispiele für das Streamen benutzerdefinierter Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift bietet einen JDBC-Treiber, der zum Lesen von Daten mit Spark verwendet werden kann. Informationen zum Herunterladen des Amazon Redshift JDBC-Treibers finden Sie unter [Herunterladen des Amazon Redshift JDBC-Treibers](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html), Version 2.1. 

Um die benutzerdefinierte Amazon Redshift Redshift-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus der [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben. 

Um eine Verbindung mit einem Amazon Redshift Redshift-Cluster herzustellen, benötigen Sie:
+ Amazon Redshift JDBC-URL (`{{jdbc-url}}`)

  Informationen zum Abrufen Ihrer Amazon Redshift JDBC-URL finden Sie unter [Getting the JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) im Datenbankentwicklerhandbuch zu Amazon Redshift.
+ Amazon Redshift Redshift-Benutzername (`{{redshift-user}}`) und Passwort (`{{redshift-password}}`)

  Informationen zum Erstellen und Verwalten von Datenbankbenutzern mithilfe der Amazon-Redshift-SQL-Befehle finden Sie unter [Benutzer](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) im Amazon-Redshift-Dabase-Entwicklerhandbuch.
+ Name der Amazon-Redshift-Tabelle (`{{redshift-table-name}}`)

  Informationen zum Erstellen einer Tabelle mit einigen Beispielen finden Sie unter [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) im Datenbankentwicklerhandbuch zu Amazon Redshift.
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`{{secret-redshift-account-info}}`), unter dem Sie Ihren Amazon Redshift Redshift-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`{{your-region}}`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die `read_data` für Ihre benutzerdefinierte Datenquellenklasse, `DatabricksDataSource` überschreiben.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "{{redshift-resource-arn}}"
    
    def read_data(self, spark, params):
        url = "{{jdbc-url}}?user={{redshift-user}}&password={{redshift-password}}"
        aws_iam_role_arn = "{{redshift-command-access-role}}"
        secret_name = "{{secret-redshift-account-info}}"
        region_name = "{{your-region}}"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("{{jdbc-url}}", secrets["jdbcurl"]).replace("{{redshift-user}}", secrets['username']).replace("{{redshift-password}}", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "{{redshift-table-name}}") \
             .option("tempdir", "s3a://{{your-bucket-name}}/{{your-bucket-prefix}}") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `RedshiftDataSource` zu Ihrem `feature_processor` Dekorateur herstellen.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Um den Featureprozessor-Job remote auszuführen, müssen Sie den JDBC-Treiber bereitstellen, indem Sie ihn definieren `SparkConfig` und an den `@remote` Decorator übergeben.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für benutzerdefinierte Snowflake-Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake bietet einen Spark-Konnektor, der für Ihren `feature_processor` Dekorateur verwendet werden kann. Informationen zum Snowflake-Konnektor für Spark finden Sie unter Snowflake-Konnektor für Spark in der [Snowflake-Dokumentation](https://docs.snowflake.com/en/user-guide/spark-connector).

Um die benutzerdefinierte Snowflake-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus dem [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben und die Spark-Connector-Pakete zum Spark-Klassenpfad hinzufügen. 

Um eine Verbindung mit einer Snowflake-Datenquelle herzustellen, benötigen Sie:
+ Snowflake-URL (`{{sf-url}}`)

  Informationen URLs zum Zugriff auf Snowflake-Weboberflächen finden Sie unter [Konto-Identifikatoren](https://docs.snowflake.com/en/user-guide/admin-account-identifier) in der Snowflake-Dokumentation.
+ Snowflake-Datenbank (`{{sf-database}}`) 

  Informationen zum Abrufen des Namens Ihrer Datenbank mit Snowflake finden Sie unter [CURRENT\_DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) in der Snowflake-Dokumentation.
+ Snowflake-Datenbankschema (`{{sf-schema}}`) 

  Informationen zum Abrufen des Namens Ihres Schemas mithilfe von Snowflake finden Sie unter [CURRENT\_SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) in der Snowflake-Dokumentation.
+ Snowflake-Warehouse (`{{sf-warehouse}}`)

  Informationen zum Abrufen des Namens Ihres Warehouse mithilfe von Snowflake finden Sie unter [CURRENT\_WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) in der Snowflake-Dokumentation.
+ Name der Snowflake-Tabelle (`{{sf-table-name}}`)
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`{{secret-snowflake-account-info}}`), unter dem Sie Ihren Snowflake-Zugriffsbenutzernamen und Ihr Passwort in Secrets Manager speichern. 

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`{{your-region}}`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie den Snowflake-Benutzernamen und das Kennwort aus Secrets Manager abrufen und die `read_data` Funktion für Ihre benutzerdefinierte Datenquellenklasse überschreiben. `SnowflakeDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "{{sf-url}}",
        "sfDatabase" : "{{sf-database}}",
        "sfSchema" : "{{sf-schema}}",
        "sfWarehouse" : "{{sf-warehouse}}",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "{{sf-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-snowflake-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "{{sf-table-name}}") \
                        .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `SnowflakeDataSource` zu Ihrem `feature_processor` Dekorateur herstellen.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Um den Feature-Prozessor-Job remote auszuführen, müssen Sie die Pakete per Definition `SparkConfig` bereitstellen und an den `@remote` Decorator übergeben. Bei den Spark-Paketen im folgenden Beispiel handelt es sich um die `spark-snowflake_2.12` Feature-Prozessor Scala-Version, `2.12.0` um die Snowflake-Version, die Sie verwenden möchten, und `spark_3.3` um die Feature-Prozessor Spark-Version. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="{{feature-group-arn>}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark kann mithilfe des Databricks JDBC-Treibers Daten aus Databricks lesen. Informationen zum Databricks-JDBC-Treiber finden Sie unter [Konfigurieren der Databricks-ODBC- und JDBC-Treiber in der](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) Databricks-Dokumentation.

**Anmerkung**  
Sie können Daten aus jeder anderen Datenbank lesen, indem Sie den entsprechenden JDBC-Treiber in den Spark-Klassenpfad aufnehmen. Weitere Informationen finden Sie unter [JDBC in anderen Datenbanken](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) in Apache Spark SQL.

Um die benutzerdefinierte Databricks-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus dem überschreiben [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) und das JDBC-JAR zum Spark-Klassenpfad hinzufügen. 

Um eine Verbindung mit einer Databricks-Datenquelle herzustellen, benötigen Sie:
+ Databricks-URL (`{{databricks-url}}`)

  Informationen zu Ihrer Databricks-URL finden Sie unter [Erstellen der Verbindungs-URL für den Databricks-Treiber](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) in der Databricks-Dokumentation.
+ Persönliches Zugriffstoken von Databricks (`{{personal-access-token}}`)

  Informationen zu Ihrem Databricks-Zugriffstoken finden Sie unter Authentifizierung mit dem [persönlichen Zugriffstoken von Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) in der Databricks-Dokumentation.
+ Name des Datenkatalogs (`{{db-catalog}}`) 

  Informationen zu Ihrem Databricks-Katalognamen finden Sie unter [Katalogname](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) in der Databricks-Dokumentation.
+ Schemaname (`{{db-schema}}`)

  Informationen zu Ihrem Databricks-Schemanamen finden Sie unter Schemaname in der [Databricks-Dokumentation](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name).
+ Tabellenname (`{{db-table-name}}`)

  Informationen zu Ihrem Databricks-Tabellennamen finden Sie unter [Tabellenname](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) in derDatabricks-Dokumentation.
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`{{secret-databricks-account-info}}`), unter dem Sie Ihren Databricks-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern. 

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`{{your-region}}`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die `read_data` für Ihre benutzerdefinierte Datenquellenklasse, `DatabricksDataSource` überschreiben.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "{{databricks-url}}"
    
    def read_data(self, spark, params):
        secret_name = "{{secret-databricks-account-info}}"
        region_name = "{{your-region}}"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("{{personal-access-token}}", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`{{db-catalog}}`.`{{db-schema}}`.`{{db-table-name}}`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

Das folgende Beispiel zeigt, wie der JDBC-Treiber jar, `{{jdbc-jar-file-name}}.jar`, auf Amazon S3 hochgeladen wird, um ihn dem Spark-Klassenpfad hinzuzufügen. Informationen zum Herunterladen des Spark-JDBC-Treibers (`{{jdbc-jar-file-name}}.jar`) von Databricks finden Sie unter [JDBC-Treiber herunterladen](https://www.databricks.com/spark/jdbc-drivers-download) auf der Databricks-Website.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output={{feature-group-arn}},
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"}
)
def transform(input_df):
    return input_df
```

Um den Featureprozessor-Job remote auszuführen, müssen Sie die JAR-Dateien `SparkConfig` durch Definition bereitstellen und an den `@remote` Decorator übergeben.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://{{your-bucket-name}}/{{your-bucket-prefix}}/{{jdbc-jar-file-name}}.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="{{feature-group-arn}}",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für das Streamen benutzerdefinierter Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Sie können eine Verbindung zu Streaming-Datenquellen wie Amazon Kinesis herstellen und Transformationen mit Spark Structured Streaming erstellen, um aus Streaming-Datenquellen zu lesen. Informationen zum Kinesis-Konnektor finden Sie unter [Kinesis Connector for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in. GitHub Weitere Informationen finden Sie unter [Was ist Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) im Entwicklerhandbuch für Amazon Kinesis Data Streams.

Um die benutzerdefinierte Amazon Kinesis Kinesis-Datenquellenklasse zu erstellen, müssen Sie die `BaseDataSource` Klasse erweitern und die `read_data` Methode von [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben.

Zur Herstellung einer Verbindung mit einem Amazon Kinesis Kinesis-Daten-Stream benötigen Sie:
+ Kinesis ARN (`{{kinesis-resource-arn}}`) 

  Informationen zu Kinesis Data Stream ARNs finden Sie unter [Amazon Resource Names (ARNs) for Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) im Amazon Kinesis Developer Guide.
+ Kinesis-Datenstreamname (`{{kinesis-stream-name}}`)
+ AWS-Region (`{{your-region}}`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\_name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "{{kinesis-resource-arn}}"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "{{kinesis-stream-name}}") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.{{your-region}}.amazonaws.com")
            .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `KinesisDataSource` zu Ihrem `feature_processor` Dekorateur herstellen. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "{{feature-group-arn}}"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="{{feature-group-arn}}"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "{{s3a://checkpoint-path}}")
        .start()
    )
    output_stream.awaitTermination()
```

Im obigen Beispielcode verwenden wir einige Spark-Optionen für strukturiertes Streaming, während wir Mikrobatches in Ihre Feature-Gruppe streamen. Eine vollständige Liste der Optionen finden Sie im [Leitfaden zur Programmierung von strukturiertem Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) in der Apache-Spark-Dokumentation. 
+ Der `foreachBatch` Sink-Modus ist eine Funktion, mit der Sie Operationen und Schreiblogik auf die Ausgabedaten jedes Mikrobatches einer Streaming-Abfrage anwenden können. 

  Informationen dazu finden Sie unter `foreachBatch` [Using Foreach und ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) im Apache Spark Structured Streaming Programming Guide. 
+ Die `checkpointLocation`-Option speichert regelmäßig den Status der Streaming-Anwendung. Das Streaming-Protokoll wird am Checkpoint gespeichert `{{s3a://checkpoint-path}}`.

  Informationen zu dieser `checkpointLocation` Option finden Sie unter [Wiederherstellung nach Fehlern mit Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) in der strukturierten Streaming-Programmierung von Apache Spark. 
+ Die `trigger` Einstellung definiert, wie oft die Mikro-Batch-Verarbeitung in einer Streaming-Anwendung ausgelöst wird. In diesem Beispiel wird der Triggertyp „Verarbeitungszeit“ mit Mikrobatch-Intervallen von einer Minute verwendet, die von `trigger(processingTime="1 minute")` spezifiziert sind. Für das Backfill aus einer Stream-Quelle können Sie den Triggertyp Available-now verwenden, der von `trigger(availableNow=True)` spezifiziert ist.

  Eine vollständige Liste der `trigger`-Typen finden Sie unter [Trigger](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) in der strukturierten Streaming-Programmierung von Apache Spark.

**Kontinuierliches Streaming und automatische Wiederholungsversuche mit ereignisbasierten Triggern**

Der Feature Processor verwendet SageMaker Training als Recheninfrastruktur und hat eine maximale Laufzeit von 28 Tagen. Sie können ereignisbasierte Trigger verwenden, um Ihr kontinuierliches Streaming über einen längeren Zeitraum zu verlängern und vorübergehende Ausfälle zu beheben. Weitere Informationen zu zeitplan- und ereignisbasierten Ausführungen finden Sie unter [Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines](feature-store-feature-processor-schedule-pipeline.md).

Im Folgenden finden Sie ein Beispiel für die Einrichtung eines ereignisbasierten Triggers, um die Streaming-Featureprozessor-Pipeline kontinuierlich am Laufen zu halten. Dabei wird die im vorherigen Beispiel definierte Streaming-Transformationsfunktion verwendet. Eine Ziel-Pipeline kann so konfiguriert werden, dass sie ausgelöst wird, wenn bei der `STOPPED` Ausführung oder `FAILED` Quellpipeline-Ereignis eintritt. Beachten Sie, dass dieselbe Pipeline als Quelle und Ziel verwendet wird, sodass sie kontinuierlich ausgeführt wird.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "{{streaming-pipeline}}"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```