

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Elaborazione delle funzionalità
<a name="feature-store-feature-processing"></a>

Amazon SageMaker Feature Store Feature Processing è una funzionalità con cui puoi trasformare i dati grezzi in funzionalità di machine learning (ML). Fornisce un processore di funzionalità SDK con cui puoi trasformare e inserire dati da fonti di dati in batch nei tuoi gruppi di funzionalità. Grazie a questa funzionalità, l’archivio delle caratteristiche si occupa dell’infrastruttura sottostante, incluso il provisioning degli ambienti di calcolo e la creazione e la manutenzione di pipeline per caricare e inserire dati. In questo modo è possibile concentrarsi sulle definizioni del processore di funzionalità che includono funzioni di trasformazione (ad esempio, il numero di visualizzazioni del prodotto, la media del valore della transazione), fonti (su cui applicare questa trasformazione) e sink (su cui scrivere i valori delle funzionalità calcolate).

La pipeline del Processore di funzionalità è una pipeline Pipelines. Come Pipelines, puoi anche tenere traccia delle pipeline di Feature Processor pianificate con la derivazione SageMaker AI nella console. Per ulteriori informazioni su SageMaker AI Lineage, consulta [Monitoraggio del lignaggio di Amazon SageMaker ML](lineage-tracking.md) Ciò include il monitoraggio delle esecuzioni pianificate, la visualizzazione del lineage per ricondurre le funzionalità alle relative fonti di dati e la visualizzazione dei feature processor condivisi in un unico ambiente. Per informazioni sull’utilizzo dell’archivio delle caratteristiche con la console, consulta [Visualizzazione delle esecuzioni della pipeline dalla console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [Feature Store Feature Processor SDK](feature-store-feature-processor-sdk.md)
+ [Esecuzione del processore di funzionalità Feature Store da remoto](feature-store-feature-processor-execute-remotely.md)
+ [Creazione ed esecuzione di pipeline del processore di funzionalità Feature Store](feature-store-feature-processor-create-execute-pipeline.md)
+ [Esecuzioni pianificate e basate su eventi per le pipeline del Processore di funzionalità](feature-store-feature-processor-schedule-pipeline.md)
+ [Monitora le pipeline dei SageMaker Feature Processor di Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)
+ [Autorizzazioni e ruoli di esecuzione IAM](feature-store-feature-processor-iam-permissions.md)
+ [Restrizioni, limiti e quote del Processore di funzionalità](feature-store-feature-processor-quotas.md)
+ [Origini dati](feature-store-feature-processor-data-sources.md)
+ [Esempio di codice di elaborazione della funzionalità per casi d'uso comuni](feature-store-feature-processor-examples.md)

# Feature Store Feature Processor SDK
<a name="feature-store-feature-processor-sdk"></a>

Dichiara una definizione del processore di funzionalità Feature Store decorando le funzioni di trasformazione con il decoratore `@feature_processor`. L' SageMaker AI SDK for Python (Boto3) carica automaticamente i dati dalle fonti di dati di input configurate, applica la funzione di trasformazione decorata e quindi inserisce i dati trasformati in un gruppo di funzionalità di destinazione. Le funzioni di trasformazione decorate devono essere conformi alla firma prevista del decoratore `@feature_processor`. Per ulteriori informazioni sul `@feature_processor` decoratore, consulta [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) nell'Amazon SageMaker Feature Store Leggi i documenti. 

Con il `@feature_processor` decoratore, la funzione di trasformazione viene eseguita in un ambiente di runtime Spark in cui gli argomenti di input forniti alla funzione e il suo valore restituito sono Spark. DataFrames Il numero di parametri di input nella funzione di trasformazione deve corrispondere al numero di input configurati nel decoratore `@feature_processor`. 

Per ulteriori informazioni sul decoratore `@feature_processor`, consulta [processore di funzionalità Feature Store SDK per Python (boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

Il codice seguente è un esempio di base su come usare il decoratore `@feature_processor`. Per esempi di utilizzo più specifici, consulta [Esempio di codice di elaborazione della funzionalità per casi d'uso comuni](feature-store-feature-processor-examples.md).

Il Feature Processor SDK può essere installato dall'SDK SageMaker Python e dai suoi extra utilizzando il seguente comando. 

```
pip install sagemaker[feature-processor]
```

Negli esempi seguenti, `us-east-1` è la Regione della risorsa, `111122223333` è l'ID dell'account del proprietario della risorsa e `your-feature-group-name` è il nome del gruppo di funzionalità.

Di seguito è riportata una definizione di base del processore di funzionalità, in cui il decoratore `@feature_processor` configura un input CSV da Amazon S3 da caricare, fornire alla funzione di trasformazione (ad esempio, `transform`) e preparare per l'inserimento in un gruppo di funzionalità. Viene eseguito dall'ultima riga.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

I parametri `@feature_processor` includono:
+ `inputs` (List[str]): un elenco di fonti di dati utilizzate nel processore di funzionalità Feature Store. Se le tue fonti di dati sono gruppi di funzionalità o sono archiviate in Amazon S3, potresti essere in grado di utilizzare le definizioni delle fonti di dati fornite da Feature Store per il processore di funzionalità. Per un elenco completo delle definizioni delle fonti di dati fornite dal Feature Store, consulta la [fonte dei dati del Feature Processor](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) in Amazon SageMaker Feature Store Leggi i documenti.
+ `output` (str): l'ARN del gruppo di funzionalità per importare l'output della funzione decorata.
+ `target_stores` (Optional[List[str]]): un elenco di archivi (ad esempio, `OnlineStore` o `OfflineStore`) da inserire nell'output. Se non specificato, i dati vengono inseriti in tutti gli archivi abilitati del gruppo di funzionalità di output.
+ `parameters` (Dict[str, Any]): un dizionario da fornire alla funzione di trasformazione. 
+ `enable_ingestion` (bool): flag che indica se gli output della funzione di trasformazione vengono inseriti nel gruppo di funzionalità di output. Questo flag è utile durante la fase di sviluppo. Se non specificato, l'inserimento è abilitato.

I parametri opzionali della funzione con wrap (forniti come argomento se presenti nella firma della funzione) includono:
+ `params` (Dict[str, Any]): il dizionario definito nei parametri `@feature_processor`. Contiene anche parametri configurati dal sistema a cui è possibile fare riferimento con la chiave `system`, come il parametro `scheduled_time`.
+ `spark`(SparkSession): Un riferimento all' SparkSession istanza inizializzata per l'applicazione Spark.

Il seguente codice è un esempio di utilizzo dei parametri `params` e `spark`.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

Il parametro di sistema `scheduled_time` (fornito nell'argomento `params` della funzione) è un valore importante per supportare ogni nuovo tentativo di esecuzione. Il valore può aiutare a identificare in modo univoco l'esecuzione del processore di funzionalità e può essere utilizzato come punto di riferimento per gli input basati su intervalli di date (ad esempio, caricando solo i dati delle ultime 24 ore) per garantire l'intervallo di input indipendente dal tempo di esecuzione effettivo del codice. Se il processore di funzionalità viene eseguito secondo un programma (consulta [Esecuzioni pianificate e basate su eventi per le pipeline del Processore di funzionalità](feature-store-feature-processor-schedule-pipeline.md)), il relativo valore viene fissato all'ora in cui è pianificata l'esecuzione. L'argomento può essere ignorato durante l'esecuzione sincrona utilizzando l'API di esecuzione dell'SDK per supportare casi d'uso come il backfill dei dati o la nuova esecuzione di un’operazione precedente non riuscita. Il valore corrisponde all'ora corrente se il processore di funzionalità viene eseguito in altro modo.

Per informazioni sulla creazione del codice Spark, consulta la [Guida alla programmazione di Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Per ulteriori esempi di codice per casi d'uso comuni, consulta [Esempio di codice di elaborazione della funzionalità per casi d'uso comuni](feature-store-feature-processor-examples.md). 

Si noti che le funzioni di trasformazione decorate con `@feature_processor` non restituiscono alcun valore. Per testare programmaticamente la tua funzione, puoi rimuovere o applicare patch al decoratore `@feature_processor` in modo che funga da collegamento diretto alla funzione con wrap. Per maggiori dettagli sul `@feature_processor` decoratore, consulta [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Esecuzione del processore di funzionalità Feature Store da remoto
<a name="feature-store-feature-processor-execute-remotely"></a>

Per eseguire i Feature Processor su set di dati di grandi dimensioni che richiedono hardware più potente di quello disponibile localmente, puoi decorare il codice con il `@remote` decoratore per eseguire il codice Python locale come processo di formazione distribuito a uno o più nodi SageMaker . Per ulteriori informazioni sull'esecuzione del codice come processo di SageMaker formazione, consulta. [Esegui il tuo codice locale come processo SageMaker di formazione](train-remote-decorator.md) 

Di seguito è riportato un esempio di utilizzo del decoratore `@remote` insieme al decoratore `@feature_processor`.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

Il parametro `spark_config` indica che il processo remoto viene eseguito come applicazione Spark. L'`SparkConfig`istanza può essere utilizzata per configurare la configurazione Spark e fornire dipendenze aggiuntive all'applicazione Spark come file e file Python. JARs

Per iterazioni più rapide durante lo sviluppo del codice di elaborazione delle funzionalità, puoi specificare l'argomento `keep_alive_period_in_seconds` nel decoratore `@remote` per mantenere le risorse configurate in un warm pool per i successivi processi di addestramento. Per ulteriori informazioni sui warm pool, consulta `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` nella guida di riferimento delle API.

Il seguente codice è un esempio di `requirements.txt:` locale

```
sagemaker>=2.167.0
```

Questo installerà la versione SageMaker SDK corrispondente nel job remoto, necessaria per l'esecuzione del metodo annotato da. `@feature-processor` 

# Creazione ed esecuzione di pipeline del processore di funzionalità Feature Store
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

Il Feature Processor SDK consente di APIs promuovere le definizioni dei Feature Processor in una pipeline AI completamente gestita SageMaker . Per ulteriori informazioni su Pipelines, consulta [Panoramica su Pipelines](pipelines-overview.md). Per convertire le definizioni del Feature Processor in una pipeline SageMaker AI, utilizza l'`to_pipeline`API con la definizione del Feature Processor. Potete pianificare le esecuzioni della vostra Feature Processor Definition, monitorarle operativamente con CloudWatch metriche e integrarle EventBridge per fungere da fonti di eventi o abbonati. Per ulteriori informazioni sul monitoraggio delle pipeline create con Pipelines, consulta [Monitora le pipeline dei SageMaker Feature Processor di Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md).

Per visualizzare le pipeline del Processore di funzionalità, consulta [Visualizzazione delle esecuzioni della pipeline dalla console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

Se la funzione è decorata anche con il decoratore `@remote`, le relative configurazioni vengono trasferite nella pipeline del Processore di funzionalità. È possibile specificare configurazioni avanzate come il tipo e il numero delle istanze di calcolo, le dipendenze di runtime, le configurazioni di rete e di sicurezza utilizzando il decoratore `@remote`.

L'esempio seguente utilizza and. `to_pipeline` `execute` APIs

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

L'API `to_pipeline` è semanticamente un'operazione di upsert. Aggiorna la pipeline se esiste già; in caso contrario, crea una pipeline.

L'`to_pipeline`API accetta facoltativamente un URI Amazon S3 che fa riferimento a un file contenente la definizione del Feature Processor per associarlo alla pipeline Feature Processor per tracciare la funzione di trasformazione e le sue versioni nella SageMaker sua linea di apprendimento automatico AI.

Per recuperare un elenco di tutte le pipeline del Processore di funzionalità presenti nel tuo account, puoi utilizzare l'API `list_pipelines`. Una richiesta successiva all’API `describe` restituisce i dettagli relativi alla pipeline del Processore di funzionalità, inclusi, a titolo esemplificativo, i dettagli su Pipelines e sulla pianificazione.

L'esempio seguente utilizza and. `list_pipelines` `describe` APIs

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Esecuzioni pianificate e basate su eventi per le pipeline del Processore di funzionalità
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Le esecuzioni della pipeline di Amazon SageMaker Feature Store Feature Processing possono essere configurate per l'avvio automatico e asincrono in base a una pianificazione preconfigurata o in seguito a un altro evento di servizio. AWS Ad esempio, è possibile pianificare l'esecuzione delle pipeline di elaborazione delle funzionalità il primo giorno di ogni mese o concatenare due pipeline in modo che una pipeline di destinazione venga eseguita automaticamente dopo il completamento dell'esecuzione di una pipeline di origine.

**Topics**
+ [Esecuzioni basate sulla pianificazione](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [Esecuzioni basate su eventi](#feature-store-feature-processor-schedule-pipeline-event-based)

## Esecuzioni basate sulla pianificazione
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

Il Feature Processor SDK fornisce un'[https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API per eseguire le pipeline di Feature Processor su base ricorrente con l'integrazione di Amazon Scheduler. EventBridge La pianificazione può essere specificata con un'`cron`espressione `at``rate`, o utilizzando il [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)parametro con le stesse espressioni supportate da Amazon EventBridge. L'API di pianificazione è semanticamente un'operazione di upsert in quanto aggiorna la pianificazione se esiste già; in caso contrario, la crea. Per ulteriori informazioni sulle EventBridge espressioni e sugli esempi, consulta [Schedule types on EventBridge Scheduler nella EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) User Guide.

Gli esempi seguenti utilizzano l'API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) del Processore di funzionalità, utilizzando le espressioni `at`, `rate` e `cron`.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

Il fuso orario predefinito per gli input di data e ora nell'API `schedule` è UTC. Per ulteriori informazioni sulle espressioni di EventBridge pianificazione di Scheduler, consulta la documentazione di riferimento dell'[https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)API EventBridge Scheduler.

Le esecuzioni di pipeline pianificate del Processore di funzionalità forniscono alla funzione di trasformazione il tempo di esecuzione pianificato, da utilizzare come token di idempotenza o punto di riferimento fisso per gli input basati su intervalli di date. Per disabilitare (ad esempio mettere in pausa) o riattivare una pianificazione, utilizza il parametro `state` dell'API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) con `‘DISABLED’` o `‘ENABLED’`, rispettivamente.

Per informazioni sul Processore di funzionalità, consulta [Origini dati per SDK del Processore di funzionalità](feature-store-feature-processor-data-sources-sdk.md). 

## Esecuzioni basate su eventi
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Una pipeline di elaborazione delle funzionalità può essere configurata per l'esecuzione automatica quando si verifica un evento AWS . L'SDK di elaborazione delle funzionalità fornisce una funzione [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) che accetta un elenco di eventi di origine e una pipeline di destinazione. Gli eventi di origine devono essere istanze di [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent), che specifica una pipeline ed eventi di [stato di esecuzione](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus). 

La `put_trigger` funzione configura una EventBridge regola e un target Amazon per instradare gli eventi e consente di specificare un modello di EventBridge evento per rispondere a qualsiasi AWS evento. Per informazioni su questi concetti, consulta Amazon EventBridge [rules](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html), [targets](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) and [event patterns](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html).

I trigger possono essere abilitati o disabilitati. EventBridge avvierà l'esecuzione di una pipeline di destinazione utilizzando il ruolo fornito nel `role_arn` parametro dell'`put_trigger`API. Il ruolo di esecuzione viene utilizzato per impostazione predefinita se l'SDK viene utilizzato in un ambiente Amazon SageMaker Studio Classic o Notebook. Per informazioni su come ottenere il ruolo di esecuzione, consulta [Acquisizione del ruolo di esecuzione](sagemaker-roles.md#sagemaker-roles-get-execution-role).

L'esempio seguente configura:
+ Una pipeline SageMaker AI che utilizza l'`to_pipeline`API, che include il nome della pipeline di destinazione (`target-pipeline`) e la funzione di trasformazione (). `transform` Per informazioni sul Processore di funzionalità e sulla funzione di trasformazione, consulta [Origini dati per SDK del Processore di funzionalità](feature-store-feature-processor-data-sources-sdk.md).
+ Un trigger che utilizza l'API `put_trigger`, che riceve `FeatureProcessorPipelineEvent` per l'evento e il nome della pipeline di destinazione (`target-pipeline`). 

  `FeatureProcessorPipelineEvent` definisce il trigger per quando lo stato della pipeline di origine (`source-pipeline`) diventa `Succeeded`. Per informazioni sulla funzione evento della pipeline del Processore di funzionalità, consulta [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) in Leggi i documenti del Processore di funzionalità. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

Per un esempio di utilizzo di trigger basati su eventi per creare esecuzioni continue e tentativi automatici per la pipeline del Processore di funzionalità, consulta [Esecuzioni continue e tentativi automatici utilizzando trigger basati su eventi](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries).

Per un esempio di utilizzo di trigger basati su eventi per creare *streaming* continuo e tentativi automatici utilizzando trigger basati su eventi, consulta [Esempi di origini dati personalizzate di streaming](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming). 

# Monitora le pipeline dei SageMaker Feature Processor di Amazon Feature Store
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS fornisce strumenti di monitoraggio per monitorare le risorse e le applicazioni Amazon SageMaker AI in tempo reale, segnalare quando qualcosa va storto e intraprendere azioni automatiche quando necessario. Le pipeline del Processore di funzionalità dell’archivio delle caratteristiche sono Pipelines, quindi sono disponibili i meccanismi di monitoraggio e le integrazioni standard. Le metriche operative come gli errori di esecuzione possono essere monitorate tramite i CloudWatch parametri Amazon e gli eventi Amazon. EventBridge 

Per ulteriori informazioni su come monitorare e rendere operativo il Processore di funzionalità dell'archivio funzionalità, consulta le seguenti risorse:
+ [Monitoraggio AWS delle risorse in Amazon SageMaker AI](monitoring-overview.md)- Linee guida generali sull'attività di monitoraggio e controllo delle risorse di intelligenza artificiale. SageMaker 
+ [SageMaker metriche delle pipeline](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)- CloudWatch Metriche emesse dalle pipeline.
+ [SageMaker modifica dello stato di esecuzione della pipeline](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)- EventBridge eventi emessi per Pipeline ed esecuzioni.
+ [Risoluzione dei problemi di Amazon SageMaker Pipelines](pipelines-troubleshooting.md): suggerimenti generali per il debug e la risoluzione dei problemi per Pipelines.

I log di esecuzione del Feature Store Feature Processor sono disponibili in Amazon CloudWatch Logs nel gruppo di `/aws/sagemaker/TrainingJobs` log, dove puoi trovare i flussi di log di esecuzione utilizzando le convenzioni di ricerca. Per le esecuzioni create richiamando direttamente la funzione decorata `@feature_processor`, puoi trovare i log nella console dell'ambiente di esecuzione locale. Per le esecuzioni ` @remote` decorate, il nome del flusso CloudWatch Logs contiene il nome della funzione e il timestamp di esecuzione. Per le esecuzioni della pipeline del Feature Processor, il flusso CloudWatch Logs per la fase contiene la stringa e l'ID di esecuzione della `feature-processor` pipeline.

Le pipeline del Feature Store Feature Processor e gli stati di esecuzione recenti sono disponibili in Amazon SageMaker Studio Classic per un determinato gruppo di funzionalità nell'interfaccia utente di Feature Store. I gruppi di funzionalità relativi alle pipeline del Processore di funzionalità come input o output vengono visualizzati nell'interfaccia utente. Inoltre, la visualizzazione della derivazione può fornire un contesto alle esecuzioni upstream, come le pipeline del Processore di funzionalità che producono dati e le origini dati, per ulteriori operazioni di debug. Per ulteriori informazioni sull’utilizzo della visualizzazione di lineage con Studio Classic, consulta [Visualizzazione del lineage dalla console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).

# Autorizzazioni e ruoli di esecuzione IAM
<a name="feature-store-feature-processor-iam-permissions"></a>

Per utilizzare l'SDK Amazon SageMaker Python sono necessarie le autorizzazioni con cui interagire. Servizi AWS Le seguenti policy sono necessarie per la piena funzionalità del Processore di funzionalità. Puoi allegare le [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)e le [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS Managed Policies associate al tuo ruolo IAM. Per ulteriori informazioni su come collegare policy al ruolo IAM, consulta [Aggiunta di policy al ruolo IAM](feature-store-adding-policies.md). Per i dettagli, consulta gli esempi seguenti.

La policy di attendibilità del ruolo a cui viene applicata questa policy deve consentire i principi "scheduler.amazonaws.com", "sagemaker.amazonaws.com" e "glue.amazonaws.com".

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Restrizioni, limiti e quote del Processore di funzionalità
<a name="feature-store-feature-processor-quotas"></a>

Amazon SageMaker Feature Store Feature Processing si basa sul tracciamento del lignaggio basato sull' SageMaker intelligenza artificiale (ML). Il Processore di funzionalità dell'archivio funzionalità utilizza contesti di derivazione per rappresentare e tracciare le pipeline di elaborazione delle funzionalità e le versioni della pipeline. Ogni Processore di funzionalità dell'archivio funzionalità utilizza almeno due contesti di derivazione (uno per la pipeline di elaborazione delle funzionalità e l'altro per la versione). Se l'origine dati di input o output di una pipeline di elaborazione delle funzionalità cambia, viene creato un contesto di derivazione aggiuntivo. Puoi aggiornare i limiti di derivazione di SageMaker AI ML contattando l' AWS assistenza per un aumento dei limiti. I limiti predefiniti per le risorse utilizzate dal Processore di funzionalità dell'archivio funzionalità sono i seguenti. Per informazioni sul tracciamento del lignaggio SageMaker AI ML, consulta. [Monitoraggio del lignaggio di Amazon SageMaker ML](lineage-tracking.md)

Per ulteriori informazioni sulle quote SageMaker AI, consulta [Endpoint e quote Amazon SageMaker AI](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Limiti di derivazione per Regione
+ Contesti: 500 (limite flessibile)
+ Artefatti: 6.000 (limite flessibile)
+ Associazioni: 6.000 (limite flessibile)

Limiti di addestramento per Regione
+ Runtime più lungo per un processo di addestramento: 432.000 secondi
+ Numero massimo di istanze per processo di addestramento: 20
+ Il numero massimo di richieste `CreateTrainingJob` che puoi effettuare al secondo in questo account nella Regione corrente: 1 TPS
+ Periodo di mantenimento dell'attività per il riutilizzo dei cluster: 3.600 secondi

Numero massimo di pipeline ed esecuzioni simultanee di pipeline per Regione
+ Numero massimo di pipeline consentite per account: 500
+ Numero massimo di esecuzioni simultanee di pipeline consentite per account: 20
+ Tempo dopo il quale scadono le esecuzioni della pipeline: 672 ore

# Origini dati
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing supporta più fonti di dati. L'SDK del Processore di funzionalità per Python (Boto3) fornisce costrutti per caricare dati da gruppi di funzionalità o oggetti archiviati in Amazon S3. Inoltre, puoi creare origini dati personalizzate per caricare dati da altre origini dati. Per informazioni sulle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [Origini dati per SDK del Processore di funzionalità](feature-store-feature-processor-data-sources-sdk.md)
+ [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md)
+ [Esempi di origini dati personalizzate](feature-store-feature-processor-data-sources-custom-examples.md)

# Origini dati per SDK del Processore di funzionalità
<a name="feature-store-feature-processor-data-sources-sdk"></a>

L'Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) fornisce costrutti per caricare dati da gruppi di funzionalità o oggetti archiviati in Amazon S3. Per un elenco completo delle definizioni delle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Per esempi su come utilizzare le definizioni delle origini dati dell'archivio funzionalità Python SDK, consulta [Esempio di codice di elaborazione della funzionalità per casi d'uso comuni](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` viene utilizzato per specificare un gruppo di funzionalità come origine dati di input per un Processore di funzionalità. I dati possono essere caricati da un gruppo di funzionalità di un archivio offline. Il tentativo di caricare i dati da un gruppo di funzionalità di un archivio online genererà un errore di convalida. È possibile specificare gli offset di inizio e fine per limitare i dati caricati a un intervallo di tempo specifico. Ad esempio, puoi specificare un offset iniziale di '14 giorni' per caricare solo le ultime due settimane di dati e puoi anche specificare un offset finale di '7 giorni' per limitare l'input alla settimana precedente di dati.

## Definizioni delle origini dati fornite dall'archivio funzionalità
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

L'archivio funzionalità Python SDK contiene definizioni di origini dati che possono essere utilizzate per specificare varie origini dati di input per un Processore di funzionalità. Queste includono origini di tabella CSV, Parquet e Iceberg. Per un elenco completo delle definizioni delle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Origini dati personalizzate
<a name="feature-store-feature-processor-data-sources-custom"></a>

In questa pagina descriveremo come creare una classe di origine dati personalizzata e mostreremo alcuni esempi di utilizzo. Con le sorgenti dati personalizzate, puoi utilizzare l' SageMaker AI SDK for Python ( APIs Boto3) fornito nello stesso modo in cui utilizzi le sorgenti dati fornite da Amazon Feature Store. SageMaker 

Per utilizzare un'origine dati personalizzata per trasformare e inserire dati in un gruppo di funzionalità utilizzando l'elaborazione delle funzionalità, dovrai estendere la classe `PySparkDataSource` con i seguenti membri e funzioni della classe.
+ `data_source_name` (str): un nome arbitrario per l'origine dati. Ad esempio, Amazon Redshift, Snowflake o un ARN di Glue Catalog.
+ `data_source_unique_id`(str): un identificatore univoco che si riferisce alla risorsa specifica a cui si accede. Ad esempio, nome della tabella, ARN della tabella DDB, prefisso Amazon S3. Tutti gli utilizzi dello stesso `data_source_unique_id` nelle origini dati personalizzate verranno associati alla stessa origine dati nella visualizzazione della derivazione. La derivazione include informazioni sul codice di esecuzione di un flusso di lavoro di elaborazione delle funzionalità, sulle origini dati utilizzate e su come vengono inserite nel gruppo di funzionalità o nella funzionalità. Per informazioni sulla visualizzazione del lineage di un gruppo di funzionalità in **Studio**, consulta [Visualizzazione del lineage dalla console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data`(func): un metodo usato per connettersi con il processore di funzionalità. Restituisce un dataframe Spark. Per alcuni esempi, consulta [Esempi di origini dati personalizzate](feature-store-feature-processor-data-sources-custom-examples.md).

Entrambi `data_source_name` e `data_source_unique_id` vengono utilizzati per identificare in modo univoco l'entità di derivazione. Di seguito è riportato un esempio di una classe di origine dati personalizzata denominata `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Esempi di origini dati personalizzate
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Questa sezione fornisce esempi di implementazioni di origini dati personalizzate per il Processore di funzionalità. Per ulteriori informazioni sulle origini dati personalizzate, consulta [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md).

La sicurezza è una responsabilità condivisa tra i AWS nostri clienti. AWS è responsabile della protezione dell'infrastruttura che gestisce i servizi di Cloud AWS. I clienti sono responsabili di tutte le attività necessarie di configurazione e gestione della sicurezza. Ad esempio, segreti come le credenziali di accesso agli archivi dati non devono essere codificati nelle origini dati personalizzate. È possibile utilizzare Gestione dei segreti AWS per gestire queste credenziali. Per informazioni su Secrets Manager, vedi [Cos'è Gestione dei segreti AWS?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) nella guida Gestione dei segreti AWS per l'utente. I seguenti esempi utilizzeranno Secrets Manager per le tue credenziali.

**Topics**
+ [Esempi di origini dati personalizzate di cluster Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Esempi di origini dati personalizzate Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Esempi di origini dati personalizzate Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Esempi di origini dati personalizzate di streaming](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Esempi di origini dati personalizzate di cluster Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift offre un driver JDBC che può essere utilizzato per leggere i dati con Spark. Per informazioni su come scaricare il driver JDBC di Amazon Redshift, consulta [Scarica il driver JDBC di Amazon Redshift, versione 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Per creare la classe di origine dati personalizzata di Amazon Redshift, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md). 

Per connetterti a un cluster Amazon Redshift hai bisogno di:
+ URL JDBC di Amazon Redshift (`jdbc-url`)

  Per informazioni su come ottenere l'URL JDBC di Amazon Redshift, consulta la sezione [Ottenimento dell'URL JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ Nome utente (`redshift-user`) e password (`redshift-password`) di Amazon Redshift

  Per informazioni su come creare e gestire gli utenti del database utilizzando i comandi SQL di Amazon Redshift, consulta la sezione [Utenti](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ Nome della tabella (`redshift-table-name`) di Amazon Redshift

  Per informazioni su come creare una tabella con alcuni esempi, consulta la sezione [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-redshift-account-info`) in cui archivi il nome utente e la password di accesso ad Amazon Redshift su Secrets Manager.

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare l'URL JDBC e il token di accesso personale da Secrets Manager e sovrascrivere `read_data` per la classe di origine dati personalizzata, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

L'esempio seguente mostra come connettere `RedshiftDataSource` al proprio decoratore `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire il driver jdbc definendo `SparkConfig` e passandolo al decoratore `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake fornisce un connettore Spark che può essere utilizzato per il decoratore `feature_processor`. Per informazioni sul connettore Snowflake per Spark, consulta [Connettore Snowflake per Spark](https://docs.snowflake.com/en/user-guide/spark-connector) nella documentazione di Snowflake.

Per creare la classe di origine dati personalizzata Snowflake, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md) e aggiungere i pacchetti del connettore Spark al classpath di Spark. 

Per connetterti a un'origine dati Snowflake hai bisogno di:
+ URL Snowflake (`sf-url`)

  Per informazioni sull' URLs accesso alle interfacce web di Snowflake, consulta gli [identificatori degli account](https://docs.snowflake.com/en/user-guide/admin-account-identifier) nella documentazione di Snowflake.
+ Database Snowflake (`sf-database`) 

  Per informazioni su come ottenere il nome del database utilizzando Snowflake, consulta [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) nella documentazione di Snowflake.
+ Schema del database Snowflake (`sf-schema`) 

  Per informazioni su come ottenere il nome dello schema utilizzando Snowflake, consulta [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) nella documentazione di Snowflake.
+ Warehouse Snowflake (`sf-warehouse`)

  Per informazioni su come ottenere il nome del warehouse utilizzando Snowflake, consulta [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) nella documentazione di Snowflake.
+ Nome della tabella Snowflake (`sf-table-name`)
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-snowflake-account-info`) in cui archivi il nome utente e la password di accesso a Snowflake su Secrets Manager. 

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare il nome utente e la password di Snowflake da Secrets Manager e sovrascrivere la funzione `read_data` per la classe di origine dati personalizzata `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

L'esempio seguente mostra come connettere `SnowflakeDataSource` al proprio decoratore `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire i pacchetti tramite la definizione di `SparkConfig` e passarlo al decoratore `@remote`. I pacchetti Spark nell'esempio seguente sono tali che `spark-snowflake_2.12` è la versione Scala del Processore di funzionalità, `2.12.0` è la versione di Snowflake che desideri utilizzare e `spark_3.3` è la versione Spark del Processore di funzionalità. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark può leggere i dati da Databricks utilizzando il driver JDBC di Databricks. Per informazioni sul driver JDBC di Databricks, consulta [Configurazione dei driver ODBC e JDBC di Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) nella documentazione di Databricks.

**Nota**  
Puoi leggere i dati da qualsiasi altro database includendo il driver JDBC corrispondente nel classpath di Spark. Per ulteriori informazioni, consulta [Da JDBC ad altri database](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) nella Guida Spark SQL.

Per creare la classe di origine dati personalizzata Databricks, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md) e aggiungere il file jar JDBC al classpath di Spark. 

Per connetterti a un'origine dati Databricks hai bisogno di:
+ URL di Databricks (`databricks-url`)

  Per informazioni sull'URL di Databricks, consulta [Creazione dell'URL di connessione per il driver di Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) nella documentazione di Databricks.
+ Token di accesso personale di Databricks (`personal-access-token`)

  Per informazioni sul token di accesso Databricks, consulta [Autenticazione con token di accesso personale di Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) nella documentazione di Databricks.
+ Nome del catalogo dati (`db-catalog`) 

  Per informazioni sul nome del catalogo di Databricks, consulta [Nome del catalogo](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) nella documentazione di Databricks.
+ Nome schema (`db-schema`)

  Per informazioni sul nome dello schema di Databricks, consulta [Nome dello schema](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) nella documentazione di Databricks.
+ Nome della tabella (`db-table-name`)

  Per informazioni sul nome della tabella di Databricks, consulta [Nome della tabella](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) nella documentazione di Databricks.
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-databricks-account-info`) in cui archivi il nome utente e la password di accesso a Databricks su Secrets Manager. 

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare l'URL JDBC e il token di accesso personale da Secrets Manager e sovrascrivere `read_data` per la classe di origine dati personalizzata, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

L'esempio seguente mostra come caricare il file jar del driver JDBC, `jdbc-jar-file-name.jar`, su Amazon S3 per aggiungerlo al classpath di Spark. Per informazioni sul download del driver JDBC di Spark (`jdbc-jar-file-name.jar`) da Databricks, consulta [Download del driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) nel sito Web di Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire i file jar definendo `SparkConfig` e passandolo al decoratore `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate di streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

È possibile connettersi a origini dati di streaming come Amazon Kinesis e creare trasformazioni con Spark Structured Streaming per leggere da origini dati di streaming. Per informazioni sul connettore Kinesis, consulta Kinesis Connector [for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in. GitHub Per informazioni su Amazon Kinesis, consulta [Cos'è il flusso di dati Amazon Kinesis?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) nella Guida per gli sviluppatori di Amazon Kinesis.

Per creare la classe di origine dati personalizzata di Amazon Kinesis, dovrai estendere la classe `BaseDataSource` e sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md).

Per connetterti a un flusso di dati Amazon Kinesis, hai bisogno di:
+ ARN di Kinesis (`kinesis-resource-arn`) 

  Per informazioni sul flusso di dati Kinesis ARNs, consulta [Amazon Resource Names (ARNs) for Kinesis Data Streams nella Amazon Kinesis Developer](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) Guide.
+ Nome del flusso di dati Kinesis (`kinesis-stream-name`)
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

L'esempio seguente illustra come connettere `KinesisDataSource` al proprio decoratore `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

Nel codice di esempio sopra riportato, utilizziamo alcune opzioni di Spark Structured Streaming durante lo streaming di micro-batch nel tuo gruppo di funzionalità. Per un elenco completo delle opzioni, consulta la [Guida di programmazione di Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) nella documentazione di Apache Spark. 
+ La modalità sink `foreachBatch` è una funzionalità che consente di applicare operazioni e scrivere logica sui dati di output di ogni micro-batch di una query di streaming. 

  Per informazioni su`foreachBatch`, consulta [Using Foreach e ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) nella Apache Spark Structured Streaming Programming Guide. 
+ L'opzione `checkpointLocation` salva periodicamente lo stato dell'applicazione di streaming. Il log di streaming viene salvato nella posizione di checkpoint `s3a://checkpoint-path`.

  Per informazioni sull'opzione `checkpointLocation`, consulta [Recupero dagli errori tramite esecuzione di checkpoint](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) nella Guida di programmazione di Apache Spark Structured Streaming. 
+ L'impostazione `trigger` definisce la frequenza con cui viene attivata l'elaborazione in micro-batch in un'applicazione di streaming. Nell'esempio, il tipo di trigger del tempo di elaborazione viene utilizzato con intervalli di micro-batch di un minuto, specificati da `trigger(processingTime="1 minute")`. Per effettuare il backfill da una sorgente di streaming, puoi utilizzare il tipo di trigger available-now, specificato da `trigger(availableNow=True)`.

  Per un elenco completo dei tipi di `trigger`, consulta [Trigger](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) nella Guida di programmazione di Apache Spark Structured Streaming.

**Streaming continuo e tentativi automatici utilizzando trigger basati su eventi**

Il Feature Processor utilizza SageMaker Training come infrastruttura di elaborazione e ha un limite di runtime massimo di 28 giorni. È possibile utilizzare i trigger basati su eventi per estendere lo streaming continuo per un periodo di tempo più lungo e recuperare in caso di errori temporanei. Per ulteriori informazioni sulle esecuzioni basate sulla pianificazione e sugli eventi, consulta [Esecuzioni pianificate e basate su eventi per le pipeline del Processore di funzionalità](feature-store-feature-processor-schedule-pipeline.md).

Di seguito è riportato un esempio di configurazione di un trigger basato su eventi per mantenere in esecuzione continua la pipeline del Processore di funzionalità di streaming. Usa la funzione di trasformazione di streaming definita nell'esempio precedente. Una pipeline di destinazione può essere configurata per essere attivata quando si verifica un evento `STOPPED` o `FAILED` per l'esecuzione di una pipeline di origine. Si noti che la stessa pipeline viene utilizzata come origine e destinazione in modo che sia esecuzione in modo continuo.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Esempio di codice di elaborazione della funzionalità per casi d'uso comuni
<a name="feature-store-feature-processor-examples"></a>

Nei seguenti esempi vengono riportati esempi di codice di elaborazione delle funzionalità per casi d'uso comuni. Per un notebook di esempio più dettagliato che mostra casi d'uso specifici, consulta il notebook [Amazon SageMaker Feature Store Feature Processing](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

Negli esempi seguenti, `us-east-1` è la Regione della risorsa, `111122223333` è l'ID dell'account proprietario della risorsa e `your-feature-group-name` è il nome del gruppo di funzionalità.

Il set di dati `transactions` utilizzato negli esempi seguenti presenta lo schema seguente:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Unione di dati da più origini dati](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Aggregati basati su finestre temporali scorrevoli](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Aggregati basati su finestre temporali a cascata](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Promozione dall'archivio offline all'archivio online](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Trasformazioni con la libreria Pandas](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [Esecuzioni continue e tentativi automatici utilizzando trigger basati su eventi](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Unione di dati da più origini dati
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Aggregati basati su finestre temporali scorrevoli
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Aggregati basati su finestre temporali a cascata
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promozione dall'archivio offline all'archivio online
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Trasformazioni con la libreria Pandas
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Trasformazioni con la libreria Pandas**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Esecuzioni continue e tentativi automatici utilizzando trigger basati su eventi
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```