

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 功能儲存功能處理器 SDK
<a name="feature-store-feature-processor-sdk"></a>

透過使用`@feature_processor`裝飾器裝飾轉換函式來宣告特徵商店特徵處理器定義。SageMaker AI SDK for Python (Boto3) 會自動從設定的輸入資料來源載入資料、套用裝飾的轉換函式，然後將轉換後的資料擷取到目標特徵群組。裝飾的轉換函式必須符合`@feature_processor`裝飾器的預期簽名。如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 Amazon SageMaker Feature Store中的 [@feature\_processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator)閱讀文件。

使用 `@feature_processor` 裝飾器，您的轉換函式在 Spark 執行期環境中執行，其中提供給函式及其傳回值的輸入引數是 Spark 數 DataFrames。轉換函式中的輸入參數數目必須與`@feature_processor`裝飾器中配置的輸入數目相符。

如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 [Python 的特徵處理器特徵商店 SDK (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor)。

下面的代碼是有關如何使用`@feature_processor`裝飾器的基本範例。如需更具體的使用案例範例，請參閱[常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)。

您可以使用下列指令從 SageMaker SDK 及其附加功能安裝功能處理器 SDK。

```
pip install sagemaker[feature-processor]
```

在下列範例中，`{{us-east-1}}`是資源的區域、`{{111122223333}}`資源擁有者帳戶 ID，`{{your-feature-group-name}}`是特徵群組名稱。

以下是基本特徵處理器定義，其中`@feature_processor`裝飾器會將 Amazon S3 的 CSV 輸入設定為載入並提供給轉換函式 (例如`transform`)，並準備擷取至特徵群組。最後一行執行它。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://{{your-bucket}}/{{prefix-to-csv}}/')
OUTPUT_FG = 'arn:aws:sagemaker:{{us-east-1}}:{{111122223333}}:feature-group/{{your-feature-group-name}}'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor`參數包括：
+ `inputs` (List[str])：Feature Store 特徵處理器中使用的資料來源清單。如果您的資料來源是功能群組或存放在 Amazon S3 中，您可以使用功能存放區為功能處理器提供的資料來源定義。如需特徵商店提供的資料來源定義的完整清單，請參閱 [Amazon SageMaker Feature Store 中的特徵處理器資料來源](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source)閱讀文件。
+ `output`(str)：特徵群組的 ARN，以擷取裝飾函式的輸出。
+ `target_stores` (Optinal[List[str]])：要擷取至輸出的儲存清單 (例如，`OnlineStore`或`OfflineStore`)。如果未指定，則會將資料擷取到所有輸出功能群組的已啟用存放區。
+ `parameters` (Dict[str, Any])：要提供給您的轉換函式的字典。
+ `enable_ingestion` (bool)：指出轉換函式的輸出是否已擷取至輸出特徵群組的旗標。此標誌在開發階段非常有用。如果未指定，則會啟用擷取。

可選的包裝函式參數 (如果在函式簽名中提供，則作為引數提供) 包括：
+ `params` (Dict[str, Any])：在`@feature_processor`參數中定義的字典。它還包含系統設定的參數，這些參數可以與鍵 `system` 一起參考，例如`scheduled_time`參數。
+ `spark`(SparkSession)：對 Spark 應用程式初始化的 SparkSession 執行個體的引用。

以下程式碼是使用 `params` 和 `spark` 參數的範例。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://{{your-bucket}}/{{prefix-to-csv}}/')
OUTPUT_FG = 'arn:aws:sagemaker:{{us-east-1}}:{{111122223333}}:feature-group/{{your-feature-group-name}}' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

`scheduled_time`系統參數 (在函式的`params`引數中提供) 是支援重試每次執行的重要值。此值有助於唯一識別特徵處理器的執行，並可作為以日期為基礎的輸入的參考點 (例如，僅載入最近 24 小時的資料)，以確保輸入範圍與程式碼的實際執行時間無關。如果特徵處理器按照排程執行 (請參閱[以排程和事件為基礎執行特徵處理器管道](feature-store-feature-processor-schedule-pipeline.md))，則其值會固定為排定執行的時間。在同步執行期間，可以使用 SDK 的執行 API 覆寫引數，以支援資料回填或重新執行遺漏的過去執行等使用案例。如果功能處理器以任何其他方式執行，則其值是目前的時間。

如需撰寫 Spark 程式碼的相關資訊，請參閱 [Spark SQL 程式設計指南](https://spark.apache.org/docs/latest/sql-programming-guide.html)。

如需常見使用案例的更多程式碼範例，請參閱[常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)。

請注意，用裝飾的轉換函式`@feature_processor`不會返回值。要以編程方式測試您的函式，您可以刪除或猴子修補`@feature_processor`裝飾器，使其充當包裝函式的傳遞。如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html)。