

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Procesamiento de características
<a name="feature-store-feature-processing"></a>

El procesamiento de funciones de Amazon SageMaker Feature Store es una capacidad con la que puede transformar datos sin procesar en funciones de aprendizaje automático (ML). Le proporciona un SDK de procesador de características con el que puede transformar e ingerir datos de orígenes de datos por lotes en sus grupos de características. Con esta capacidad, el Almacén de características se ocupa de la infraestructura subyacente, lo que incluye el aprovisionamiento de los entornos de computación y la creación y el mantenimiento de Canalizaciones para cargar e ingerir datos. De esta forma, puede centrarse en las definiciones de su procesador de características, que incluyen una función de transformación (por ejemplo, el recuento de visualizaciones de productos, la media del valor de las transacciones), orígenes (dónde aplicar esta transformación) y receptores (dónde escribir los valores computados de las características).

La canalización del procesador de características es una canalización de Canalizaciones. Al tratarse de canalizaciones, también puede realizar un seguimiento de las canalizaciones programadas de procesadores de funciones basadas en la SageMaker inteligencia artificial en la consola. Para obtener más información sobre el linaje de la SageMaker IA, consulte [Seguimiento del linaje de Amazon SageMaker ML](lineage-tracking.md) Esto incluye el seguimiento de las ejecuciones programadas, la visualización del linaje para rastrear las entidades hasta sus fuentes de datos y la visualización de los procesadores de funciones compartidas en un solo entorno. Para obtener más información sobre el uso del Almacén de características con la consola, consulte [Visualización de las ejecuciones de canalizaciones desde la consola](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [SDK del procesador de características del almacén de características](feature-store-feature-processor-sdk.md)
+ [Ejecución remota del procesador de características del almacén de características](feature-store-feature-processor-execute-remotely.md)
+ [Creación y ejecución de canalizaciones del procesador de características del almacén de características](feature-store-feature-processor-create-execute-pipeline.md)
+ [Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características](feature-store-feature-processor-schedule-pipeline.md)
+ [Supervise las canalizaciones SageMaker de los procesadores de funciones de Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)
+ [Permisos y roles de ejecución de IAM](feature-store-feature-processor-iam-permissions.md)
+ [Restricciones, límites y cuotas del procesador de características](feature-store-feature-processor-quotas.md)
+ [Orígenes de datos](feature-store-feature-processor-data-sources.md)
+ [Ejemplo de código de procesamiento de características para casos de uso habituales](feature-store-feature-processor-examples.md)

# SDK del procesador de características del almacén de características
<a name="feature-store-feature-processor-sdk"></a>

Determine una definición del procesador de características del almacén de características mediante la decoración de sus características de transformación con el decorador `@feature_processor`. El SDK de SageMaker IA para Python (Boto3) carga automáticamente los datos de las fuentes de datos de entrada configuradas, aplica la función de transformación decorada y, a continuación, ingiere los datos transformados en un grupo de características objetivo. Las funciones de transformación decoradas deben ajustarse a la firma esperada del decorador `@feature_processor`. Para obtener más información sobre el `@feature_processor` decorador, consulta [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) en Amazon SageMaker Feature Store Lee los documentos. 

Con el `@feature_processor` decorador, la función de transformación se ejecuta en un entorno de ejecución de Spark en el que los argumentos de entrada proporcionados a la función y su valor devuelto son Spark. DataFrames El número de parámetros de entrada de la función de transformación debe coincidir con el número de entradas configuradas en el decorador `@feature_processor`. 

Para obtener más información sobre el decorador `@feature_processor`, consulte [Feature Processor Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

El siguiente código contiene ejemplos básicos de uso del decorador `@feature_processor`. Para ver ejemplos de casos de uso de ejemplo más específicos, consulte [Ejemplo de código de procesamiento de características para casos de uso habituales](feature-store-feature-processor-examples.md).

El SDK del procesador de funciones se puede instalar desde el SDK de SageMaker Python y sus extras mediante el siguiente comando. 

```
pip install sagemaker[feature-processor]
```

En los siguientes ejemplos, `us-east-1` es la región del recurso, `111122223333` es el ID de la cuenta propietaria del recurso y `your-feature-group-name` es el nombre del grupo de características.

La siguiente es una definición básica del procesador de características, en la que el decorador `@feature_processor` configura una entrada CSV de Amazon S3 para cargarla y proporcionarla a la función de transformación (por ejemplo, `transform`), y la prepara para su incorporación a un grupo de características. Lo ejecuta la última línea.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

El parámetro `@feature_processor` incluye:
+ `inputs` (List[str]): una lista de orígenes de datos que se utilizan en el procesador de características del almacén de características. Si los orígenes de datos son grupos de características o están almacenados en Amazon S3, es posible que pueda utilizar la definiciones de orígenes de datos proporcionadas por el almacén de características para el procesador de características. Para obtener una lista completa de las definiciones de fuentes de datos proporcionadas por [Feature Store, consulte la fuente de datos del procesador](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) de funciones en Amazon SageMaker Feature Store Lea los documentos.
+ `output` (str): el ARN del grupo de características para ingerir la salida de la función decorada.
+ `target_stores` (Optional[List[str]]): una lista de almacenes (por ejemplo, `OnlineStore` u `OfflineStore`) para ingerir en la salida. Si no se especifica, los datos se ingieren en todos los almacenes habilitados del grupo de características de salida.
+ `parameters` (Dict[str, Any]): un diccionario que se proporcionará a la función de transformación. 
+ `enable_ingestion` (bool): un indicador que señala si las salidas de la función de transformación se ingieren en el grupo de características de salida. Este indicador es útil durante la fase de desarrollo. Si no se especifica, la ingestión está habilitada.

Los parámetros opcionales de la función encapsulada (que se proporcionan como argumento si se proporcionan en la firma de la función) incluyen:
+ `params` (Dict[str, Any]): el diccionario definido en los parámetros del `@feature_processor`. También contiene parámetros configurados por el sistema a los que se puede hacer referencia con la clave `system`, como el parámetro `scheduled_time`.
+ `spark`(SparkSession): una referencia a la SparkSession instancia inicializada para la aplicación Spark.

El siguiente código es un ejemplo del uso de los parámetros `params` y `spark`.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

El parámetro del sistema `scheduled_time` (que se proporciona en el argumento `params` a su función) es un valor importante para respaldar el reintento de cada ejecución. El valor puede ayudar a identificar de forma única la ejecución del procesador de características y puede usarse como punto de referencia para las entradas basadas en el intervalo de fechas (por ejemplo, si solo se cargan los datos de las últimas 24 horas) para garantizar que el intervalo de entrada sea independiente del tiempo de ejecución real del código. Si el procesador de características se ejecuta según una programación (consulte [Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características](feature-store-feature-processor-schedule-pipeline.md)), su valor se fija en la hora en la que está programada su ejecución. El argumento se puede anular durante la ejecución sincrónica mediante la API de ejecución del SDK para admitir casos de uso como la reposición de datos o la repetición de una ejecución anterior omitida. Su valor es la hora actual si el procesador de características se ejecuta de otra manera.

Para obtener información sobre la creación de código de Spark, consulte [Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Para obtener más ejemplos de código para casos de uso habituales, consulte [Ejemplo de código de procesamiento de características para casos de uso habituales](feature-store-feature-processor-examples.md). 

Tenga en cuenta que las funciones de transformación decoradas con `@feature_processor` no devuelven ningún valor. Para probar la función mediante programación, puede quitar o aplicar un parche al decorador `@feature_processor` para que actúe como paso a la función encapsulada. Para obtener más información sobre el `@feature_processor` decorador, consulta el [SDK de Python de Amazon SageMaker Feature Store](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Ejecución remota del procesador de características del almacén de características
<a name="feature-store-feature-processor-execute-remotely"></a>

Para ejecutar sus procesadores de funciones en conjuntos de datos grandes que requieren un hardware más potente que el disponible localmente, puede decorar su código con el `@remote` decorador para ejecutar su código Python local como un trabajo de SageMaker entrenamiento distribuido de uno o varios nodos. Para obtener más información sobre cómo ejecutar el código como un trabajo de SageMaker formación, consulte. [Ejecuta tu código local como un trabajo SageMaker de formación](train-remote-decorator.md) 

A continuación hay un ejemplo de uso del decorador `@remote` junto con el decorador `@feature_processor`.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

El parámetro `spark_config` indica que el trabajo remoto se ejecuta como una aplicación de Spark. La `SparkConfig` instancia se puede usar para configurar la configuración de Spark y proporcionar dependencias adicionales a la aplicación Spark JARs, como archivos y archivos de Python.

Para realizar iteraciones más rápidas a la hora de desarrollar el código de procesamiento de características, puede especificar el argumento `keep_alive_period_in_seconds` en el decorador `@remote` para conservar los recursos configurados en un grupo en caliente para los siguientes trabajos de entrenamiento. Para obtener más información sobre los grupos en caliente, consulte `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` en la Guía de referencia de API.

El siguiente código es un ejemplo de `requirements.txt:` local.

```
sagemaker>=2.167.0
```

De este modo, se instalará la versión del SageMaker SDK correspondiente en una tarea remota necesaria para ejecutar el método anotado por. `@feature-processor` 

# Creación y ejecución de canalizaciones del procesador de características del almacén de características
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

El SDK para procesadores de funciones permite APIs convertir sus definiciones de procesadores de funciones en una canalización de SageMaker IA totalmente gestionada. Para obtener más información acerca de Canalizaciones, consulte [Información general de canalizaciones](pipelines-overview.md). Para convertir sus definiciones de procesadores de funciones en una canalización de SageMaker IA, utilice la `to_pipeline` API con su definición de procesador de funciones. Puede programar las ejecuciones de su procesador de funciones, puede programarlas, monitorizarlas desde el punto de vista operativo con CloudWatch métricas e integrarlas EventBridge para que actúen como fuentes de eventos o suscriptores. Para obtener más información sobre la supervisión de canalizaciones creadas con Canalizaciones, consulte [Supervise las canalizaciones SageMaker de los procesadores de funciones de Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md).

Para ver las canalizaciones del procesador de características, consulte [Visualización de las ejecuciones de canalizaciones desde la consola](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

Si su función también está decorada con el decorador `@remote`, sus configuraciones se transfieren a la canalización del procesador de características. Puede especificar configuraciones avanzadas, como tipo y recuento de instancias de computación, dependencias del tiempo de ejecución y configuraciones de red y seguridad, mediante el decorador `@remote`.

En el siguiente ejemplo, se utilizan las letras `to_pipeline` y. `execute` APIs

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

La API `to_pipeline` es semánticamente una operación upsert. Actualiza la canalización si ya existe; de no ser así, crea una canalización.

La `to_pipeline` API acepta opcionalmente un URI de Amazon S3 que hace referencia a un archivo que contiene la definición del procesador de características para asociarlo a la canalización del procesador de características y realizar un seguimiento de la función de transformación y sus versiones en su linaje de aprendizaje automático de SageMaker IA.

Para recuperar una lista de todas las canalizaciones del procesador de características de su cuenta, puede utilizar la API `list_pipelines`. Una solicitud posterior a la API `describe` devuelve detalles relacionados con la canalización del procesador de características, incluidos, entre otros, detalles de Canalizaciones y la programación.

En el siguiente ejemplo, se utilizan las letras `list_pipelines` y. `describe` APIs

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Las ejecuciones del proceso de procesamiento de funciones de Amazon SageMaker Feature Store se pueden configurar para que se inicien de forma automática y asíncrona en función de una programación preconfigurada o como resultado de otro evento de servicio. AWS Por ejemplo, puede programar las canalizaciones de procesamiento de características para que se ejecuten el primer día de cada mes o encadenar dos canalizaciones para que una canalización objetiva se ejecute automáticamente una vez finalizada la ejecución de la canalización de origen.

**Topics**
+ [Ejecuciones basadas en una programación](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [Ejecuciones basadas en eventos](#feature-store-feature-processor-schedule-pipeline-event-based)

## Ejecuciones basadas en una programación
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

El SDK de Feature Processor proporciona una [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API para ejecutar las canalizaciones de Feature Processor de forma recurrente con la integración de Amazon EventBridge Scheduler. La programación se puede especificar con una `cron` expresión `at``rate`, o utilizando el [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)parámetro con las mismas expresiones admitidas por Amazon EventBridge. La API de programación es semánticamente una operación upsert, ya que actualiza la programación si ya existe; de no ser así, la crea. Para obtener más información sobre EventBridge las expresiones y los ejemplos, consulte los [tipos de programación en el EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) en la Guía del usuario del EventBridge Scheduler.

Los siguientes ejemplos utilizan la API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) del procesador de características, con las expresiones `at`, `rate` y `cron`.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

La zona horaria predeterminada para las entradas de fecha y hora en la API `schedule` está en UTC. Para obtener más información sobre las expresiones de programación de EventBridge Scheduler, consulte la documentación de referencia de la [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)API de EventBridge Scheduler.

Las ejecuciones planificadas de la canalización del procesador de características proporcionan a la función de transformación el tiempo de ejecución programado, que se utilizará como un token de idempotencia o un punto de referencia fijo para las entradas basadas en un intervalo de fechas. Para deshabilitar (es decir, pausar) o volver a habilitar una programación, usa el parámetro `state` de la API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) con `‘DISABLED’` o `‘ENABLED’`, respectivamente.

Para obtener información sobre el procesador de características, consulte [Orígenes de datos del SDK del procesador de características](feature-store-feature-processor-data-sources-sdk.md). 

## Ejecuciones basadas en eventos
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Es posible configurar una canalización de procesamiento de características para que se ejecute automáticamente cuando se produzca un evento AWS . El SDK de procesamiento de características proporciona una función [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) que acepta una lista de eventos de origen y una canalización objetivo. Los eventos de origen deben ser instancias de [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent), que especifica una canalización y eventos de [estado de ejecución](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus). 

La `put_trigger` función configura una EventBridge regla y un objetivo de Amazon para enrutar los eventos y te permite especificar un patrón de EventBridge eventos para responder a cualquier AWS evento. Para obtener información sobre estos conceptos, consulta EventBridge [las reglas](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html), [los objetivos](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) y los [patrones de eventos](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html) de Amazon.

Los activadores se pueden activar o desactivar. EventBridge iniciará la ejecución de una canalización objetivo utilizando la función proporcionada en el `role_arn` parámetro de la `put_trigger` API. La función de ejecución se utiliza de forma predeterminada si el SDK se utiliza en un entorno Amazon SageMaker Studio Classic o Notebook. Para obtener información sobre cómo obtener el rol de ejecución, consulte [Obtención del rol de ejecución](sagemaker-roles.md#sagemaker-roles-get-execution-role).

En el siguiente ejemplo se establece:
+ Una canalización de SageMaker IA que utiliza la `to_pipeline` API y que incluye el nombre de la canalización de destino (`target-pipeline`) y la función de transformación (`transform`). Para obtener información sobre el procesador de características y su función de transformación, consulte [Orígenes de datos del SDK del procesador de características](feature-store-feature-processor-data-sources-sdk.md).
+ Un desencadenador que utiliza la API `put_trigger` y que toma `FeatureProcessorPipelineEvent` para el evento y su nombre de la canalización objetivo (`target-pipeline`). 

  El `FeatureProcessorPipelineEvent` define el desencadenador para cuando el estado de la canalización de origen (`source-pipeline`) pase a ser `Succeeded`. Para obtener información sobre la función de eventos de canalización del procesador de características, consulte [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) en Read the Docs del almacén de características. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

Para ver un ejemplo del uso de desencadenadores basados en eventos para crear ejecuciones continuas y reintentos automáticos para su canalización del procesador de características, consulte [Ejecuciones continuas y reintentos automáticos mediante desencadenadores basados en eventos](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries).

Para ver un ejemplo del uso de desencadenadores basados en eventos para crear *transmisiones* continuas y reintentos automáticos mediante desencadenadores basados en eventos, consulte [Ejemplos de orígenes de datos personalizados de transmisión](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming). 

# Supervise las canalizaciones SageMaker de los procesadores de funciones de Amazon Feature Store
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS proporciona herramientas de monitoreo para monitorear sus recursos y aplicaciones de Amazon SageMaker AI en tiempo real, informar cuando algo sale mal y tomar medidas automáticas cuando sea apropiado. Las canalizaciones del procesador de características del Almacén de características son Canalizaciones, por lo que hay disponibles mecanismos de supervisión e integraciones estándar. Las métricas operativas, como los errores de ejecución, se pueden monitorear a través de CloudWatch las métricas de Amazon y EventBridge los eventos de Amazon. 

Para obtener más información sobre cómo supervisar y hacer operativo el procesador de características del almacén de características, consulte los siguientes recursos:
+ [Supervisión de AWS los recursos en Amazon SageMaker AI](monitoring-overview.md)- Directrices generales sobre la actividad de supervisión y auditoría de los recursos de SageMaker IA.
+ [SageMaker tuberías, métricas](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)- CloudWatch Métricas emitidas por los oleoductos.
+ [SageMaker cambio de estado de ejecución de la canalización](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)- EventBridge eventos emitidos por oleoductos y ejecuciones.
+ [Solución de problemas de Amazon SageMaker Pipelines](pipelines-troubleshooting.md): consejos generales de depuración y solución de problemas para Canalizaciones.

Los registros de ejecución del Feature Store Feature Processor se encuentran en Amazon CloudWatch Logs, en el `/aws/sagemaker/TrainingJobs` grupo de registros, donde puede encontrar los flujos de registros de ejecución mediante convenciones de búsqueda. En el caso de las ejecuciones creadas mediante la invocación directa de la función decorada `@feature_processor`, encontrará los registros en la consola de su entorno de ejecución local. En el caso de las ejecuciones ` @remote` condecoradas, el nombre de la secuencia de CloudWatch registros contiene el nombre de la función y la marca temporal de la ejecución. En el caso de las ejecuciones en canalización de Feature Processor, el flujo de CloudWatch registros del paso contiene la `feature-processor` cadena y el identificador de ejecución de la canalización.

Las canalizaciones de los procesadores de funciones y los estados de ejecución recientes de la tienda de características se encuentran en Amazon SageMaker Studio Classic para un grupo de características determinado en la interfaz de usuario de Feature Store. En la interfaz de usuario se muestran los grupos de características relacionados con las canalizaciones del procesador de características como entradas o salidas. Además, la vista de linaje puede proporcionar contexto a las ejecuciones anteriores, como canalizaciones que generan datos del procesador de características y orígenes de datos, para una mayor depuración. Para obtener más información sobre el uso de la vista de linaje con Studio Classic, consulte [Visualización del linaje desde la consola](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).

# Permisos y roles de ejecución de IAM
<a name="feature-store-feature-processor-iam-permissions"></a>

Para usar el SDK de Amazon SageMaker Python, se requieren permisos para interactuar con él Servicios de AWS. Se requieren las siguientes políticas para disfrutar de todas las funcionalidades del procesador de características. Puede adjuntar las políticas [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS gestionadas [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)y las correspondientes a su función de IAM. Para obtener información sobre cómo asociar políticas a su rol de IAM, consulte [Adición de políticas al rol de IAM](feature-store-adding-policies.md). Para obtener detalles, consulte los siguientes ejemplos.

La política de confianza del rol al que se aplique esta política debe respetar los principios "scheduler.amazonaws.com", "sagemaker.amazonaws.com" y "glue.amazonaws.com".

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Restricciones, límites y cuotas del procesador de características
<a name="feature-store-feature-processor-quotas"></a>

El procesamiento de funciones de Amazon SageMaker Feature Store se basa en el seguimiento del linaje mediante aprendizaje automático (ML) mediante SageMaker IA. El procesador de características del almacén de características utiliza contextos de linaje para representar y hacer un seguimiento de las canalizaciones del procesamiento de características y las versiones de las canalizaciones. Cada procesador de características del almacén de características consume al menos dos contextos de linaje (uno para la canalización de procesamiento de características y otro para la versión). Si el origen de datos de entrada o salida de una canalización de procesamiento de características cambia, se crea un contexto de linaje adicional. Para actualizar los límites de linaje con SageMaker IA ML, ponte en contacto con el equipo de AWS soporte técnico para aumentar el límite. Los límites predeterminados de los recursos utilizados por el procesador de características del almacén de características son los siguientes. Para obtener información sobre el seguimiento del linaje mediante SageMaker IA ML, consulte. [Seguimiento del linaje de Amazon SageMaker ML](lineage-tracking.md)

Para obtener más información sobre las cuotas de SageMaker IA, consulta los [puntos finales y las cuotas de Amazon SageMaker AI](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Límites de linaje por región
+ Contextos: 500 (límite flexible)
+ Artefactos: 6000 (límite flexible)
+ Asociaciones: 6000 (límite flexible)

Límites de entrenamiento por región
+ Tiempo máximo de ejecución de un trabajo de entrenamiento: 432 000 segundos
+ Número máximo de instancias por trabajo de entrenamiento: 20
+ El número máximo de solicitudes `CreateTrainingJob` que se pueden realizar por segundo en esta cuenta en la región actual: 1 TPS
+ Período para mantenerse en activo para la reutilización del clúster: 3600 segundos

Número máximo de canalizaciones y ejecuciones simultáneas de canalizaciones por región
+ Número máximo de canalizaciones permitidas por cuenta: 500
+ Número máximo de ejecuciones simultáneas de canalizaciones permitidas por cuenta: 20
+ Tiempo de espera de las ejecuciones de la canalización: 672 horas

# Orígenes de datos
<a name="feature-store-feature-processor-data-sources"></a>

El procesamiento SageMaker de funciones de Amazon Feature Store admite múltiples fuentes de datos. El SDK para Python (Boto3) del procesador de características proporciona constructos para cargar datos de grupos de características u objetos almacenados en Amazon S3. Además, puede crear orígenes de datos personalizados para cargar datos de otros orígenes de datos. Para obtener información sobre los orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [Orígenes de datos del SDK del procesador de características](feature-store-feature-processor-data-sources-sdk.md)
+ [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md)
+ [Ejemplos de orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom-examples.md)

# Orígenes de datos del SDK del procesador de características
<a name="feature-store-feature-processor-data-sources-sdk"></a>

El SDK del procesador de características de Amazon SageMaker Feature Store para Python (Boto3) proporciona construcciones para cargar datos de grupos de características u objetos almacenados en Amazon S3. Para obtener una lista completa de las definiciones de orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Para ver ejemplos sobre cómo utilizar las definiciones de orígenes de datos del SDK de Python del almacén de características, consulte [Ejemplo de código de procesamiento de características para casos de uso habituales](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` se utiliza para especificar un grupo de características como origen de datos de entrada para un procesador de características. Los datos se pueden cargar desde un grupo de características de un almacenamiento sin conexión. Si intenta cargar los datos desde un grupo de características de un almacenamiento en línea, se producirá un error de validación. Puede especificar desplazamientos de inicio y final para limitar los datos que se cargan a un intervalo de tiempo específico. Por ejemplo, puede especificar un desplazamiento de inicio de “14 días” para cargar solo las dos últimas semanas de datos y, además, puede especificar un desplazamiento de finalización de “7 días” para limitar la entrada de datos a la semana anterior.

## Definiciones de orígenes de datos proporcionadas por el almacén de características
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

El SDK de Python del almacén de características contiene definiciones de orígenes de datos que se pueden usar para especificar varios orígenes de datos de entrada para un procesador de características. Entre ellas se incluyen orígenes de tablas CSV, Parquet e Iceberg. Para obtener una lista completa de las definiciones de orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Orígenes de datos personalizados
<a name="feature-store-feature-processor-data-sources-custom"></a>

En esta página se describe cómo crear una clase de origen de datos personalizado y se muestran algunos ejemplos de uso. Con las fuentes de datos personalizadas, puedes usar el SDK de SageMaker IA para Python (Boto3) que se proporciona de la misma manera que si APIs utilizaras las fuentes de datos proporcionadas por Amazon SageMaker Feature Store. 

Para utilizar un origen de datos personalizado para transformar e ingerir datos en un grupo de características mediante el procesamiento de características, tendrá que ampliar la clase `PySparkDataSource` con los siguientes miembros y funciones de la clase.
+ `data_source_name` (str): un nombre arbitrario para el origen de datos. Por ejemplo, un ARN de Amazon Redshift, Snowflake o catálogo de Glue.
+ `data_source_unique_id` (str): un identificador único que hace referencia al recurso específico al que se accede. Por ejemplo, nombre de tabla, ARN de tabla de DDB, prefijo de Amazon S3. Todo uso del mismo `data_source_unique_id` en orígenes de datos personalizados se asociará al mismo origen de datos en la vista de linaje. El linaje incluye la información sobre el código de ejecución del flujo de trabajo de procesamiento de características, los orígenes de datos que se utilizaron y la forma en que se incorporaron al grupo de características o a la característica. Para obtener más información sobre cómo ver el linaje de un grupo de características en **Studio**, consulte [Visualización del linaje desde la consola](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data` (func): método utilizado para conectarse con el procesador de características. Devuelve un marco de datos de Spark. Para ver ejemplos, consulte [Ejemplos de orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom-examples.md).

Tanto `data_source_name` como `data_source_unique_id` se utilizan para identificar de forma exclusiva la entidad del linaje. A continuación, se muestra un ejemplo de una clase de origen de datos personalizado que se denomina `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Ejemplos de orígenes de datos personalizados
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

En esta sección se proporcionan ejemplos de implementaciones de orígenes de datos personalizados para procesadores de características. Para obtener más información sobre los orígenes de datos personalizados, consulte [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md).

La seguridad es una responsabilidad compartida entre nuestros clientes AWS y nosotros. AWS es responsable de proteger la infraestructura en la que se ejecutan los servicios del Nube de AWS. Los clientes son responsables de todas las tareas necesarias de configuración y administración de la seguridad. Por ejemplo, los secretos, como las credenciales de acceso a los almacenes de datos, no deben estar codificados de forma rígida en los orígenes de datos personalizados. Puede utilizarlas AWS Secrets Manager para administrar estas credenciales. Para obtener información sobre Secrets Manager, consulte [¿Qué es AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) en la guía AWS Secrets Manager del usuario. En los siguientes ejemplos, se utilizará Secrets Manager para las credenciales.

**Topics**
+ [Ejemplos de orígenes de datos personalizados de clústeres de Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Ejemplos de orígenes de datos personalizados de Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Ejemplos de orígenes de datos personalizados de Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Ejemplos de orígenes de datos personalizados de transmisión](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Ejemplos de orígenes de datos personalizados de clústeres de Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift ofrece un controlador JDBC que puede utilizarse para leer datos con Spark. Para obtener información acerca de cómo descargar el controlador JDBC de Amazon Redshift, consulte [Descargar el controlador JDBC versión 2.1 de Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Para crear la clase de origen de datos personalizado de Amazon Redshift, tendrá que sobrescribir el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md). 

Para conectarse a un clúster de Amazon Redshift, necesita:
+ URL de JDBC de Amazon Redshift (`jdbc-url`).

  Para obtener información sobre cómo obtener la URL de JDBC de Amazon Redshift, consulte [Obtención de la URL de JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ Nombre de usuario (`redshift-user`) y contraseña (`redshift-password`) de Amazon Redshift.

  Para obtener información acerca de cómo crear y administrar usuarios de bases de datos mediante comandos SQL de Amazon Redshift, consulte [Usuarios](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ Nombre de la tablas de Amazon Redshift (`redshift-table-name`).

  Para obtener información acerca de cómo crear una tabla con algunos ejemplos, consulte [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-redshift-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Amazon Redshift en Secrets Manager.

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar la URL de JDBC y el token de acceso personal de Secrets Manager y cómo anular `read_data` para su clase de origen de datos personalizado, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

En el siguiente ejemplo se muestra cómo conectar `RedshiftDataSource` con el decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar el controlador JDBC, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake proporciona un conector Spark que puede utilizar para su decorador `feature_processor`. Para obtener información sobre el conector de Snowflake para Spark, consulte [Snowflake Connector for Spark](https://docs.snowflake.com/en/user-guide/spark-connector) en la documentación de Snowflake.

Para crear la clase de origen de datos personalizado de Snowflake, tendrá que anular el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md) y agregar paquetes de conectores Spark a la ruta de clases de Spark. 

Para conectarse a un origen de datos de Snowflake, necesita:
+ URL de Snowflake (`sf-url`).

  Para obtener información sobre cómo URLs acceder a las interfaces web de Snowflake, consulte los [identificadores de cuenta](https://docs.snowflake.com/en/user-guide/admin-account-identifier) en la documentación de Snowflake.
+ Base de datos de Snowflake (`sf-database`). 

  Para obtener información sobre cómo obtener el nombre de la base de datos de Snowflake, consulte [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) en la documentación de Snowflake.
+ Esquema de base de datos de Snowflake (`sf-schema`). 

  Para obtener información sobre cómo obtener el nombre del esquema de Snowflake, consulte [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) en la documentación de Snowflake.
+ Almacén de Snowflake (`sf-warehouse`).

  Para obtener información sobre cómo obtener el nombre del almacén de Snowflake, consulte [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) en la documentación de Snowflake.
+ Nombre de la tabla de Snowflake (`sf-table-name`).
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-snowflake-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Snowflake en Secrets Manager. 

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar el nombre de usuario y la contraseña de Snowflake de Secrets Manager y cómo anular la función `read_data` para su clase de origen de datos personalizado, `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

En el siguiente ejemplo se muestra cómo conectar `SnowflakeDataSource` con el decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar los paquetes, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`. En los paquetes de Spark en el siguiente ejemplo `spark-snowflake_2.12` es la versión de Scala del procesador de características, `2.12.0` es la versión de Snowflake que desea utilizar y `spark_3.3` es la versión de Spark del procesador de características. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark puede leer datos de Databricks mediante el controlador JDBC de Databricks. Para obtener información sobre el controlador JDBC de Databricks, consulte [Configure the Databricks ODBC and JDBC drivers](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) en la documentación de Databricks.

**nota**  
Puede leer los datos de cualquier otra base de datos si incluye el controlador JDBC correspondiente en la ruta de clases de Spark. Para obtener más información, consulte [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) en la Guía de Spark SQL.

Para crear la clase de origen de datos personalizado de Databricks, tendrá que anular el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md) y agregar paquetes de conectores Spark a la ruta de clases de Spark. 

Para conectarse a un origen de datos de Databricks, necesita:
+ URL de Databricks (`databricks-url`).

  Para obtener información sobre la URL de Databricks, consulte [Building the connection URL for the Databricks driver](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) en la documentación de Databricks.
+ Token de acceso personal de Databricks (`personal-access-token`).

  Para obtener información sobre el token de acceso a Databricks, consulte [Databricks personal access token authentication](https://docs.databricks.com/en/dev-tools/auth.html#pat) en la documentación de Databricks.
+ Nombre del catálogo de datos (`db-catalog`). 

  Para obtener información sobre el nombre del catálogo de Databricks, consulte [Catalog name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) en la documentación de Databricks.
+ Nombre del esquema (`db-schema`).

  Para obtener información sobre el nombre del esquema de Databricks, consulte [Schema name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) en la documentación de Databricks.
+ Nombre de la tabla (`db-table-name`).

  Para obtener información sobre el nombre de la tabla de Databricks, consulte [Table name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) en la documentación de Databricks.
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-databricks-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Databricks en Secrets Manager. 

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar la URL de JDBC y el token de acceso personal de Secrets Manager y cómo sobrescribir `read_data` para su clase de origen de datos personalizado, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

El siguiente ejemplo muestra cómo cargar el jar del controlador JDBC, `jdbc-jar-file-name.jar`, a Amazon S3 para agregarlo a la ruta de clases de Spark. Para obtener información sobre cómo descargar el controlador JDBC de Spark (`jdbc-jar-file-name.jar`) de Databricks, consulte [Download JDBC Driver](https://www.databricks.com/spark/jdbc-drivers-download) en el sitio web de Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar los jars, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de transmisión
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Puede conectarse a orígenes de datos de transmisión, como Amazon Kinesis, y crear transformaciones con Spark Structured Streaming para leer orígenes de datos de transmisión. Para obtener información sobre el conector de Kinesis, consulte Kinesis [Connector para Spark Structured](https://github.com/roncemer/spark-sql-kinesis) Streaming en. GitHub Para obtener información sobre Amazon Kinesis, consulte [What Is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) en la Guía para desarrolladores de Amazon Kinesis.

Para crear la clase de origen de datos personalizado de Amazon Kinesis, tendrá que ampliar la clase `BaseDataSource` y anular el método `read_data` de [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md).

Para conectarse a un flujo de datos de Amazon Kinesis, necesita:
+ ARN de Kinesis (`kinesis-resource-arn`). 

  Para obtener información sobre la transmisión de datos de Kinesis ARNs, consulte [Amazon Resource Names (ARNs) para Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) en la Guía para desarrolladores de Amazon Kinesis.
+ Nombre de flujo de datos de Kinesis (`kinesis-stream-name`).
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

En el siguiente ejemplo se muestra cómo conectar `KinesisDataSource` con el decorador `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

En el código de ejemplo anterior, se utilizan algunas opciones de la transmisión estructurada de Spark para transmitir microlotes al grupo de características. Para ver una lista completa de opciones, consulte la [guía de programación de transmisión estructurada](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) en la documentación de Apache Spark. 
+ El modo receptor `foreachBatch` es una característica que permite aplicar operaciones y escribir lógica en los datos de salida de cada microlote de una consulta de transmisión. 

  Para obtener más información`foreachBatch`, consulte el [uso de Foreach y](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) la Guía de programación de ForeachBatch streaming estructurado de Apache Spark. 
+ La opción `checkpointLocation` guarda periódicamente el estado de la aplicación de transmisión. El registro de transmisión se guarda en la ubicación del punto de control `s3a://checkpoint-path`.

  Para obtener información sobre la opción `checkpointLocation`, consulte [Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) en la guía de programación de transmisión estructurada de Apache Spark. 
+ La configuración `trigger` define la frecuencia con la que se desencadena el procesamiento por microlotes en una aplicación de transmisión. En el ejemplo, el tipo de desencadenador de tiempo de procesamiento se utiliza con intervalos de microlotes de un minuto, según lo especificado en `trigger(processingTime="1 minute")`. Para reponer desde un origen de transmisión, puede usar el tipo de desencadenador disponible en este momento, según lo especificado en `trigger(availableNow=True)`.

  Para ver una lista completa de tipos de `trigger`, consulte [Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) en la guía de programación de transmisión estructurada de Apache Spark.

**Transmisiones continuas y reintentos automáticos mediante desencadenadores basados en eventos**

El procesador de funciones utiliza SageMaker Training como infraestructura de cómputo y tiene un límite máximo de tiempo de ejecución de 28 días. Puede utilizar desencadenadores basados en eventos para ampliar la transmisión continua durante un período de tiempo más prolongado y recuperarse de errores transitorios. Para obtener más información sobre las ejecuciones programadas y basadas en eventos, consulte [Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características](feature-store-feature-processor-schedule-pipeline.md).

A continuación, se muestra un ejemplo de cómo configurar un desencadenador basado en eventos para que la canalización del procesador de características de transmisión funcione de forma continua. Para ello se utiliza la función de transformación de transmisión definida en el ejemplo anterior. Es posible configurar una canalización objetivo para que se desencadene cuando se produzca un evento `STOPPED` o `FAILED` durante la ejecución de una canalización de origen. Tenga en cuenta que se utiliza la misma canalización como origen y objetivo para que se ejecute de forma continua.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Ejemplo de código de procesamiento de características para casos de uso habituales
<a name="feature-store-feature-processor-examples"></a>

Los siguientes ejemplos incluyen muestras de código de procesamiento de características para casos de uso habituales. Para ver un ejemplo de cuaderno más detallado que muestre casos de uso específicos, consulta el [cuaderno de procesamiento de características de Amazon SageMaker Feature Store](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

En los siguientes ejemplos, `us-east-1` es la región del recurso, `111122223333` es el ID de la cuenta propietaria del recurso y `your-feature-group-name` es el nombre del grupo de características.

El conjunto de datos `transactions` utilizado en los siguientes ejemplos tiene el siguiente esquema:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Unión de datos desde varios orígenes de datos](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Agregación en ventanas deslizantes](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Agregación en ventanas de saltos de tamaño constante](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Promoción del almacenamiento sin conexión al almacenamiento en línea](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Transformaciones con la biblioteca Pandas](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [Ejecuciones continuas y reintentos automáticos mediante desencadenadores basados en eventos](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Unión de datos desde varios orígenes de datos
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Agregación en ventanas deslizantes
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Agregación en ventanas de saltos de tamaño constante
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promoción del almacenamiento sin conexión al almacenamiento en línea
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformaciones con la biblioteca Pandas
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Transformaciones con la biblioteca Pandas**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Ejecuciones continuas y reintentos automáticos mediante desencadenadores basados en eventos
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```