

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Fontes de dados e ingestão
<a name="feature-store-ingest-data"></a>

Os registros são adicionados aos seus grupos de atributos por meio da ingestão. Dependendo do caso de uso desejado, os registros ingeridos podem ser mantidos dentro do grupo de atributos ou não. Isso depende da configuração de armazenamento, se seu grupo de atributos usa o armazenamento offline ou online. O armazenamento offline é usado como um banco de dados histórico, normalmente usado para exploração de dados, treinamento de modelos de machine learning (ML) e inferência em lote. O armazenamento on-line é usado como uma pesquisa em tempo real de registros, normalmente usado para veiculação de modelos de ML. Para obter mais informações sobre conceitos e ingestão do Feature Store, consulte [Conceitos do Feature Store](feature-store-concepts.md).

Há várias maneiras de trazer seus dados para a Amazon SageMaker Feature Store. O Feature Store oferece uma chamada de API única para ingestão de dados chamada `PutRecord`, que permite ingerir dados em lotes ou de fontes de streaming. Você pode usar o Amazon SageMaker Data Wrangler para criar recursos e, em seguida, inserir seus recursos em sua Feature Store. Você também pode usar o Amazon EMR para ingestão de dados em lote por meio de um conector Spark.

Nos tópicos a seguir, discutiremos a diferença entre 

**Topics**
+ [

## Ingestão de streaming
](#feature-store-ingest-data-stream)
+ [

## Data Wrangler com o Feature Store
](#feature-store-data-wrangler-integration)
+ [

# Ingestão em lote com a Amazon SageMaker Feature Store Spark
](batch-ingestion-spark-connector-setup.md)

## Ingestão de streaming
<a name="feature-store-ingest-data-stream"></a>

É possível usar fontes de streaming, como o Kafka ou Kinesis, como fonte de dados quando os registros são extraídos e enviados diretamente ao armazenamento on-line para treinamento, inferência ou criação de atributos. Os registros podem ser ingeridos em seu grupo de atributos usando a chamada de API `PutRecord` síncrona. Como essa é uma chamada de API síncrona, ela permite que pequenos lotes de atualizações sejam enviados em uma única chamada de API. Isso permite que você mantenha um alto nível de atualização dos valores do atributo e publique os valores assim que uma atualização for detectada. Esses também são chamados de atributos de *streaming*. 

## Data Wrangler com o Feature Store
<a name="feature-store-data-wrangler-integration"></a>

O Data Wrangler é um recurso do Studio Classic que fornece uma end-to-end solução para importar, preparar, transformar, caracterizar e analisar dados. O Data Wrangler permite que você projete seus atributos e os inclua nos grupos de atributos do seu armazenamento on-line ou offline.

As instruções a seguir exportam um caderno Jupyter que contém todo o código-fonte necessário para criar um grupo de atributos do Feature Store que adiciona seus atributos do Data Wrangler a um armazenamento on-line ou offline.

As instruções sobre como exportar seu fluxo de dados do Data Wrangler para o Feature Store no console variam dependendo se você habilitou [SageMaker Estúdio Amazon](studio-updated.md) ou [Amazon SageMaker Studio Clássico](studio.md) como sua experiência padrão.

### Exportar seu fluxo de dados do Data Wrangler para o Feature Store se o Studio for sua experiência padrão (console)
<a name="feature-store-ingest-data-wrangler-integration-with-studio-updated"></a>

1. Abra o console do Studio seguindo as instruções em [Inicie o Amazon SageMaker Studio](studio-updated-launch.md).

1. Escolha **Dados** no painel esquerdo para expandir a lista suspensa.

1. Na lista suspensa, escolha **Data Wrangler**.

1. Se você já tiver uma instância do Amazon SageMaker Canvas em execução, escolha **Open Canvas**.

   Se você não tiver uma instância do SageMaker Canvas em execução, escolha **Executar no Canvas**.

1. No console do SageMaker Canvas, escolha **Data Wrangler** no painel de navegação esquerdo.

1. Escolha **Fluxos de dados** para visualizar seus fluxos de dados.

1. Escolha **\$1** para expandir a lista suspensa.

1. Escolha **Exportar fluxo de dados** para expandir a lista suspensa.

1. Escolha **Salvar na SageMaker Feature Store (via JupyterLab Notebook)**.

1. **Em “Exportar fluxo de dados como caderno”**, escolha uma das seguintes opções:
   + **Faça download de uma cópia local** para fazer download do fluxo de dados em sua máquina local.
   + **Exporte para o local do S3** para baixar o fluxo de dados para um local do Amazon Simple Storage Service e insira o local do Amazon S3 ou escolha **Procurar** para encontrar seu local do Amazon S3.

1. Escolha **Exportar**.

 Depois que o grupo de atributos for criado, você também poderá selecionar e juntar dados em vários grupos de atributos para criar novos atributos de engenharia no Data Wrangler e depois exportar seu conjunto de dados para um bucket do Amazon S3. 

Para obter mais informações sobre como exportar para a Feature Store, consulte [Exportar para a SageMaker AI Feature Store](https://docs.aws.amazon.com/sagemaker/latest/dg/data-wrangler-data-export.html#data-wrangler-data-export-feature-store). 

# Ingestão em lote com a Amazon SageMaker Feature Store Spark
<a name="batch-ingestion-spark-connector-setup"></a>

O Amazon SageMaker Feature Store Spark é um conector Spark que conecta a biblioteca do Spark à Feature Store. O Feature Store Spark simplifica a ingestão de dados do Spark `DataFrame` para grupos de atributos. O Feature Store suporta a ingestão de dados em lote com o Spark, usando seu pipeline de ETL existente, no Amazon EMR, no GIS, em um trabalho, em um trabalho AWS Glue de SageMaker processamento da Amazon ou em um notebook. SageMaker 

Métodos para instalar e implantar a ingestão de dados em lote são fornecidos para desenvolvedores de Python e Scala. [Os desenvolvedores de Python podem usar a biblioteca `sagemaker-feature-store-pyspark` Python de código aberto para desenvolvimento local, instalação no Amazon EMR e para notebooks Jupyter seguindo as instruções no repositório Spark da Amazon Feature Store. SageMaker GitHub](https://github.com/aws/sagemaker-feature-store-spark) Os desenvolvedores do Scala podem usar o conector Spark da Feature Store disponível no repositório central Maven do [SDK Spark da Amazon SageMaker Feature Store](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk).

Você pode usar o conector Spark para ingerir dados das seguintes formas, dependendo se o armazenamento on-line, o armazenamento offline ou ambos estão habilitados:

1. Ingestão por padrão — Se a loja virtual estiver ativada, o conector Spark primeiro ingere seu dataframe na loja virtual usando a API. [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) Apenas o registro com o maior horário do evento permanecerá no armazenamento on-line. Se o armazenamento offline estiver habilitado, em 15 minutos, o Feature Store ingerirá seu dataframe no armazenamento offline. Para obter mais informações sobre como os armazenamentos on-line e offline funcionam, consulte [Conceitos do Feature Store](feature-store-concepts.md).

   Você pode fazer isso não especificando `target_stores` no método `.ingest_data(...)`. 

1. Ingestão direta do armazenamento offline: Se o armazenamento offline estiver habilitado, o lote do conector Spark ingere seu dataframe diretamente no armazenamento offline. A ingestão do dataframe diretamente no armazenamento offline não atualiza o armazenamento on-line.

   Você pode fazer isso definindo `target_stores=["OfflineStore"]` no método `.ingest_data(...)`.

1. Somente loja virtual — Se a loja virtual estiver ativada, o conector Spark ingere seu dataframe na loja virtual usando a API. [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) A ingestão do dataframe diretamente no armazenamento on-line não atualiza o armazenamento offline. 

   Você pode fazer isso definindo `target_stores=["OnlineStore"]` no método `.ingest_data(...)`.

Para obter informações sobre como usar diferentes métodos de ingestão, consulte [Implementações de exemplos](#batch-ingestion-spark-connector-example-implementations).

**Topics**
+ [

## Instalação do Feature Store Spark
](#batch-ingestion-spark-connector-installation)
+ [

## Recuperar o JAR para o Feature Store Spark
](#retrieve-jar-spark-connector)
+ [

## Implementações de exemplos
](#batch-ingestion-spark-connector-example-implementations)

## Instalação do Feature Store Spark
<a name="batch-ingestion-spark-connector-installation"></a>

 **Usuários do Scala** 

O SDK do Spark da Feature Store está disponível no repositório [central Maven do SDK Spark da Amazon SageMaker Feature Store](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk) para usuários do Scala.

 ****Requisitos**** 
+ Spark >= 3.0.0 e <= 3.3.0
+ `iceberg-spark-runtime` >= 0.14.0
+ Scala >= 2.12.x  
+  Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR) 

 **Declarar a dependência em POM.xml** 

O conector do Feature Store Spark depende da biblioteca `iceberg-spark-runtime`. Portanto, você deve adicionar a versão correspondente da biblioteca `iceberg-spark-runtime` à dependência se estiver ingerindo dados em um grupo de atributos que você criou automaticamente com o formato de tabela Iceberg. Por exemplo, se você estiver usando o Spark 3.1, você deve declarar o seguinte no seu projeto `POM.xml`: 

```
 <dependency>
 <groupId>software.amazon.sagemaker.featurestore</groupId>
 <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
 <version>1.0.0</version>
 </dependency>
 
 <dependency>
   <groupId>org.apache.iceberg</groupId>
   <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
   <version>0.14.0</version>
</dependency>
```

 **Usuários de Python** 

O SDK do Feature Store Spark está disponível no repositório de código aberto [ SageMaker Amazon Feature Store GitHub](https://github.com/aws/sagemaker-feature-store-spark) Spark.

 ****Requisitos**** 
+ Spark >= 3.0.0 e <= 3.3.0
+ Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR) 
+ Kernel = `conda_python3`

Recomendamos configurar o `$SPARK_HOME` para o diretório em que você tem o Spark instalado. Durante a instalação, o Feature Store carrega o JAR necessário para `SPARK_HOME`, para que as dependências sejam carregadas automaticamente. É necessário iniciar uma JVM pelo Spark para fazer essa PySpark biblioteca funcionar.

 **Instalação local** 

Para obter mais informações sobre a instalação, habilite o modo detalhado anexando `--verbose` ao seguinte comando de instalação: 

```
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

 **Instalação no Amazon EMR** 

Crie um cluster do Amazon EMR com a versão 6.1.0 ou posterior. Habilite o SSH para ajudá-lo a solucionar qualquer problema.

Para instalar a biblioteca, faça o seguinte:
+ Crie uma etapa personalizada no Amazon EMR.
+ Conecte-se ao seu cluster usando SSH e instale a biblioteca a partir daí.

**nota**  
As informações a seguir usam a versão 3.1 do Spark, mas você pode especificar qualquer versão que atenda aos requisitos.

```
export SPARK_HOME=/usr/lib/spark
sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

**nota**  
Se você quiser instalar o dependente JARs automaticamente no SPARK\$1HOME, não use a etapa de bootstrap.

 **Instalação em uma instância de SageMaker notebook** 

Instale uma versão compatível com o conector Spark usando os seguintes comandos: PySpark 

```
!pip3 install pyspark==3.1.1 
!pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

Se você estiver realizando a ingestão em lote no armazenamento offline, as dependências não estarão dentro do ambiente da instância do caderno.

```
from pyspark.sql import SparkSession
import feature_store_pyspark

extra_jars = ",".join(feature_store_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.jars", extra_jars) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
    .getOrCreate()
```

 **Instalação em cadernos com GIS** 

**Importante**  
Você deve usar a AWS Glue versão 2.0 ou posterior.

Use as informações a seguir para ajudá-lo a instalar o PySpark conector em uma sessão AWS Glue interativa (GIS).

O Amazon SageMaker Feature Store Spark exige que um conector JAR específico do Spark durante a inicialização da sessão seja carregado em seu bucket do Amazon S3. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte [Recuperar o JAR para o Feature Store Spark](#retrieve-jar-spark-connector).

Depois de fazer o upload do JAR, você deve fornecer o JAR às sessões do GIS usando o comando a seguir. 

```
%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
```

Para instalar o Feature Store Spark em AWS Glue tempo de execução, use o comando `%additional_python_modules` mágico no notebook GIS. AWS Glue é executado `pip` nos módulos que você especificou abaixo`%additional_python_modules`.

```
%additional_python_modules sagemaker-feature-store-pyspark-3.1
```

Antes de iniciar a AWS Glue sessão, você deve usar os dois comandos mágicos anteriores.

 **Instalação em um AWS Glue trabalho** 

**Importante**  
Você deve usar a AWS Glue versão 2.0 ou posterior.

Para instalar o conector Spark em uma AWS Glue tarefa, use o `--extra-jars` argumento para fornecer o necessário JARs e `--additional-python-modules` instalar o conector Spark como parâmetros da tarefa ao criar a AWS Glue tarefa, conforme mostrado no exemplo a seguir. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte [Recuperar o JAR para o Feature Store Spark](#retrieve-jar-spark-connector).

```
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
    Name=pipeline_id,
    Description='Feature Store Compute Job',
    Role=glue_role_arn,
    ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location_uri,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': temp_dir_location_uri,
        '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
        '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
        ...
    },
    MaxRetries=3,
    NumberOfWorkers=149,
    Timeout=2880,
    GlueVersion='3.0',
    WorkerType='G.2X'
)
```

 **Instalação em uma tarefa de SageMaker processamento da Amazon** 

Para usar o Feature Store Spark com trabalhos SageMaker de processamento da Amazon, traga sua própria imagem. Para obter informações sobre como trazer sua própria imagem, consulte [Imagens personalizadas no Amazon SageMaker Studio Classic](studio-byoi.md). Adicione a etapa de instalação a um Dockerfile. Depois de enviar a imagem do Docker para um repositório Amazon ECR, você pode usar o PySparkProcessor para criar o trabalho de processamento. Para obter mais informações sobre a criação de uma tarefa de processamento com o PySpark processador, consulte[Executar um Trabalho de Processamento com o Apache Spark](use-spark-processing-container.md).

Veja a seguir um exemplo da adição de uma etapa de instalação ao Dockerfile.

```
FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0

RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

## Recuperar o JAR para o Feature Store Spark
<a name="retrieve-jar-spark-connector"></a>

Para recuperar o JAR de dependência do Feature Store Spark, você deve instalar o conector Spark a partir do repositório Python Package Index (PyPI) usando `pip` em qualquer ambiente Python com acesso à rede. Um notebook SageMaker Jupyter é um exemplo de ambiente Python com acesso à rede.

O comando a seguir instala o conector Spark.

```
!pip install sagemaker-feature-store-pyspark-3.1      
```

Depois de instalar o Feature Store Spark, você pode recuperar a localização do JAR e fazer o upload do JAR para o Amazon S3.

O comando `feature-store-pyspark-dependency-jars` fornece a localização do JAR de dependência necessário que o Feature Store Spark adicionou. Você pode usar o comando para recuperar o JAR e carregá-lo no Amazon S3.

```
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]

s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
```

## Implementações de exemplos
<a name="batch-ingestion-spark-connector-example-implementations"></a>

------
#### [ Example Python script ]

 *FeatureStoreBatchIngestion.py* 

```
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark

spark = SparkSession.builder \
                    .getOrCreate()

# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]

df = spark.createDataFrame(data).toDF(*columns)

# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
 
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)

feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"

# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)

# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])

# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
```

 **Envie um trabalho do Spark com um exemplo de script Python** 

A PySpark versão exige que um JAR extra dependente seja importado, portanto, etapas extras são necessárias para executar o aplicativo Spark. 

Se você não especificou `SPARK_HOME` durante a instalação, precisará carregar o necessário JARs na JVM durante a execução. `spark-submit` `feature-store-pyspark-dependency-jars`é um script Python instalado pela biblioteca Spark para buscar automaticamente o caminho para tudo para você. JARs 

```
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
```

Se você estiver executando esse aplicativo no Amazon EMR, recomendamos que você execute o aplicativo no modo cliente, para que você não precise distribuir o dependente JARs para outros nós de tarefas. Adicione mais uma etapa no cluster do Amazon EMR com o argumento Spark semelhante ao seguinte:

```
spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
```

------
#### [ Example Scala script ]

 *FeatureStoreBatchIngestion.scala* 

```
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TestSparkApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()

    // Construct test DataFrame
    val data = List(
      Row("1", "2021-07-01T12:20:12Z"),
      Row("2", "2021-07-02T12:20:13Z"),
      Row("3", "2021-07-03T12:20:14Z")
    )
    
    val schema = StructType(
      List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    
    // Initialize FeatureStoreManager with a role arn if your feature group is created by another account
    val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
    
    // Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df)

    val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"
   
    // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
    // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
    featureStoreManager.ingestData(df, featureGroupArn)
    
    // To select the target stores for ingestion, you can specify the target store as the paramter
    // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
    featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore"))
    
    // If only OfflineStore is selected, the connector will batch write the data to offline store directly
    featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"])
    
    // To retrieve the records failed to be ingested by spark connector
    val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame()
  }
}
```

 **Enviar Tarefas do Spark** 

 **Scala** 

Você pode usar o Feature Store Spark como uma dependência normal. Nenhuma instrução extra é necessária para executar a aplicação em todas as plataformas. 

------