

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Criar um pipeline com funções decoradas por `@step`
<a name="pipelines-step-decorator-create-pipeline"></a>

Você pode criar um pipeline convertendo funções do Python em etapas de pipeline usando o decorador `@step`, criando dependências entre essas funções para criar um gráfico de pipeline (ou gráfico acíclico direcionado (DAG)) e passando os nós da folha desse gráfico como uma lista de etapas para o pipeline. As seções a seguir explicam esse procedimento detalhadamente com exemplos.

**Topics**
+ [Converter uma função em uma etapa](#pipelines-step-decorator-run-pipeline-convert)
+ [Crie dependências entre as etapas](#pipelines-step-decorator-run-pipeline-link)
+ [Use `ConditionStep` com etapas decoradas por `@step`](#pipelines-step-decorator-condition)
+ [Definir um pipeline usando a saída `DelayedReturn` das etapas](#pipelines-step-define-delayed)
+ [Criar um pipeline](#pipelines-step-decorator-pipeline-create)

## Converter uma função em uma etapa
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

Para criar uma etapa usando o decorador `@step`, anote a função com `@step`. O exemplo a seguir mostra uma função decorada por `@step` que pré-processa os dados.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

Quando você invoca uma função `@step` -decorada, a SageMaker IA retorna uma `DelayedReturn` instância em vez de executar a função. Uma instância `DelayedReturn` é um proxy para o retorno real da função. A instância `DelayedReturn` pode ser passada para outra função como argumento ou diretamente para uma instância do pipeline como uma etapa. Para obter informações sobre a `DelayedReturn` classe, consulte [sagemaker.workflow.function\_step. DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn).

## Crie dependências entre as etapas
<a name="pipelines-step-decorator-run-pipeline-link"></a>

Ao criar uma dependência entre duas etapas, você cria uma conexão entre as etapas no gráfico do pipeline. As seções a seguir apresentam várias maneiras de criar uma dependência entre as etapas do pipeline.

### Dependências de dados por meio de argumentos de entrada
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

Passar a saída `DelayedReturn` de uma função como entrada para outra função cria automaticamente uma dependência de dados no DAG do pipeline. No exemplo a seguir, passar a saída `DelayedReturn` da função `preprocess` para a função `train` cria uma dependência entre `preprocess` e `train`.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

O exemplo anterior define uma função de treinamento que é decorada com`@step`. Quando essa função é invocada, ela recebe a saída `DelayedReturn` da etapa do pipeline de pré-processamento como entrada. A invocação da função de treinamento retorna outra instância `DelayedReturn`. Essa instância contém as informações sobre todas as etapas anteriores definidas nessa função (ou seja, a etapa `preprocess` no exemplo) que formam o DAG do pipeline.

No exemplo anterior, a função `preprocess` retorna um único valor. Para tipos de retorno mais complexos, como listas ou tuplas, consulte [Limitações](pipelines-step-decorator-limit.md).

### Definir dependências personalizadas
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

No exemplo anterior, a função `train` recebeu a saída `DelayedReturn` de `preprocess` e criou uma dependência. Se você quiser definir a dependência explicitamente sem passar a saída da etapa anterior, use a função `add_depends_on` com a etapa. Você pode usar a função `get_step()` para recuperar a etapa subjacente de sua instância `DelayedReturn` e, em seguida, chamar `add_depends_on`\_on com a dependência como entrada. Para ver a definição da função `get_step()`, consulte [sagemaker.workflow.step\_outputs.get\_step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step). O exemplo a seguir mostra como criar uma dependência entre `preprocess` e `train` usando `get_step()` e `add_depends_on()`.

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### Transmita dados de e para uma função decorada por `@step` para uma etapa tradicional de pipeline
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

Você pode criar um pipeline que inclua uma etapa decorada por `@step` e uma etapa tradicional de pipeline para transmitir dados entre elas. Por exemplo, você pode usar `ProcessingStep` para processar os dados e passar o resultado para a função de treinamento decorada por `@step`. No exemplo a seguir, uma etapa de treinamento decorada por `@step` faz referência à saída de uma etapa de processamento.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source={{input_data}}, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## Use `ConditionStep` com etapas decoradas por `@step`
<a name="pipelines-step-decorator-condition"></a>

Os pipelines oferecem apoio a uma classe `ConditionStep` que avalia os resultados das etapas anteriores para decidir qual ação tomar no pipeline. Você também pode usar `ConditionStep` com uma etapa decorada por `@step`. Para usar a saída de qualquer etapa decorada por `@step` com `ConditionStep`, insira a saída dessa etapa como argumento para `ConditionStep`. No exemplo a seguir, a etapa de condição recebe a saída da etapa de avaliação de modelo decorada por `@step`.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## Definir um pipeline usando a saída `DelayedReturn` das etapas
<a name="pipelines-step-define-delayed"></a>

Você define um pipeline da mesma forma, independentemente de usar ou não um decorador `@step`. Ao passar uma instância `DelayedReturn` para seu pipeline, você não precisa passar uma lista completa de etapas para criar o pipeline. O SDK infere automaticamente as etapas anteriores com base nas dependências que você define. Todas as etapas anteriores dos objetos `Step` que você passou para o pipeline ou objetos `DelayedReturn` são incluídas no gráfico do pipeline. No exemplo a seguir, o pipeline recebe o objeto `DelayedReturn` da função `train`. SageMaker A IA adiciona a `preprocess` etapa, como etapa anterior`train`, ao gráfico do pipeline.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_train_result],
    sagemaker_session={{<sagemaker-session>}},
)
```

Se não houver dados ou dependências personalizadas entre as etapas e você executar várias etapas em paralelo, o gráfico do pipeline terá mais de um nó de folha. Passe todos esses nós de folha em uma lista para o argumento `steps` em sua definição de pipeline, conforme mostrado no seguinte exemplo:

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session={{sagemaker-session}},
)
```

Quando o pipeline é executado, as duas etapas são executadas paralelamente.

Você só passa os nós de folha do gráfico para o pipeline porque os nós de folha contêm informações sobre todas as etapas anteriores definidas por meio de dados ou dependências personalizadas. Ao compilar o pipeline, a SageMaker IA também infere todas as etapas subsequentes que formam o gráfico do pipeline e adiciona cada uma delas como uma etapa separada ao pipeline.

## Criar um pipeline
<a name="pipelines-step-decorator-pipeline-create"></a>

Crie um pipeline chamando `pipeline.create()`, como mostrado no trecho a seguir. Para obter detalhes sobre `create()`, consulte [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create).

```
role = "{{pipeline-role}}"
pipeline.create(role)
```

Quando você liga`pipeline.create()`, a SageMaker IA compila todas as etapas definidas como parte da instância do pipeline. SageMaker A IA carrega a função serializada, os argumentos e todos os outros artefatos relacionados à etapa para o Amazon S3.

Os dados residem no bucket do S3 de acordo com a seguinte estrutura:

```
s3_root_uri/
    {{pipeline_name}}/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        {{step_name}}/
            {{timestamp}}/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        {{execution_id}}/
            {{step_name}}/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`é definido no arquivo de configuração do SageMaker AI e se aplica a todo o pipeline. Se indefinido, o bucket de SageMaker IA padrão será usado.

**nota**  
Toda vez que a SageMaker IA compila um pipeline, a SageMaker IA salva as funções, argumentos e dependências serializados das etapas em uma pasta com a data e hora atual. Isso ocorre toda vez que você executa `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()` ou `pipeline.definition()`.