

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Feature-Verarbeitung
<a name="feature-store-feature-processing"></a>

Amazon SageMaker Feature Store Feature Processing ist eine Funktion, mit der Sie Rohdaten in Funktionen für maschinelles Lernen (ML) umwandeln können. Es bietet Ihnen ein Feature-Prozessor-SDK, mit dem Sie Daten aus Batch-Datenquellen transformieren und in Ihre Feature-Gruppen aufnehmen können. Mit dieser Funktion kümmert sich Feature Store um die zugrunde liegende Infrastruktur, einschließlich der Bereitstellung der Rechenumgebungen und der Erstellung und Wartung von Pipelines zum Laden und Erfassen von Daten. Auf diese Weise können Sie sich auf Ihre Feature-Prozessor-Definitionen konzentrieren, die eine Transformationsfunktion (z. B. Anzahl der Produktansichten, Mittelwert des Transaktionswerts), Quellen (auf die diese Transformation angewendet werden soll) und Senken (in die die berechneten Feature-Werte geschrieben werden sollen) umfassen.

Die Feature-Prozessor-Pipeline ist eine Pipelines-Pipeline. Als Pipelines können Sie auch geplante Feature Processor-Pipelines mit SageMaker KI-Herkunft in der Konsole verfolgen. Weitere Informationen zu SageMaker AI Lineage finden Sie unter [Amazon SageMaker ML Lineage Tracking](lineage-tracking.md) Dazu gehören das Verfolgen von geplanten Ausführungen, das Visualisieren der Herkunft, um Features bis zu ihren Datenquellen zurückzuverfolgen, und das Anzeigen gemeinsam genutzter Feature-Prozessoren in einer einzigen Umgebung. Informationen zur Verwendung von Feature Store mit der Konsole finden Sie unter [Pipeline-Ausführungen von der Konsole aus anzeigen](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [Feature Store Feature Processor SDK](feature-store-feature-processor-sdk.md)
+ [Feature Store Feature Processor remote ausführen](feature-store-feature-processor-execute-remotely.md)
+ [Feature Store Feature-Prozessor-Pipelines erstellen und ausführen](feature-store-feature-processor-create-execute-pipeline.md)
+ [Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines](feature-store-feature-processor-schedule-pipeline.md)
+ [Überwachen Sie die SageMaker Feature-Prozessor-Pipelines im Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)
+ [IAM-Berechtigungen und Ausführungsrollen](feature-store-feature-processor-iam-permissions.md)
+ [Einschränkungen, Beschränkungen und Kontingente für Feature-Prozessoren](feature-store-feature-processor-quotas.md)
+ [Datenquellen](feature-store-feature-processor-data-sources.md)
+ [Beispiel für Feature-Verarbeitungs-Code für allgemeine Anwendungsfälle](feature-store-feature-processor-examples.md)

# Feature Store Feature Processor SDK
<a name="feature-store-feature-processor-sdk"></a>

Deklarieren Sie eine Feature Store Feature Processor-Definition, indem Sie Ihre Transformationsfunktionen mit dem `@feature_processor` Decorator dekorieren. Das SageMaker AI SDK for Python (Boto3) lädt automatisch Daten aus den konfigurierten Eingabedatenquellen, wendet die dekorierte Transformationsfunktion an und nimmt die transformierten Daten dann in eine Ziel-Feature-Gruppe auf. Dekorierte Transformationsfunktionen müssen der erwarteten Signatur des `@feature_processor` Dekorators entsprechen. Weitere Informationen zum `@feature_processor` Dekorateur finden Sie unter [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) im Amazon SageMaker Feature Store. Lesen Sie die Dokumente. 

Mit dem `@feature_processor` Decorator läuft Ihre Transformationsfunktion in einer Spark-Laufzeitumgebung, in der die für Ihre Funktion bereitgestellten Eingabeargumente und ihr Rückgabewert Spark sind. DataFrames Die Anzahl der Eingabeparameter in Ihrer Transformationsfunktion muss der Anzahl der im `@feature_processor` Decorator konfigurierten Eingaben entsprechen. 

Weitere Informationen zum `@feature_processor` Decorator finden Sie unter [Feature Processor Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

Der folgende Code enthält grundlegende Beispiele für die Verwendung des `@feature_processor` Decorators. Spezifischere Anwendungsbeispiele finden Sie unter [Beispiel für Feature-Verarbeitungs-Code für allgemeine Anwendungsfälle](feature-store-feature-processor-examples.md).

Das Feature Processor SDK kann mit dem folgenden Befehl aus dem SageMaker Python-SDK und seinen Extras installiert werden. 

```
pip install sagemaker[feature-processor]
```

In den folgenden Beispielen ist `us-east-1` die Region der Ressource, `111122223333` ist die Konto-ID des Ressourcenbesitzers und `your-feature-group-name` ist der Name der Feature-Gruppe.

Im Folgenden finden Sie eine grundlegende Feature-Prozessor-Definition, bei der der `@feature_processor` Decorator eine CSV-Eingabe von Amazon S3 konfiguriert, die geladen und für Ihre Transformationsfunktion bereitgestellt wird (z. B. `transform`), und sie für die Aufnahme in eine Feature-Gruppe vorbereitet. In der letzten Zeile wird es ausgeführt.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

Schließen Sie den Parameter `@feature_processor` ein.
+ `inputs`(List [str]): Eine Liste von Datenquellen, die in Ihrem Feature Store Feature Processor verwendet werden. Wenn es sich bei Ihren Datenquellen um Feature-Gruppen handelt oder sie in Amazon S3 gespeichert sind, können Sie möglicherweise die vom Feature Store bereitgestellten Datenquellendefinitionen für den Feature-Prozessor verwenden. Eine vollständige Liste der vom Feature Store bereitgestellten Datenquellendefinitionen finden Sie unter [Feature Processor Data Source](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) im Amazon SageMaker Feature Store Read the Docs.
+ `output`(str): Der ARN der Feature-Gruppe, um die Ausgabe der dekorierten Funktion aufzunehmen.
+ `target_stores`(Optional [List [str]]): Eine Liste von Speichern (zum Beispiel `OnlineStore` oder`OfflineStore`), die in die Ausgabe aufgenommen werden sollen. Falls nicht angegeben, werden Daten in alle aktivierten Speicher der Ausgabe-Feature-Gruppe aufgenommen.
+ `parameters`(Dict [str, Any]): Ein Wörterbuch, das für Ihre Transformationsfunktion bereitgestellt werden soll. 
+ `enable_ingestion`(bool): Eine Markierung, die angibt, ob die Ausgaben der Transformationsfunktion in die Ausgabe-Feature-Gruppe aufgenommen werden. Dieses Flag ist während der Entwicklungsphase nützlich. Falls nicht angegeben, ist die Aufnahme aktiviert.

Zu den optionalen umschlossenen Funktionsparametern (als Argument bereitgestellt, sofern in der Funktionssignatur angegeben) gehören:
+ `params`(Dict [str, Any]): Das in den `@feature_processor` Parametern definierte Wörterbuch. Es enthält auch vom System konfigurierte Parameter, auf die mit dem Schlüssel verwiesen werden kann `system`, z. B. den `scheduled_time` Parameter.
+ `spark`(SparkSession): Ein Verweis auf die SparkSession Instance, die für die Spark-Anwendung initialisiert wurde.

Das folgende Code ist ein Beispiel für die Benutzung von `params` und `spark` Parameter.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

Der `scheduled_time` Systemparameter (im `params` Argument Ihrer Funktion angegeben) ist ein wichtiger Wert, der es ermöglicht, bei jeder Ausführung erneut zu versuchen. Der Wert kann dabei helfen, die Ausführung des Feature Processor eindeutig zu identifizieren, und er kann als Referenzpunkt für datumsbereichsbasierte Eingaben verwendet werden (z. B. nur die Daten der letzten 24 Stunden laden), um sicherzustellen, dass der Eingabebereich unabhängig von der tatsächlichen Ausführungszeit des Codes ist. Wenn der Feature-Prozessor nach einem Zeitplan ausgeführt wird (siehe [Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines](feature-store-feature-processor-schedule-pipeline.md)), ist sein Wert auf die Zeit festgelegt, zu der er ausgeführt werden soll. Das Argument kann während der synchronen Ausführung mithilfe der Ausführungs-API des SDK überschrieben werden, um Anwendungsfälle wie Daten-Backfills oder das erneute Ausführen einer verpassten Ausführung zu unterstützen. Sein Wert ist die aktuelle Uhrzeit, wenn der Feature Processor auf andere Weise ausgeführt wird.

Informationen zur Erstellung von Spark-Code finden Sie im [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Weitere Codebeispiele für gängige Anwendungsfälle finden Sie im [Beispiel für Feature-Verarbeitungs-Code für allgemeine Anwendungsfälle](feature-store-feature-processor-examples.md). 

Beachten Sie, dass Transformationsfunktionen, die mit `@feature_processor` gekennzeichnet sind, keinen Wert zurückgeben. Um Ihre Funktion programmgesteuert zu testen, können Sie den `@feature_processor` Decorator entfernen oder patchen, sodass er als Passthrough für die umschlossene Funktion fungiert. Weitere Informationen zum `@feature_processor` Decorator finden Sie unter [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Feature Store Feature Processor remote ausführen
<a name="feature-store-feature-processor-execute-remotely"></a>

Um Ihre Feature-Prozessoren auf großen Datensätzen auszuführen, für die Hardware erforderlich ist, die leistungsfähiger ist als die lokal verfügbare, können Sie Ihren Code mit dem `@remote` Decorator dekorieren, um Ihren lokalen Python-Code als verteilten SageMaker Trainingsjob mit einem oder mehreren Knoten auszuführen. Weitere Informationen zur Ausführung Ihres Codes als SageMaker Trainingsjob finden Sie unter. [Führen Sie Ihren lokalen Code als SageMaker Trainingsjob aus](train-remote-decorator.md) 

Im Folgenden finden Sie ein Anwendungsbeispiel für den `@remote` Decorator zusammen mit dem `@feature_processor` Decorator.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

Der `spark_config` Parameter gibt an, dass der Remote-Job als Spark-Anwendung ausgeführt wird. Die `SparkConfig` Instanz kann verwendet werden, um die Spark-Konfiguration zu konfigurieren und zusätzliche Abhängigkeiten für die Spark-Anwendung bereitzustellen, z. B. Python-Dateien JARs, und -Dateien.

Für schnellere Iterationen bei der Entwicklung Ihres Feature-Verarbeitungscodes können Sie das `keep_alive_period_in_seconds` Argument im `@remote` Decorator angeben, um die konfigurierten Ressourcen für nachfolgende Trainingsaufgaben in einem warmen Pool aufzubewahren. Weitere Informationen über warme Pools finden Sie unter `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` im API Reference Guide.

Im Folgenden Code sehen Sie ein Beispiel für eine lokale `requirements.txt:`

```
sagemaker>=2.167.0
```

Dadurch wird die entsprechende SageMaker SDK-Version im Remote-Job installiert, die für die Ausführung der Methode mit den Anmerkungen von `@feature-processor` erforderlich ist. 

# Feature Store Feature-Prozessor-Pipelines erstellen und ausführen
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

Mit dem Feature Processor SDK APIs können Sie Ihre Feature-Processor-Definitionen in eine vollständig verwaltete SageMaker KI-Pipeline umwandeln. Weitere Informationen zu Pipelines finden Sie unter [Übersicht über die Pipelines](pipelines-overview.md). Verwenden Sie die `to_pipeline` API zusammen mit Ihrer Feature-Prozessor-Definition, um Ihre Feature-Prozessor-Definitionen in eine SageMaker AI-Pipeline umzuwandeln. Sie können die Ausführung Ihrer Feature-Prozessor-Definition planen, sie anhand von CloudWatch Metriken operativ überwachen und sie so integrieren, EventBridge dass sie als Ereignisquellen oder Abonnenten dienen. Weitere Informationen zur Überwachung von Pipelines, die mit Pipelines erstellt wurden, finden Sie unter [Überwachen Sie die SageMaker Feature-Prozessor-Pipelines im Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md).

Informationen zur Anzeige Ihrer Feature-Prozessor-Pipelines finden Sie unter [Pipeline-Ausführungen von der Konsole aus anzeigen](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

Wenn Ihre Funktion auch mit dem `@remote` Decorator ausgestattet ist, werden dessen Konfigurationen in die Feature-Prozessor-Pipeline übertragen. Mithilfe des `@remote` Decorators können Sie erweiterte Konfigurationen wie Typ und Anzahl der Rechen-Instances, Laufzeitabhängigkeiten sowie Netzwerk- und Sicherheitskonfigurationen angeben.

Im folgenden Beispiel wird und verwendet. `to_pipeline` `execute` APIs

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

Die `to_pipeline` API ist semantisch gesehen ein Upsert-Vorgang. Sie aktualisiert die Pipeline, falls sie bereits existiert; Andernfalls wird eine Pipeline erstellt.

Die `to_pipeline` API akzeptiert optional einen Amazon S3 S3-URI, der auf eine Datei verweist, die die Feature Processor-Definition enthält, um sie mit der Feature Processor-Pipeline zu verknüpfen, um die Transformationsfunktion und ihre Versionen in ihrer SageMaker KI-Machine-Learning-Herkunft nachzuverfolgen.

Um eine Liste aller Feature-Prozessor-Pipelines in Ihrem Konto abzurufen, können Sie die API `list_pipelines` verwenden. Eine nachfolgende Anfrage an die `describe`-API gibt Details zur Feature-Prozessor-Pipeline zurück, einschließlich, aber nicht beschränkt auf Pipelines und Zeitplandetails.

Im folgenden Beispiel wird `list_pipelines` und `describe` APIs verwendet.

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Die Ausführung von SageMaker Feature Processing-Pipelines im Amazon Feature Store kann so konfiguriert werden, dass sie automatisch und asynchron auf der Grundlage eines vorkonfigurierten Zeitplans oder als Ergebnis eines anderen AWS Serviceereignisses gestartet werden. Sie können beispielsweise festlegen, dass Feature-Verarbeitungs-Pipelines am ersten jedes Monats ausgeführt werden, oder Sie können zwei Pipelines miteinander verketten, sodass eine Zielpipeline automatisch ausgeführt wird, nachdem die Ausführung einer Quell-Pipeline abgeschlossen ist.

**Topics**
+ [Ausführungen auf der Grundlage von Zeitplänen](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [Auf Ereignissen basierende Ausführungen](#feature-store-feature-processor-schedule-pipeline-event-based)

## Ausführungen auf der Grundlage von Zeitplänen
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

Das Feature Processor SDK bietet eine [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API zur wiederkehrenden Ausführung von Feature Processor-Pipelines mit Amazon EventBridge Scheduler-Integration. Der Zeitplan kann mit einem`at`, oder `cron` -Ausdruck angegeben werden`rate`, indem der [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)Parameter mit denselben Ausdrücken verwendet wird, die von Amazon unterstützt werden EventBridge. Die Zeitplan-API ist semantisch gesehen ein Upsert-Vorgang, da sie den Zeitplan aktualisiert, falls er bereits existiert; andernfalls erstellt sie ihn. Weitere Informationen zu den EventBridge Ausdrücken und Beispielen finden Sie unter [Zeitplantypen auf EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) im EventBridge Scheduler-Benutzerhandbuch.

In den folgenden Beispielen wird die Feature-Prozessor-[https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)-API mit den Ausdrücken `at`, `rate`, und `cron` verwendet.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

Die Standardzeitzone für Datums- und Uhrzeiteingaben in der `schedule`-API ist UTC. Weitere Informationen zu EventBridge Scheduler-Zeitplanausdrücken finden Sie [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)in der EventBridge Scheduler-API-Referenzdokumentation.

Geplante Feature-Prozessor-Pipeline-Ausführungen stellen Ihrer Transformationsfunktion die geplante Ausführungszeit zur Verfügung, die als Idempotenz-Token oder als fester Bezugspunkt für datumsbereichsbasierte Eingaben verwendet werden kann. Um einen Zeitplan zu deaktivieren (d. h. anzuhalten) oder erneut zu aktivieren, verwenden Sie den `state` [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)-API-Parameter mit `‘DISABLED’` bzw. `‘ENABLED’`.

Weitere Informationen über RPO-Funktion finden Sie unter [Feature-Prozessor SDK-Datenquellen](feature-store-feature-processor-data-sources-sdk.md). 

## Auf Ereignissen basierende Ausführungen
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Eine Feature-Verarbeitungs-Pipeline kann so konfiguriert werden, dass sie automatisch ausgeführt wird, wenn ein AWS Ereignis eintritt. Das Feature-Verarbeitungs-SDK bietet eine [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) Funktion, die eine Liste von Quellereignissen und eine Zielpipeline akzeptiert. Bei den Quellereignissen muss es sich um Instances von [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)handeln, was eine Pipeline und Ereignisse zum [Ausführungsstatus](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus) angibt. 

Die `put_trigger` Funktion konfiguriert eine EventBridge Amazon-Regel und ein Ziel für die Weiterleitung von Ereignissen und ermöglicht es Ihnen, ein EventBridge Ereignismuster anzugeben, um auf jedes AWS Ereignis zu reagieren. Informationen zu diesen Konzepten finden Sie unter EventBridge [Regeln](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html), [Ziele](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) und [Ereignismuster](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html) von Amazon.

Trigger können aktiviert oder deaktiviert werden. EventBridge startet eine Ziel-Pipeline-Ausführung unter Verwendung der im `role_arn` `put_trigger` API-Parameter angegebenen Rolle. Die Ausführungsrolle wird standardmäßig verwendet, wenn das SDK in einer Amazon SageMaker Studio Classic- oder Notebook-Umgebung verwendet wird. Weitere Informationen zum Abrufen Ihrer Ausführungsrolle finden Sie unter [Abrufen Ihrer Ausführungsrolle](sagemaker-roles.md#sagemaker-roles-get-execution-role).

Im folgenden Beispiel wird auf festgelegt.
+ Eine SageMaker AI-Pipeline, die die `to_pipeline` API verwendet und den Namen Ihrer Zielpipeline (`target-pipeline`) und Ihre Transformationsfunktion (`transform`) aufnimmt. Informationen zu Ihrem Feature-Prozessor und Ihrer Transformationsfunktion finden Sie unter [Feature-Prozessor SDK-Datenquellen](feature-store-feature-processor-data-sources-sdk.md).
+ Ein Trigger, der die `put_trigger` API verwendet und das `FeatureProcessorPipelineEvent` Ereignis und Ihren Ziel-Pipeline-Namen (`target-pipeline`) berücksichtigt. 

  Der `FeatureProcessorPipelineEvent` definiert den Auslöser für den Zeitpunkt, zu dem der Status Ihrer Quellpipeline (`source-pipeline`) wird `Succeeded`. Informationen zur Feature-Prozessor-Pipeline-Ereignisfunktion finden Sie [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) im Feature Store unter Read the Docs. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

Ein Beispiel für die Verwendung ereignisbasierter Trigger zur Erstellung kontinuierlicher Ausführungen und automatischer Wiederholungen für Ihre Feature-Prozessor-Pipeline finden Sie unter [Kontinuierliche Ausführungen und automatische Wiederholungen mithilfe ereignisbasierter Trigger](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries).

Ein Beispiel für die Verwendung von ereignisbasierten Triggern zur Erstellung von kontinuierlichem *Streaming* und für automatische Wiederholungsversuche mithilfe ereignisbasierter Trigger finden Sie unter [Beispiele für das Streamen benutzerdefinierter Datenquellen](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming). 

# Überwachen Sie die SageMaker Feature-Prozessor-Pipelines im Amazon Feature Store
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS bietet Überwachungstools, mit denen Sie Ihre Amazon SageMaker AI-Ressourcen und -Anwendungen in Echtzeit überwachen, melden können, wenn etwas schief geht, und gegebenenfalls automatische Maßnahmen ergreifen können. Feature-Prozessor-Pipelines von Feature Store sind Pipelines, sodass die standardmäßigen Überwachungsmechanismen und Integrationen verfügbar sind. Betriebsmetriken wie Ausführungsfehler können über CloudWatch Amazon-Metriken und EventBridge Amazon-Ereignisse überwacht werden. 

Weitere Informationen zur Überwachung und Operationalisierung von Feature Store-Feature-Prozessor finden Sie in den folgenden Ressourcen:
+ [AWS Ressourcen in Amazon SageMaker AI überwachen](monitoring-overview.md)- Allgemeine Leitlinien zur Überwachung und Prüfung von Aktivitäten im Zusammenhang mit SageMaker KI-Ressourcen.
+ [SageMaker Metriken für Pipelines](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)- Von Pipelines ausgegebene CloudWatch Metriken.
+ [SageMaker Änderung des Status der Pipeline-Ausführung](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)- EventBridge Ereignisse, die für Pipelines und Ausführungen ausgegeben wurden.
+ [Fehlerbehebung bei Amazon SageMaker Pipelines](pipelines-troubleshooting.md) – allgemeine Tipps zum Debuggen und zur Fehlerbehebung für Pipelines.

Feature Store Feature Processor Ausführungsprotokolle finden Sie in Amazon CloudWatch Logs unter der `/aws/sagemaker/TrainingJobs` Protokollgruppe, wo Sie die Ausführungsprotokollstreams mithilfe von Suchkonventionen finden. Für Ausführungen, die durch den direkten Aufruf der `@feature_processor` dekorierten Funktion erstellt wurden, finden Sie die Protokolle in der Konsole Ihrer lokalen Ausführungsumgebung. Bei ` @remote` dekorierten Ausführungen enthält der Name des CloudWatch Logs-Streams den Namen der Funktion und den Ausführungszeitstempel. Bei Feature-Processor-Pipeline-Ausführungen enthält der CloudWatch Logs-Stream für den Schritt die `feature-processor` Zeichenfolge und die Ausführungs-ID der Pipeline.

Feature Store Feature Processor-Pipelines und aktuelle Ausführungsstatus finden Sie in Amazon SageMaker Studio Classic für eine bestimmte Feature-Gruppe in der Feature Store-Benutzeroberfläche. Funktionsgruppen, die sich auf die Feature-Prozessor-Pipelines beziehen, werden entweder als Eingaben oder Ausgaben in der Benutzeroberfläche angezeigt. Darüber hinaus kann die Lineage-Ansicht den Kontext zu vorgelagerten Ausführungen, wie z. B. datenproduzierenden Feature-Prozessor-Pipelines und Datenquellen, für das weitere Debugging bereitstellen. Weitere Informationen zur Verwendung der Lineage-Ansicht mit Studio Classic finden Sie unter [Lineage von der Konsole aus anzeigen](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).

# IAM-Berechtigungen und Ausführungsrollen
<a name="feature-store-feature-processor-iam-permissions"></a>

Um das Amazon SageMaker Python SDK verwenden zu können, sind Berechtigungen für die Interaktion mit erforderlich AWS-Services. Die folgenden Richtlinien sind für die vollständige Funktionalität des Feature Processor erforderlich. Sie können die Ihrer IAM-Rolle angehängten [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)und [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS verwalteten Richtlinien anhängen. Weitere Informationen über das Zuweisen einer Berechtigungsrichtlinie zu Ihrer IAM-Rolle finden Sie unter [Hinzufügen von Richtlinien zu Ihrer IAM-Rolle](feature-store-adding-policies.md). Beispiele finden Sie in der folgenden Tabelle.

Die Vertrauensrichtlinie der Rolle, auf die diese Richtlinie angewendet wird, muss die Prinzipien „scheduler.amazonaws.com“, „sagemaker.amazonaws.com“ und „glue.amazonaws.com“ berücksichtigen.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Einschränkungen, Beschränkungen und Kontingente für Feature-Prozessoren
<a name="feature-store-feature-processor-quotas"></a>

Amazon SageMaker Feature Store Feature Processing basiert auf der Lineage-Tracking (ML) mit SageMaker KI (Machine Learning). Der Feature Store-Feature-Prozessor verwendet Abstammungskontexte, um Feature-Verarbeitung-Pipelines und Pipeline-Versionen darzustellen und nachzuverfolgen. Jeder Feature Store-Feature-Prozessor verwendet mindestens zwei Abstammungskontexte (einen für die Feature-Verarbeitungs-Pipeline und einen weiteren für die Version). Wenn sich die Eingabe- oder Ausgabedatenquelle einer Feature-Verarbeitungs-Pipeline ändert, wird ein zusätzlicher Herkunftskontext erstellt. Sie können die SageMaker AI ML-Abstammungsgrenzen aktualisieren, indem Sie sich an den AWS Support wenden, um eine Erhöhung des Limits zu beantragen. Die Standardgrenzwerte für Ressourcen, die vom Feature Store-Feature-Prozessor verwendet werden, lauten wie folgt. Informationen zur Nachverfolgung der SageMaker AI-ML-Herkunft finden Sie unter. [Amazon SageMaker ML Lineage Tracking](lineage-tracking.md)

Weitere Informationen zu SageMaker KI-Kontingenten finden Sie unter [Amazon SageMaker AI-Endpunkte und Kontingente](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Abstammungsgrenzen pro Region
+ Kontexte – 500 (Soft-Limit)
+ Artefakte – 6.000 (Soft-Limit)
+ Verbände – 6.000 (Soft-Limit)

Trainingslimits pro Region
+ Längste Laufzeit für einen Trainingsjob
+ Anzahl der Instances pro Trainingsjob
+ Die maximale Anzahl von `CreateTrainingJob`-Anforderungen, die Sie pro Sekunde in diesem Konto in der aktuellen Region vornehmen können – 1 TPS.
+ Keakalive-Zeitraum für die Wiederverwendung von Clustern: 3 600 Sekunden

Maximale Anzahl von Pipelines und gleichzeitigen Pipeline-Ausführungen pro Region
+ Maximal zulässige Anzahl von Pipelines pro Konto – 500
+ Pro Konto sind maximal 20 gleichzeitige Pipeline-Ausführungen zulässig
+ Zeitpunkt, zu dem das Timeout bei Pipeline-Ausführungen abläuft – 672 Stunden

# Datenquellen
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing unterstützt mehrere Datenquellen. Der SDK-Feature-Prozessor for Python (Boto3) bietet Konstrukte zum Laden von Daten aus Feature-Gruppen oder Objekten, die in Amazon S3 gespeichert sind. Darüber hinaus können Sie benutzerdefinierte Datenquellen erstellen, um Daten aus anderen Datenquellen zu laden. Informationen zu den vom Feature Store bereitgestellten Datenquellen finden Sie unter [Feature-Prozessor Datenquelle Merkmalsspeicher Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [Feature-Prozessor SDK-Datenquellen](feature-store-feature-processor-data-sources-sdk.md)
+ [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md)
+ [Beispiele für benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom-examples.md)

# Feature-Prozessor SDK-Datenquellen
<a name="feature-store-feature-processor-data-sources-sdk"></a>

Das Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) bietet Konstrukte zum Laden von Daten aus Feature-Gruppen oder Objekten, die in Amazon S3 gespeichert sind. Eine vollständige Liste der vom Feature Store bereitgestellten Datenquellendefinitionen finden Sie in der [Feature-Prozessor-Datenquelle Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Beispiele zur Verwendung der Feature-Store-Python-SDK-Datenquellendefinitionen finden Sie unter [Beispiel für Feature-Verarbeitungs-Code für allgemeine Anwendungsfälle](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

Das `FeatureGroupDataSource` wird verwendet, um eine Feature-Gruppe als Eingabedatenquelle für einen Feature-Prozessor anzugeben. Daten können aus einer Feature-Gruppe im Offline-Speicher geladen werden. Der Versuch, Ihre Daten aus einer Onlineshop-Featuregruppe zu laden, führt zu einem Validierungsfehler. Sie können Start- und Endversätze angeben, um die Daten, die geladen werden, auf einen bestimmten Zeitraum zu beschränken. Sie können beispielsweise einen Startversatz von „14 Tagen“ angeben, um nur die Daten der letzten zwei Wochen zu laden, und Sie können zusätzlich einen Endversatz von „7 Tagen“ angeben, um die Eingabe auf die Daten der letzten Woche zu beschränken.

## Vom Feature Store bereitgestellte Datenquellendefinitionen
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Das Feature Store Python SDK enthält Datenquellendefinitionen, mit denen verschiedene Eingabedatenquellen für einen Feature-Prozessor angegeben werden können. Dazu gehören CSV-, Parquet- und Eisberg-Tabellenquellen. Eine vollständige Liste der vom Feature Store bereitgestellten Datenquellendefinitionen finden Sie in der [Feature-Prozessor-Datenquelle Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Benutzerdefinierte Datenquellen
<a name="feature-store-feature-processor-data-sources-custom"></a>

Auf dieser Seite beschreiben wir, wie Sie eine benutzerdefinierte Datenquellenklasse erstellen, und zeigen einige Anwendungsbeispiele. Mit benutzerdefinierten Datenquellen können Sie das APIs bereitgestellte SageMaker AI SDK for Python (Boto3) genauso verwenden, als würden Sie von Amazon SageMaker Feature Store bereitgestellte Datenquellen verwenden. 

Um eine benutzerdefinierte Datenquelle zu verwenden, um Daten mithilfe von Feature Processing zu transformieren und in eine Feature-Gruppe aufzunehmen, müssen Sie die Klasse um die folgenden `PySparkDataSource` Klassenmitglieder und Funktionen erweitern.
+ `data_source_name`(str): ein beliebiger Name für die Datenquelle. Zum Beispiel Amazon Redshift, Snowflake oder ein Glue Catalog ARN.
+ `data_source_unique_id`(str): eine eindeutige Kennung, die sich auf die spezifische Ressource bezieht, auf die zugegriffen wird. Zum Beispiel Tabellenname, DDB-Tabellen-ARN, Amazon-S3-Präfix. Jede Verwendung derselben Daten `data_source_unique_id` in benutzerdefinierten Datenquellen wird derselben Datenquelle in der Lineage-Ansicht zugeordnet. Die Herkunft umfasst Informationen über den Ausführungscode eines Workflows zur Feature-Verarbeitung, welche Datenquellen verwendet wurden und wie sie in die Feature-Gruppe oder das Feature aufgenommen wurden. Informationen zum Anzeigen der Herkunft einer Feature-Gruppe in **Studio** finden Sie unter [Lineage von der Konsole aus anzeigen](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data`(func): Eine Methode, die verwendet wird, um eine Verbindung mit dem Feature-Prozessor herzustellen. Gibt einen Spark-Datenrahmen zurück. Beispiele finden Sie unter [Beispiele für benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom-examples.md).

Beide `data_source_name` und `data_source_unique_id` werden verwendet, um Ihre Abstammungsentität eindeutig zu identifizieren. Im Folgenden finden Sie ein Beispiel für eine benutzerdefinierte Datenquellenklasse mit dem Namen `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Beispiele für benutzerdefinierte Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Dieser Abschnitt enthält Beispiele für Implementierungen benutzerdefinierter Datenquellen für Feature-Prozessoren. Weitere Informationen zu gemeinsamen Datenquellen finden Sie unter [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md).

Sicherheit ist eine gemeinsame Verantwortung zwischen AWS und unseren Kunden. AWS ist verantwortlich für den Schutz der Infrastruktur, auf der die Dienste in der ausgeführt AWS Cloud werden. Kunden sind für alle erforderlichen Sicherheitskonfigurations- und Verwaltungsaufgaben verantwortlich. Beispielsweise sollten Geheimnisse wie Zugangsdaten für Datenspeicher in Ihren benutzerdefinierten Datenquellen nicht fest codiert sein. Sie können diese Anmeldeinformationen AWS Secrets Manager zur Verwaltung verwenden. Informationen zu Secrets Manager finden Sie unter [Was ist AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) im AWS Secrets Manager Benutzerhandbuch. In den folgenden Beispielen wird Secrets Manager für Ihre Anmeldeinformationen verwendet.

**Topics**
+ [Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Beispiele für benutzerdefinierte Snowflake-Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Beispiele für das Streamen benutzerdefinierter Datenquellen](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift bietet einen JDBC-Treiber, der zum Lesen von Daten mit Spark verwendet werden kann. Informationen zum Herunterladen des Amazon Redshift JDBC-Treibers finden Sie unter [Herunterladen des Amazon Redshift JDBC-Treibers](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html), Version 2.1. 

Um die benutzerdefinierte Amazon Redshift Redshift-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus der [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben. 

Um eine Verbindung mit einem Amazon Redshift Redshift-Cluster herzustellen, benötigen Sie:
+ Amazon Redshift JDBC-URL (`jdbc-url`)

  Informationen zum Abrufen Ihrer Amazon Redshift JDBC-URL finden Sie unter [Getting the JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) im Datenbankentwicklerhandbuch zu Amazon Redshift.
+ Amazon Redshift Redshift-Benutzername (`redshift-user`) und Passwort (`redshift-password`)

  Informationen zum Erstellen und Verwalten von Datenbankbenutzern mithilfe der Amazon-Redshift-SQL-Befehle finden Sie unter [Benutzer](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) im Amazon-Redshift-Dabase-Entwicklerhandbuch.
+ Name der Amazon-Redshift-Tabelle (`redshift-table-name`)

  Informationen zum Erstellen einer Tabelle mit einigen Beispielen finden Sie unter [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) im Datenbankentwicklerhandbuch zu Amazon Redshift.
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`secret-redshift-account-info`), unter dem Sie Ihren Amazon Redshift Redshift-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`your-region`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die `read_data` für Ihre benutzerdefinierte Datenquellenklasse, `DatabricksDataSource` überschreiben.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `RedshiftDataSource` zu Ihrem `feature_processor` Dekorateur herstellen.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Um den Featureprozessor-Job remote auszuführen, müssen Sie den JDBC-Treiber bereitstellen, indem Sie ihn definieren `SparkConfig` und an den `@remote` Decorator übergeben.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für benutzerdefinierte Snowflake-Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake bietet einen Spark-Konnektor, der für Ihren `feature_processor` Dekorateur verwendet werden kann. Informationen zum Snowflake-Konnektor für Spark finden Sie unter Snowflake-Konnektor für Spark in der [Snowflake-Dokumentation](https://docs.snowflake.com/en/user-guide/spark-connector).

Um die benutzerdefinierte Snowflake-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus dem [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben und die Spark-Connector-Pakete zum Spark-Klassenpfad hinzufügen. 

Um eine Verbindung mit einer Snowflake-Datenquelle herzustellen, benötigen Sie:
+ Snowflake-URL (`sf-url`)

  Informationen URLs zum Zugriff auf Snowflake-Weboberflächen finden Sie unter [Konto-Identifikatoren](https://docs.snowflake.com/en/user-guide/admin-account-identifier) in der Snowflake-Dokumentation.
+ Snowflake-Datenbank (`sf-database`) 

  Informationen zum Abrufen des Namens Ihrer Datenbank mit Snowflake finden Sie unter [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) in der Snowflake-Dokumentation.
+ Snowflake-Datenbankschema (`sf-schema`) 

  Informationen zum Abrufen des Namens Ihres Schemas mithilfe von Snowflake finden Sie unter [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) in der Snowflake-Dokumentation.
+ Snowflake-Warehouse (`sf-warehouse`)

  Informationen zum Abrufen des Namens Ihres Warehouse mithilfe von Snowflake finden Sie unter [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) in der Snowflake-Dokumentation.
+ Name der Snowflake-Tabelle (`sf-table-name`)
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`secret-snowflake-account-info`), unter dem Sie Ihren Snowflake-Zugriffsbenutzernamen und Ihr Passwort in Secrets Manager speichern. 

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`your-region`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie den Snowflake-Benutzernamen und das Kennwort aus Secrets Manager abrufen und die `read_data` Funktion für Ihre benutzerdefinierte Datenquellenklasse überschreiben. `SnowflakeDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `SnowflakeDataSource` zu Ihrem `feature_processor` Dekorateur herstellen.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Um den Feature-Prozessor-Job remote auszuführen, müssen Sie die Pakete per Definition `SparkConfig` bereitstellen und an den `@remote` Decorator übergeben. Bei den Spark-Paketen im folgenden Beispiel handelt es sich um die `spark-snowflake_2.12` Feature-Prozessor Scala-Version, `2.12.0` um die Snowflake-Version, die Sie verwenden möchten, und `spark_3.3` um die Feature-Prozessor Spark-Version. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark kann mithilfe des Databricks JDBC-Treibers Daten aus Databricks lesen. Informationen zum Databricks-JDBC-Treiber finden Sie unter [Konfigurieren der Databricks-ODBC- und JDBC-Treiber in der](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) Databricks-Dokumentation.

**Anmerkung**  
Sie können Daten aus jeder anderen Datenbank lesen, indem Sie den entsprechenden JDBC-Treiber in den Spark-Klassenpfad aufnehmen. Weitere Informationen finden Sie unter [JDBC in anderen Datenbanken](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) in Apache Spark SQL.

Um die benutzerdefinierte Databricks-Datenquellenklasse zu erstellen, müssen Sie die `read_data` Methode aus dem überschreiben [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) und das JDBC-JAR zum Spark-Klassenpfad hinzufügen. 

Um eine Verbindung mit einer Databricks-Datenquelle herzustellen, benötigen Sie:
+ Databricks-URL (`databricks-url`)

  Informationen zu Ihrer Databricks-URL finden Sie unter [Erstellen der Verbindungs-URL für den Databricks-Treiber](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) in der Databricks-Dokumentation.
+ Persönliches Zugriffstoken von Databricks (`personal-access-token`)

  Informationen zu Ihrem Databricks-Zugriffstoken finden Sie unter Authentifizierung mit dem [persönlichen Zugriffstoken von Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) in der Databricks-Dokumentation.
+ Name des Datenkatalogs (`db-catalog`) 

  Informationen zu Ihrem Databricks-Katalognamen finden Sie unter [Katalogname](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) in der Databricks-Dokumentation.
+ Schemaname (`db-schema`)

  Informationen zu Ihrem Databricks-Schemanamen finden Sie unter Schemaname in der [Databricks-Dokumentation](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name).
+ Tabellenname (`db-table-name`)

  Informationen zu Ihrem Databricks-Tabellennamen finden Sie unter [Tabellenname](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) in derDatabricks-Dokumentation.
+ (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (`secret-databricks-account-info`), unter dem Sie Ihren Databricks-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern. 

  Informationen zu Secrets Manager [finden Sie unter Find Secrets AWS Secrets Manager im](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) AWS Secrets Manager Benutzerhandbuch. 
+ AWS-Region (`your-region`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die `read_data` für Ihre benutzerdefinierte Datenquellenklasse, `DatabricksDataSource` überschreiben.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

Das folgende Beispiel zeigt, wie der JDBC-Treiber jar, `jdbc-jar-file-name.jar`, auf Amazon S3 hochgeladen wird, um ihn dem Spark-Klassenpfad hinzuzufügen. Informationen zum Herunterladen des Spark-JDBC-Treibers (`jdbc-jar-file-name.jar`) von Databricks finden Sie unter [JDBC-Treiber herunterladen](https://www.databricks.com/spark/jdbc-drivers-download) auf der Databricks-Website.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Um den Featureprozessor-Job remote auszuführen, müssen Sie die JAR-Dateien `SparkConfig` durch Definition bereitstellen und an den `@remote` Decorator übergeben.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Beispiele für das Streamen benutzerdefinierter Datenquellen
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Sie können eine Verbindung zu Streaming-Datenquellen wie Amazon Kinesis herstellen und Transformationen mit Spark Structured Streaming erstellen, um aus Streaming-Datenquellen zu lesen. Informationen zum Kinesis-Konnektor finden Sie unter [Kinesis Connector for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in. GitHub Weitere Informationen finden Sie unter [Was ist Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) im Entwicklerhandbuch für Amazon Kinesis Data Streams.

Um die benutzerdefinierte Amazon Kinesis Kinesis-Datenquellenklasse zu erstellen, müssen Sie die `BaseDataSource` Klasse erweitern und die `read_data` Methode von [Benutzerdefinierte Datenquellen](feature-store-feature-processor-data-sources-custom.md) überschreiben.

Zur Herstellung einer Verbindung mit einem Amazon Kinesis Kinesis-Daten-Stream benötigen Sie:
+ Kinesis ARN (`kinesis-resource-arn`) 

  Informationen zu Kinesis Data Stream ARNs finden Sie unter [Amazon Resource Names (ARNs) for Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) im Amazon Kinesis Developer Guide.
+ Kinesis-Datenstreamname (`kinesis-stream-name`)
+ AWS-Region (`your-region`)

  Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in der Boto3-Dokumentation.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

Das folgende Beispiel zeigt, wie Sie eine Verbindung `KinesisDataSource` zu Ihrem `feature_processor` Dekorateur herstellen. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

Im obigen Beispielcode verwenden wir einige Spark-Optionen für strukturiertes Streaming, während wir Mikrobatches in Ihre Feature-Gruppe streamen. Eine vollständige Liste der Optionen finden Sie im [Leitfaden zur Programmierung von strukturiertem Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) in der Apache-Spark-Dokumentation. 
+ Der `foreachBatch` Sink-Modus ist eine Funktion, mit der Sie Operationen und Schreiblogik auf die Ausgabedaten jedes Mikrobatches einer Streaming-Abfrage anwenden können. 

  Informationen dazu finden Sie unter `foreachBatch` [Using Foreach und ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) im Apache Spark Structured Streaming Programming Guide. 
+ Die `checkpointLocation`-Option speichert regelmäßig den Status der Streaming-Anwendung. Das Streaming-Protokoll wird am Checkpoint gespeichert `s3a://checkpoint-path`.

  Informationen zu dieser `checkpointLocation` Option finden Sie unter [Wiederherstellung nach Fehlern mit Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) in der strukturierten Streaming-Programmierung von Apache Spark. 
+ Die `trigger` Einstellung definiert, wie oft die Mikro-Batch-Verarbeitung in einer Streaming-Anwendung ausgelöst wird. In diesem Beispiel wird der Triggertyp „Verarbeitungszeit“ mit Mikrobatch-Intervallen von einer Minute verwendet, die von `trigger(processingTime="1 minute")` spezifiziert sind. Für das Backfill aus einer Stream-Quelle können Sie den Triggertyp Available-now verwenden, der von `trigger(availableNow=True)` spezifiziert ist.

  Eine vollständige Liste der `trigger`-Typen finden Sie unter [Trigger](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) in der strukturierten Streaming-Programmierung von Apache Spark.

**Kontinuierliches Streaming und automatische Wiederholungsversuche mit ereignisbasierten Triggern**

Der Feature Processor verwendet SageMaker Training als Recheninfrastruktur und hat eine maximale Laufzeit von 28 Tagen. Sie können ereignisbasierte Trigger verwenden, um Ihr kontinuierliches Streaming über einen längeren Zeitraum zu verlängern und vorübergehende Ausfälle zu beheben. Weitere Informationen zu zeitplan- und ereignisbasierten Ausführungen finden Sie unter [Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines](feature-store-feature-processor-schedule-pipeline.md).

Im Folgenden finden Sie ein Beispiel für die Einrichtung eines ereignisbasierten Triggers, um die Streaming-Featureprozessor-Pipeline kontinuierlich am Laufen zu halten. Dabei wird die im vorherigen Beispiel definierte Streaming-Transformationsfunktion verwendet. Eine Ziel-Pipeline kann so konfiguriert werden, dass sie ausgelöst wird, wenn bei der `STOPPED` Ausführung oder `FAILED` Quellpipeline-Ereignis eintritt. Beachten Sie, dass dieselbe Pipeline als Quelle und Ziel verwendet wird, sodass sie kontinuierlich ausgeführt wird.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Beispiel für Feature-Verarbeitungs-Code für allgemeine Anwendungsfälle
<a name="feature-store-feature-processor-examples"></a>

In den folgenden Beispielen finden Sie ein Beispiel für Feature-Verarbeitungs-Code für häufige Anwendungsfälle. Ein detaillierteres Beispiel-Notizbuch, das bestimmte Anwendungsfälle zeigt, finden Sie im [Amazon SageMaker Feature Store Feature Processing Notebook](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

In den folgenden Beispielen ist `us-east-1` die Region der Ressource, `111122223333` ist die Konto-ID des Ressourcenbesitzers und `your-feature-group-name` ist der Name der Feature-Gruppe.

Der in den folgenden Beispielen verwendete `transactions` Datensatz hat das folgende Schema:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Verknüpfung von Daten aus mehreren Datenquellen](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Aggregate mit verschiebbaren Fenstern](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Aggregate aus taumelnden Fenstern](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Werbung vom Offline-Speicher zum Online-Speicher](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Transformationen mit der Pandas-Bibliothek](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [Kontinuierliche Ausführungen und automatische Wiederholungen mithilfe ereignisbasierter Trigger](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Verknüpfung von Daten aus mehreren Datenquellen
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Aggregate mit verschiebbaren Fenstern
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Aggregate aus taumelnden Fenstern
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Werbung vom Offline-Speicher zum Online-Speicher
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformationen mit der Pandas-Bibliothek
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Transformationen mit der Pandas-Bibliothek**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Kontinuierliche Ausführungen und automatische Wiederholungen mithilfe ereignisbasierter Trigger
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```