

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Processamento de atributos
<a name="feature-store-feature-processing"></a>

O processamento de SageMaker recursos da Amazon Feature Store é um recurso com o qual você pode transformar dados brutos em recursos de aprendizado de máquina (ML). Ele fornece um SDK do Processador de atributos com o qual você pode transformar e ingerir dados de fontes de dados em lote em seus grupos de atributos. Com esse atributo, o Feature Store cuida da infraestrutura subjacente, incluindo o provisionamento dos ambientes computacionais e a criação e manutenção do Pipelines para carregar e ingerir dados. Dessa forma, você pode se concentrar nas definições do processador de atributos que incluem uma função de transformação (por exemplo, contagem de visualizações do produto, média do valor da transação), fontes (onde aplicar essa transformação) e coletores (onde gravar os valores computados do atributo).

O pipeline do Processador de Atributos é um pipeline do Pipelines. Como Pipelines, você também pode rastrear pipelines programados do Feature Processor com a linhagem de SageMaker IA no console. Para obter mais informações sobre o SageMaker AI Lineage, consulte [Rastreamento SageMaker de linhagem do Amazon ML](lineage-tracking.md) Isso inclui rastrear execuções programadas, visualizar a linhagem para rastrear recursos de volta às suas fontes de dados e visualizar processadores de recursos compartilhados em um único ambiente. Para obter informações sobre como usar o Feature Store com o Studio, consulte [Visualizar as execuções do pipeline a partir do console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [SDK do Processador de atributos do Feature Store](feature-store-feature-processor-sdk.md)
+ [Executar o Processador de atributos do Feature Store remotamente](feature-store-feature-processor-execute-remotely.md)
+ [Criar e executar pipelines do Processador de atributos do Feature Store](feature-store-feature-processor-create-execute-pipeline.md)
+ [Execuções programadas e baseadas em eventos para pipelines do Processador de atributos](feature-store-feature-processor-schedule-pipeline.md)
+ [Monitore os pipelines SageMaker do processador de recursos da Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md)
+ [Permissões e funções de execução do IAM](feature-store-feature-processor-iam-permissions.md)
+ [Restrições, limites e cotas do Processador de atributos](feature-store-feature-processor-quotas.md)
+ [Fontes de dados](feature-store-feature-processor-data-sources.md)
+ [Exemplo de código de Processamento de atributos para casos de uso comuns](feature-store-feature-processor-examples.md)

# SDK do Processador de atributos do Feature Store
<a name="feature-store-feature-processor-sdk"></a>

Declare uma definição de Processador de atributos do Feature Store decorando suas funções de transformação com o decorador `@feature_processor`. O SageMaker AI SDK for Python (Boto3) carrega automaticamente os dados das fontes de dados de entrada configuradas, aplica a função de transformação decorada e, em seguida, ingere os dados transformados em um grupo de recursos de destino. As funções de transformação decoradas devem estar de acordo com a assinatura esperada do decorador `@feature_processor`. Para obter mais informações sobre o `@feature_processor` decorador, consulte [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) na Amazon SageMaker Feature Store Read the Docs. 

Com o `@feature_processor` decorador, sua função de transformação é executada em um ambiente de execução do Spark, onde os argumentos de entrada fornecidos à sua função e seu valor de retorno são Spark. DataFrames O número de parâmetros de entrada em sua função de transformação deve corresponder ao número de entradas configuradas no decorador `@feature_processor`. 

Para obter mais informações sobre o decorador `@feature_processor`, consulte o [SDK do Processador de atributos do Feature Store para Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

O código a seguir é um exemplo básico de como usar o decorador `@feature_processor`. Para obter exemplos de casos de uso mais específicos, consulte [Exemplo de código de Processamento de atributos para casos de uso comuns](feature-store-feature-processor-examples.md).

O Feature Processor SDK pode ser instalado a partir do SDK do SageMaker Python e seus extras usando o comando a seguir. 

```
pip install sagemaker[feature-processor]
```

Nos exemplos a seguir, `us-east-1` é a região do atributo, `111122223333` é o ID da conta do proprietário do atributo e `your-feature-group-name` é o nome do grupo de atributos.

Veja a seguir uma definição básica de Processador de atributos, em que o decorador `@feature_processor` configura uma entrada CSV do Amazon S3 para ser carregada e fornecida à sua função de transformação (por exemplo, `transform`) e a prepara para ingestão em um grupo de atributos. A última linha o executa.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

O parâmetro `@feature_processor` inclui:
+ `inputs` (Lista [str]): uma lista de fontes de dados que são usadas em seu Processador de atributos do Feature Store. Se suas fontes de dados forem grupos de atributos ou armazenadas no Amazon S3, você poderá usar as definições da fonte de dados fornecidas pelo Feature Store para o Processador de atributos. Para obter uma lista completa das definições de fontes de dados fornecidas pela Feature Store, consulte a [Fonte de dados do Feature Processor](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) na Amazon SageMaker Feature Store. Leia a documentação.
+ `output` (str): o ARN do grupo de atributos para ingerir a saída da função decorada.
+ `target_stores` (Opcional [List [str]]): uma lista de armazenamentos (por exemplo, `OnlineStore` ou`OfflineStore`) a serem ingeridos na saída. Se não forem especificados, os dados serão ingeridos em todos os armazenamentos habilitados do grupo de atributos de saída.
+ `parameters` (Dict [str, Any]): um dicionário a ser fornecido para sua função de transformação. 
+ `enable_ingestion` (bool): um sinalizador para indicar se as saídas da função de transformação são ingeridas no grupo de atributos de saída. Esse sinalizador é útil durante a fase de desenvolvimento. Se não for especificado, a ingestão será habilitada.

Os parâmetros opcionais da função encapsulada (fornecidos como argumento se fornecidos na assinatura da função) incluem:
+ `params` (Dict [str, Any]): o dicionário definido nos parâmetros `@feature_processor`. Ele também contém parâmetros configurados pelo sistema que podem ser referenciados com a chave `system`, como o parâmetro `scheduled_time`.
+ `spark`(SparkSession): Uma referência à SparkSession instância inicializada para o aplicativo Spark.

O código a seguir é um exemplo do uso dos parâmetros `params` e `spark`.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

O parâmetro do sistema `scheduled_time` (fornecido no argumento `params` da sua função) é um valor importante para compatibilidade com a repetição de cada execução. O valor pode ajudar a identificar de forma exclusiva a execução do Processador de atributos e pode ser usado como um ponto de referência para entradas baseadas em intervalos de datas (por exemplo, carregando apenas os dados das últimas 24 horas) para garantir o intervalo de entrada independente do tempo real de execução do código. Se o Processador de atributos for executado com base em uma programação (consulte [Execuções programadas e baseadas em eventos para pipelines do Processador de atributos](feature-store-feature-processor-schedule-pipeline.md)), seu valor será fixado no horário programado para execução. O argumento pode ser substituído durante a execução síncrona usando a API de execução do SDK para dar compatibilidade com casos de uso como preenchimento de dados ou reexecução de uma execução anterior perdida. Seu valor é a hora atual se o Processador de atributos for executado de outra forma.

Para obter informações sobre a criação do código Spark, consulte o [Guia do programa SQL do Spark](https://spark.apache.org/docs/latest/sql-programming-guide.html).

Para obter mais exemplos de código para casos de uso comuns, consulte o [Exemplo de código de Processamento de atributos para casos de uso comuns](feature-store-feature-processor-examples.md). 

Observe que as funções de transformação decoradas com `@feature_processor` não retornam um valor. Para testar programaticamente sua função, você pode remover ou corrigir o decorador `@feature_processor` de forma que ele atue como uma passagem para a função agrupada. Para obter mais detalhes sobre o `@feature_processor` decorador, consulte [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Executar o Processador de atributos do Feature Store remotamente
<a name="feature-store-feature-processor-execute-remotely"></a>

Para executar seus processadores de recursos em grandes conjuntos de dados que exigem hardware mais poderoso do que o disponível localmente, você pode decorar seu código com o `@remote` decorador para executar seu código Python local como um trabalho de treinamento distribuído de um ou vários nós SageMaker . Para obter mais informações sobre como executar seu código como um trabalho de SageMaker treinamento, consulte[Execute seu código local como um trabalho SageMaker de treinamento](train-remote-decorator.md). 

Veja a seguir um exemplo de uso do decorador `@remote` junto com o decorador `@feature_processor`.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

O parâmetro `spark_config` indica que o trabalho remoto é executado como uma aplicação do Spark. A `SparkConfig` instância pode ser usada para configurar a configuração do Spark e fornecer dependências adicionais ao aplicativo Spark, como arquivos Python e arquivos. JARs

Para iterações mais rápidas ao desenvolver seu código de processamento de atributos, você pode especificar o argumento `keep_alive_period_in_seconds` no decorador `@remote` para reter os atributos configurados em um grupo de aquecimento para trabalhos de treinamento subsequentes. Para obter mais informações sobre grupos de aquecimento, consulte `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` no Guia de referência da API.

O código a seguir é um exemplo do local `requirements.txt:`.

```
sagemaker>=2.167.0
```

Isso instalará a versão correspondente do SageMaker SDK na tarefa remota, necessária para executar o método anotado por. `@feature-processor` 

# Criar e executar pipelines do Processador de atributos do Feature Store
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

O SDK do Feature Processor APIs permite promover suas definições de processador de recursos em um pipeline de SageMaker IA totalmente gerenciado. Para mais informações sobre o Pipelines, consulte [Visão geral do Pipelines](pipelines-overview.md). Para converter suas definições de processador de recursos em um pipeline de SageMaker IA, use a `to_pipeline` API com sua definição de processador de recursos. Você pode agendar execuções de seu processador de recursos. A definição pode ser agendada, monitorá-las operacionalmente com CloudWatch métricas e integrá-las EventBridge para atuar como fontes de eventos ou assinantes. Para obter mais informações sobre o monitoramento de pipelines criados com o Pipelines, consulte [Monitore os pipelines SageMaker do processador de recursos da Amazon Feature Store](feature-store-feature-processor-monitor-pipeline.md).

Para ver seus pipelines do Processador de atributos, consulte [Visualizar as execuções do pipeline a partir do console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

Se sua função também estiver decorada com o decorador `@remote`, suas configurações serão transferidas para o pipeline do Processador de atributos. Você pode especificar configurações avançadas, como tipo e contagem de instâncias de computação, dependências de runtime, configurações de rede e segurança usando o decorador `@remote`.

O exemplo a seguir usa `to_pipeline` `execute` APIs e.

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

A API `to_pipeline` é semanticamente uma operação de inserção. Ele atualiza o pipeline se ele já existir; caso contrário, ele cria um pipeline.

Opcionalmente, a `to_pipeline` API aceita um URI do Amazon S3 que faz referência a um arquivo contendo a definição do Feature Processor para associá-lo ao pipeline do Feature Processor para rastrear a função de transformação e suas versões em SageMaker sua linhagem de aprendizado de máquina de IA.

Para recuperar uma lista de cada pipeline do Processador de atributos em sua conta, você pode usar a API `list_pipelines`. Uma solicitação subsequente à API `describe` retorna detalhes relacionados ao pipeline do Processador de Atributos, incluindo, mas não se limitando a, Pipelines e detalhes de programação.

O exemplo a seguir usa `list_pipelines` `describe` APIs e.

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Execuções programadas e baseadas em eventos para pipelines do Processador de atributos
<a name="feature-store-feature-processor-schedule-pipeline"></a>

As execuções do pipeline de processamento de recursos do Amazon SageMaker Feature Store podem ser configuradas para serem iniciadas de forma automática e assíncrona com base em uma programação pré-configurada ou como resultado de outro evento de serviço. AWS Por exemplo, você pode programar pipelines de processamento de atributos para serem executados no primeiro dia de cada mês ou encadear dois pipelines juntos para que um pipeline de destino seja executado automaticamente após a conclusão da execução do pipeline de origem.

**Topics**
+ [Execuções baseadas em programação](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [Execuções baseadas em eventos](#feature-store-feature-processor-schedule-pipeline-event-based)

## Execuções baseadas em programação
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

O Feature Processor SDK fornece uma [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)API para executar pipelines do Feature Processor de forma recorrente com a integração com o Amazon EventBridge Scheduler. A programação pode ser especificada com uma `cron` expressão `at``rate`,, ou usando o [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)parâmetro com as mesmas expressões suportadas pela Amazon EventBridge. A API de programação é semanticamente uma operação de inserção, pois atualiza a programação, se ele já existir; caso contrário, ela o cria. Para obter mais informações sobre EventBridge expressões e exemplos, consulte [Tipos de EventBridge agendamento no Scheduler no Guia](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) do usuário do EventBridge Scheduler.

Os exemplos a seguir usam a API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) do Processador de atributos, usando as expressões `at`, `rate` e `cron`.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

O fuso horário padrão para entradas de data e hora na API `schedule` é o UTC. Para obter mais informações sobre expressões de EventBridge agendamento do Scheduler, consulte a [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)documentação de referência da API EventBridge Scheduler.

As execuções programadas do pipeline do Processador de atributos fornecem à sua função de transformação o runtime programado, para ser usado como um token de idempotência ou um ponto de referência fixo para entradas baseadas em intervalos de datas. Para desativar (ou seja, pausar) ou reativar uma programação, use o parâmetro `state` da API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) com `‘DISABLED’` ou`‘ENABLED’`, respectivamente.

Para obter mais informações sobre o Processador de atributos, consulte [Fontes de dados do SDK do Processador de atributos](feature-store-feature-processor-data-sources-sdk.md). 

## Execuções baseadas em eventos
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

Um pipeline de processamento de atributos pode ser configurado para ser executado automaticamente quando um evento AWS ocorrer. O SDK de Processamento de atributos fornece uma função [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) que aceita uma lista de eventos de origem e um pipeline de destino. Os eventos de origem devem ser instâncias de [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent), que especificam um pipeline e eventos de [status de execução](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus). 

A `put_trigger` função configura uma EventBridge regra e uma meta da Amazon para rotear eventos e permite que você especifique um padrão de EventBridge evento para responder a qualquer AWS evento. Para obter informações sobre esses conceitos, consulte EventBridge [as regras](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html), [metas](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) e [padrões de eventos](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html) da Amazon.

Os gatilhos podem ser ativados ou desativados. EventBridge iniciará a execução de um pipeline de destino usando a função fornecida no `role_arn` parâmetro da `put_trigger` API. A função de execução é usada por padrão se o SDK for usado em um ambiente Amazon SageMaker Studio Classic ou Notebook. Para obter informações sobre como obter sua função de execução, consulte[Obtenha um perfil de execução](sagemaker-roles.md#sagemaker-roles-get-execution-role).

O seguinte exemplo define:
+ Um pipeline de SageMaker IA usando a `to_pipeline` API, que inclui o nome do pipeline de destino (`target-pipeline`) e sua função de transformação (`transform`). Para obter informações sobre seu Processador de atributos e a função de transformação, consulte [Fontes de dados do SDK do Processador de atributos](feature-store-feature-processor-data-sources-sdk.md).
+ Um gatilho usando a API `put_trigger`, que absorve o `FeatureProcessorPipelineEvent` para o evento e o nome do seu pipeline de destino (`target-pipeline`). 

  O `FeatureProcessorPipelineEvent` define o gatilho para quando o status do seu pipeline de origem (`source-pipeline`) se torna `Succeeded`. Para obter informações sobre a função de evento do Pipeline do Processador de atributos, consulte [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) na seção Ler os documentos do Feature Store. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

Para obter um exemplo do uso de gatilhos baseados em eventos para criar execuções contínuas e novas tentativas automáticas para seu pipeline do Processador de atributos, consulte [Execuções contínuas e novas tentativas automáticas usando gatilhos baseados em eventos](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries).

Para obter um exemplo do uso de gatilhos baseados em eventos para criar *streaming* contínuo e novas tentativas automáticas usando gatilhos baseados em eventos, consulte [Exemplos de fontes de dados personalizadas de streaming](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming). 

# Monitore os pipelines SageMaker do processador de recursos da Amazon Feature Store
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS fornece ferramentas de monitoramento para monitorar seus recursos e aplicativos de SageMaker IA da Amazon em tempo real, relatar quando algo dá errado e realizar ações automáticas quando apropriado. Os pipelines do Processador de Atributos do Feature Store são do Pipelines, portanto, os mecanismos e integrações de monitoramento padrão estão disponíveis. Métricas operacionais, como falhas de execução, podem ser monitoradas por meio de CloudWatch métricas da Amazon e EventBridge eventos da Amazon. 

Para obter mais informações sobre como monitorar e operacionalizar o Processador de atributos do Feature Store, consulte os seguintes atributos:
+ [AWS Recursos de monitoramento na Amazon SageMaker AI](monitoring-overview.md)- Orientação geral sobre atividades de monitoramento e auditoria de recursos de SageMaker IA.
+ [SageMaker métricas de pipelines](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)- CloudWatch Métricas emitidas por pipelines.
+ [SageMaker alteração do estado de execução do pipeline](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)- EventBridge eventos emitidos para pipelines e execuções.
+ [Solução de problemas do Amazon SageMaker Pipelines](pipelines-troubleshooting.md) - Dicas gerais de depuração e solução de problemas para Pipelines.

Os registros de execução do Feature Store Feature Processor podem ser encontrados no Amazon CloudWatch Logs, no `/aws/sagemaker/TrainingJobs` grupo de registros, onde você pode encontrar os fluxos de registros de execução usando convenções de pesquisa. Para execuções criadas invocando diretamente a função decorada `@feature_processor`, você pode encontrar logs no console do seu ambiente de execução local. Para execuções ` @remote` decoradas, o nome do stream de CloudWatch registros contém o nome da função e a data e hora da execução. Para execuções de pipeline do Feature Processor, o stream de CloudWatch registros da etapa contém a `feature-processor` string e o ID de execução do pipeline.

Os pipelines do Feature Store Feature Processor e os status de execução recentes podem ser encontrados no Amazon SageMaker Studio Classic para um determinado grupo de recursos na interface do usuário do Feature Store. Grupos de atributos relacionados aos pipelines do Processador de atributos como entradas ou saídas são exibidos na interface do usuário. Além disso, a visualização de linhagem pode fornecer contexto para execuções upstream, como pipelines de produção de dados do Processador de atributos e fontes de dados, para depuração adicional. Para obter mais informações sobre como usar a visualização de linhagem usando o Studio Classic, consulte [Visualizar a linhagem a partir do console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).

# Permissões e funções de execução do IAM
<a name="feature-store-feature-processor-iam-permissions"></a>

Para usar o SDK do Amazon SageMaker Python, é necessário ter permissões para interagir com. Serviços da AWS As políticas a seguir são necessárias para a funcionalidade completa do Processador de atributos. Você pode anexar as políticas [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS gerenciadas [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)e as políticas anexadas à sua função do IAM. Para obter informações sobre como anexar políticas ao seu perfil do IAM, consulte [Adicionar políticas ao seu perfil do IAM](feature-store-adding-policies.md). Veja os seguintes exemplos para obter mais detalhes:

A política de confiança da função à qual essa política é aplicada deve permitir os princípios “scheduler.amazonaws.com”, “sagemaker.amazonaws.com” e “glue.amazonaws.com”.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Restrições, limites e cotas do Processador de atributos
<a name="feature-store-feature-processor-quotas"></a>

O processamento de SageMaker recursos da Amazon Feature Store depende do rastreamento de linhagem de aprendizado de máquina (ML) de SageMaker IA. O Processador de atributos do Feature Store usa contextos de linhagem para representar e rastrear Pipelines de processamento de atributos e versões do Pipeline. Cada Processador de atributos do Feature Store consome pelo menos dois contextos de linhagem (um para o Pipeline de Processamento de atributos e outro para a versão). Se a fonte de dados de entrada ou saída de um Pipeline de Processamento de atributos mudar, um contexto de linhagem adicional será criado. Você pode atualizar os limites de linhagem do SageMaker AI ML entrando em contato com o AWS suporte para aumentar o limite. Os limites padrão para atributos usados pelo Processador de atributos do Feature Store são os seguintes: Para obter informações sobre o rastreamento de linhagem do SageMaker AI ML, consulte[Rastreamento SageMaker de linhagem do Amazon ML](lineage-tracking.md).

Para obter mais informações sobre cotas de SageMaker IA, consulte os [endpoints e cotas de SageMaker IA da Amazon](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Limites de linhagem por região
+ Contextos: 500 (limite flexível)
+ Artefatos: 6.000 (limite flexível)
+ Associações: 6.000 (limite flexível)

Limites de treinamento por Região
+ Maior tempo de execução para um trabalho de treinamento: 432.000 segundos
+ Número máximo de instâncias por trabalho de treinamento: 20
+ Número máximo de solicitações de `CreateTrainingJob` que podem ser feitas por segundo nesta conta na região atual: 1 TPS
+ Período de keep alive para reutilização de clusters: 3.600 segundos

Número máximo de Pipelines e execuções simultâneas de pipelines por Região
+ Número máximo de pipelines permitidos por conta: 500
+ Número máximo de pipelines simultâneos permitidos por conta: 20
+ Tempo em que as execuções do pipeline atingem o tempo limite: 672 horas

# Fontes de dados
<a name="feature-store-feature-processor-data-sources"></a>

O Amazon SageMaker Feature Store Feature Processing oferece suporte a várias fontes de dados. O SDK do Processador de atributos para Python (Boto3) fornece constructos para carregar dados de grupos de atributos ou objetos armazenados no Amazon S3. Além disso, você pode criar fontes de dados personalizadas para carregar dados de outras fontes de dados. Para obter informações sobre as fontes de dados fornecidas pelo Feature Store, consulte [Fonte de dados do Processador de atributos do SDK Python do Feature Store](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [Fontes de dados do SDK do Processador de atributos](feature-store-feature-processor-data-sources-sdk.md)
+ [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md)
+ [Exemplos de fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom-examples.md)

# Fontes de dados do SDK do Processador de atributos
<a name="feature-store-feature-processor-data-sources-sdk"></a>

O Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) fornece construções para carregar dados de grupos de recursos ou objetos armazenados no Amazon S3. Para obter uma lista completa das definições das fontes de dados fornecidas pelo Feature Store, consulte a [Fonte de dados do Processador de atributos do SDK Python do Feature Store](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Para ver exemplos de como usar as definições da fonte de dados do SDK Python do Feature Store para, consulte [Exemplo de código de Processamento de atributos para casos de uso comuns](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

O `FeatureGroupDataSource` é usado para especificar um grupo de atributos como fonte de dados de entrada para um Processador de atributos. Os dados podem ser carregados de um grupo de atributos do armazenamento offline. A tentativa de carregar seus dados de um grupo de atributos do armazenamento on-line resultará em um erro de validação. Você pode especificar os deslocamentos inicial e final para limitar os dados que são carregados em um intervalo de tempo específico. Por exemplo, você pode especificar um início de deslocamento de “14 dias” para carregar somente as últimas duas semanas de dados e também pode especificar um término de deslocamento de “7 dias” para limitar a entrada à semana anterior de dados.

## Definições da fonte de dados fornecidas pelo Feature Store
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

O SDK Python do Feature Store contém definições da fonte de dados que podem ser usadas para especificar várias fontes de dados de entrada para um Processador de atributos. Isso inclui fontes de tabelas CSV, Parquet e Iceberg. Para obter uma lista completa das definições das fontes de dados fornecidas pelo Feature Store, consulte a [Fonte de dados do Processador de atributos do SDK Python do Feature Store](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Fontes de dados personalizadas
<a name="feature-store-feature-processor-data-sources-custom"></a>

Nesta página, descreveremos como criar uma classe de fonte de dados personalizada e mostraremos alguns exemplos de uso. Com fontes de dados personalizadas, você pode usar o SDK de SageMaker IA para Python ( APIs Boto3) fornecido da mesma forma como se estivesse usando fontes de dados fornecidas pela Amazon Feature Store. SageMaker 

Para usar uma fonte de dados personalizada para transformar e ingerir dados em um grupo de atributos usando o Processamento de atributos, você precisará estender a classe `PySparkDataSource` com os seguintes membros e funções da classe:
+ `data_source_name` (str): um nome arbitrário para a fonte de dados. Por exemplo, Amazon Redshift, Snowflake ou um ARN do Catálogo Glue.
+ `data_source_unique_id` (str): um identificador exclusivo que se refere ao recurso específico que está sendo acessado. Por exemplo, nome da tabela, ARN da tabela DDB, prefixo Amazon S3. Todo o uso do mesmo `data_source_unique_id` em fontes de dados personalizadas será associado à mesma fonte de dados na visualização de linhagem. A linhagem inclui informações sobre o código de execução de um fluxo de trabalho de processamento de atributos, quais fontes de dados foram usadas e como elas são ingeridas no grupo de atributos ou no atributo. Para obter informações sobre a visualização da linhagem de um grupo de atributos no **Studio**, consulte [Visualizar a linhagem a partir do console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data` (func): um método usado para se conectar ao processador de atributos. Retorna um estrutura de dados do Spark. Para obter exemplos, consulte [Exemplos de fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom-examples.md).

Ambos `data_source_name` `data_source_unique_id` são usados para identificar de forma exclusiva sua entidade de linhagem. Veja a seguir um exemplo de uma classe de fonte de dados personalizada chamada `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Exemplos de fontes de dados personalizadas
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Esta seção fornece exemplos de implantações de fontes de dados personalizadas para Processadores de atributos. Para obter mais informações sobre fontes de dados personalizadas, consulte [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md).

A segurança é uma responsabilidade compartilhada AWS entre nossos clientes. AWS é responsável por proteger a infraestrutura que executa os serviços no Nuvem AWS. Os clientes são responsáveis por todas as tarefas necessárias de configuração e gerenciamento de segurança. Por exemplo, segredos como credenciais de acesso aos armazenamentos de dados não devem ser codificados em suas fontes de dados personalizadas. Você pode usar AWS Secrets Manager para gerenciar essas credenciais. Para obter informações sobre o Secrets Manager, consulte [O que é AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) no guia do AWS Secrets Manager usuário. Os exemplos a seguir usarão o Secrets Manager para suas credenciais.

**Topics**
+ [Exemplos de fontes de dados personalizadas dos Clusters do Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Exemplos de fontes de dados personalizadas do Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Exemplos de fontes de dados personalizadas do Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Exemplos de fontes de dados personalizadas de streaming](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Exemplos de fontes de dados personalizadas dos Clusters do Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

O Amazon Redshift oferece um driver JDBC que pode ser usado para ler dados com o Spark. Para obter informações sobre como baixar o driver JDBC do Amazon Redshift, consulte [Baixar o driver JDBC do Amazon Redshift, versão 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Para criar a classe de fonte de dados personalizada do Amazon Redshift, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md). 

Para se conectar a um cluster do Amazon Redshift, você precisa de:
+ URL do JDBC do Amazon Redshift (`jdbc-url`)

  Para obter informações sobre como obter o URL do JDBC do Amazon Redshift, consulte [Obter o URL do JDBC no Guia do desenvolvedor de banco de dados do Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html).
+ Nome de usuário (`redshift-user`) e senha (`redshift-password`) do Amazon Redshift

  Para obter informações sobre como criar e gerenciar usuários de banco de dados usando os comandos SQL do Amazon Redshift, consulte [Usuários](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) no Guia do desenvolvedor de banco de dados do Amazon Redshift.
+ Nome da tabela do Amazon Redshift (`redshift-table-name`)

  Para obter informações sobre como criar uma tabela com alguns exemplos, consulte [CRIAR TABELA](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) no Guia do desenvolvedor de banco de dados do Amazon Redshift.
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`secret-redshift-account-info`) onde você armazena seu nome de usuário e senha de acesso ao Amazon Redshift no Secrets Manager.

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`your-region`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o `read_data` pela sua classe de fonte de dados personalizada, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

O exemplo a seguir mostra como conectar o `RedshiftDataSource` ao decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer o driver jdbc definindo o `SparkConfig` e passando-o para o decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas do Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

O Snowflake fornece um conector Spark que pode ser usado para seu decorador `feature_processor`. Para obter informações sobre o conector Snowflake para Spark, consulte [Conector Snowflake para Spark](https://docs.snowflake.com/en/user-guide/spark-connector) na documentação do Snowflake.

Para criar a classe de fonte de dados personalizada do Snowflake, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md) e adicionar os pacotes de conectores do Spark ao classpath do Spark. 

Para se conectar a uma fonte de dados do Snowflake, você precisa:
+ URL do Snowflake (`sf-url`)

  Para obter informações sobre URLs como acessar as interfaces web do Snowflake, consulte [Identificadores de conta](https://docs.snowflake.com/en/user-guide/admin-account-identifier) na documentação do Snowflake.
+ Banco de dados do Snowflake (`sf-database`) 

  Para obter informações sobre como obter o nome do seu banco de dados usando o Snowflake, consulte [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) na documentação do Snowflake.
+ Esquema do banco de dados do Snowflake (`sf-schema`) 

  Para obter informações sobre como obter o nome do seu esquema usando o Snowflake, consulte [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) na documentação do Snowflake.
+ Warehouse do Snowflake (`sf-warehouse`)

  Para obter informações sobre como obter o nome do seu warehouse usando o Snowflake, consulte [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) na documentação do Snowflake.
+ Nome da tabela do Snowflake (`sf-table-name`)
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`secret-snowflake-account-info`) onde você armazena seu nome de usuário e senha de acesso ao Snowflake no Secrets Manager. 

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`your-region`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o nome de usuário e senha do Snowflake no Secrets Manager e substituir a função `read_data` pela sua classe de fonte de dados personalizada `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

O exemplo a seguir mostra como conectar o `SnowflakeDataSource` ao decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os pacotes definindo o `SparkConfig` e passando-os para o decorador `@remote`. Os pacotes Spark no exemplo a seguir mostra que `spark-snowflake_2.12` é a versão Scala do Processador de atributos, `2.12.0` é a versão do Snowflake que você deseja usar e `spark_3.3` é a versão do Spark do Processador de atributos. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas do Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

O Spark pode ler dados do Databricks usando o driver JDBC do Databricks. Para obter informações sobre o driver JDBC do Databricks, consulte [Configurar os drivers ODBC e JDBC do Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) na documentação do Databricks.

**nota**  
Você pode ler dados de qualquer outro banco de dados incluindo o driver JDBC correspondente no classpath do Spark. Para obter mais informações, consulte [JDBC para outros bancos de dados](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) no Guia do Spark SQL.

Para criar a classe de fonte de dados personalizada do Databricks, você precisará substituir o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md) e adicionar os arquivos jar do JDBC ao classpath do Spark. 

Para se conectar a uma fonte de dados do Databricks, você precisa:
+ URL do Databricks (`databricks-url`)

  Para obter informações sobre o URL do Databricks, consulte [Criar URL da conexão para o driver do Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) na documentação do Databricks.
+ Token de acesso pessoal do Databricks (`personal-access-token`)

  Para obter informações sobre seu token de acesso ao Databricks, consulte [Autenticação do token de acesso pessoal do Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) na documentação do Databricks.
+ Nome do catálogo de dados (`db-catalog`) 

  Para obter informações sobre o nome do catálogo do Databricks, consulte [Nome de catálogo](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) na documentação do Databricks.
+ Nome do esquema (`db-schema`)

  Para obter informações sobre o nome do esquema do Databricks, consulte [Nome do esquema](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) na documentação do Databricks.
+ Nome da tabela (`db-table-name`)

  Para obter informações sobre o nome da tabela do Databricks, consulte [Nome da tabela](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) na documentação do Databricks.
+ (Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (`secret-databricks-account-info`) onde você armazena seu nome de usuário e senha de acesso ao Databricks no Secrets Manager. 

  Para obter informações sobre o Secrets Manager, consulte [Encontre segredos AWS Secrets Manager no](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Guia AWS Secrets Manager do Usuário. 
+ Região da AWS (`your-region`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o `read_data` pela sua classe de fonte de dados personalizada, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

O exemplo a seguir mostra como fazer o upload dos arquivos jar do driver JDBC, `jdbc-jar-file-name.jar`, para o Amazon S3 para adicioná-lo ao classpath do Spark. Para obter informações sobre como baixar o driver JDBC (`jdbc-jar-file-name.jar`) do Spark a partir do Databricks, consulte [Baixar o driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) no site do Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os arquivos jar definindo o `SparkConfig` e passando-os para o decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Exemplos de fontes de dados personalizadas de streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Você pode se conectar a fontes de dados de streaming, como o Amazon Kinesis, e criar transformações com o Spark Structured Streaming para ler a partir de fontes de dados de streaming. Para obter informações sobre o conector Kinesis, consulte Conector [Kinesis para streaming estruturado do Spark](https://github.com/roncemer/spark-sql-kinesis) em. GitHub Para obter mais informações sobre o Amazon Kinesis, consulte [O que é o Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) no Guia do desenvolvedor do Amazon Kinesis.

Para criar a classe de fonte de dados personalizada do Amazon Kinesis, você precisará estender a classe `BaseDataSource` e sobrescrever o método `read_data` do [Fontes de dados personalizadas](feature-store-feature-processor-data-sources-custom.md).

Para se conectar a um Amazon Kinesis Data Streams, você precisa:
+ ARN do Kinesis (`kinesis-resource-arn`) 

  Para obter informações sobre o stream de dados do Kinesis ARNs, consulte [Amazon Resource Names (ARNs) para Kinesis Data Streams no Guia do desenvolvedor do Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format).
+ Nome do fluxo de dados do Kinesis (`kinesis-stream-name`)
+ Região da AWS (`your-region`)

  Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) na documentação do Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

O exemplo a seguir demonstra como conectar o `KinesisDataSource` ao decorador `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

No código de exemplo acima, usamos algumas opções do Spark Structured Streaming ao transmitir microlotes para seu grupo de atributos. Para ver uma lista completa de opções, consulte o [Guia de programação de streaming estruturado](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) na documentação do Apache Spark. 
+ O modo sink `foreachBatch` é um atributo que permite aplicar operações e escrever lógica nos dados de saída de cada microlote de uma consulta de streaming. 

  Para obter informações sobre isso`foreachBatch`, consulte [Usando o Foreach e ForeachBatch no Guia](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) de programação de streaming estruturado do Apache Spark. 
+ A opção `checkpointLocation` salva periodicamente o estado da aplicação de streaming. O log de streaming é salvo no local `s3a://checkpoint-path` do ponto de verificação.

  Para obter informações sobre a opção `checkpointLocation`, consulte [Recuperando-se de falhas com pontos de verificação](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) no Guia de programação do Apache Spark Structured Streaming. 
+ A configuração `trigger` define com que frequência o processamento em microlote é acionado em uma aplicação de streaming. No exemplo, o tipo de gatilho de tempo de processamento é usado com intervalos de microlote de um minuto, especificados por `trigger(processingTime="1 minute")`. Para preencher a partir de uma fonte de fluxo, você pode usar o tipo de gatilho disponível agora, especificado por `trigger(availableNow=True)`.

  Para ver uma lista completa dos tipos de `trigger`, consulte [Gatilhos](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) no Guia de programação do Apache Spark Structured Streaming.

**Streaming contínuo e novas tentativas automáticas usando gatilhos baseados em eventos**

O Feature Processor usa o SageMaker treinamento como infraestrutura computacional e tem um limite máximo de tempo de execução de 28 dias. Você pode usar gatilhos baseados em eventos para estender seu streaming contínuo por um longo período de tempo e se recuperar de falhas transitórias. Para obter mais informações sobre execuções baseadas em programações e eventos, consulte [Execuções programadas e baseadas em eventos para pipelines do Processador de atributos](feature-store-feature-processor-schedule-pipeline.md).

Veja a seguir um exemplo de configuração de um gatilho baseado em eventos para manter o pipeline de streaming do Processador de atributos funcionando continuamente. Ele usa a função de transformação de streaming definida no exemplo anterior. Um pipeline de destino pode ser configurado para ser acionado quando ocorre um evento `STOPPED` ou `FAILED` para a execução de um pipeline de origem. Observe que o mesmo pipeline é usado como origem e destino para que seja executado continuamente.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Exemplo de código de Processamento de atributos para casos de uso comuns
<a name="feature-store-feature-processor-examples"></a>

Os exemplos a seguir fornecem amostras de código de Processamento de atributos para casos de uso comuns. Para um exemplo mais detalhado de caderno mostrando casos de uso específicos, consulte o caderno de [processamento de SageMaker recursos da Amazon Feature Store](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

Nos exemplos a seguir, `us-east-1` é a região do atributo, `111122223333` é o ID da conta do proprietário do atributo e `your-feature-group-name` é o nome do grupo de atributos.

O conjunto de dados `transactions` usado nos seguintes exemplos tem o seguinte esquema:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [Junção de dados de várias fontes de dados](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [Agregados de janelas deslizantes](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [Agregados de janelas em cascata](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [Promoção do armazenamento offline para o armazenamento on-line](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [Transformações com a biblioteca Pandas](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [Execuções contínuas e novas tentativas automáticas usando gatilhos baseados em eventos](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Junção de dados de várias fontes de dados
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Agregados de janelas deslizantes
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Agregados de janelas em cascata
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promoção do armazenamento offline para o armazenamento on-line
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformações com a biblioteca Pandas
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**Transformações com a biblioteca Pandas**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Execuções contínuas e novas tentativas automáticas usando gatilhos baseados em eventos
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```