

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# ユーザースクリプトの作成
<a name="create-user-script"></a>

ユーザースクリプトには、エントリポイント関数 (つまり、ハンドラー) が含まれている必要があります。ユーザースクリプトファイルに任意の有効な Python ファイル名を付けることができます。

次の手順では、ユーザースクリプトを作成して PySpark 分析のコア機能を定義する方法について説明します。

**前提条件**
+ PySpark 1.0 (Python 3.11 および Spark 3.5.3 に対応)
+ Amazon S3 のデータセットは、定義した Spark セッションで設定されたテーブルの関連付けとしてのみ読み取ることができます。
+ コードが Amazon S3 と を直接呼び出すことはできません AWS Glue
+ コードがネットワークコールを実行できない

**ユーザースクリプトを作成するには**

1. 選択したテキストエディタまたは統合開発環境 (IDE) を開きます。

   Python ファイルをサポートする任意のテキストエディタまたは IDE (Visual Studio Code、PyCharm、Notepad\+\+ など) を使用できます。

1. 選択した名前で新しい Python ファイルを作成します (例: **my\_analysis.py**)。

1. コンテキストオブジェクトパラメータを受け入れるエントリポイント関数を定義します。

   ```
   def entrypoint(context)
   ```

   `context` オブジェクトパラメータは、基本的な Spark コンポーネント、参照テーブル、および分析パラメータへのアクセスを提供するディクショナリです。以下を含みます。

   を介した Spark セッションアクセス `context['sparkSession']`

   を介した参照テーブル `context['referencedTables']`

   による分析パラメータ `context['analysisParameters']` (テンプレートでパラメータが定義されている場合)

1. エントリポイント関数の結果を定義します。

   ```
   return results
   ```

   は、ファイル名の結果ディクショナリを含むオブジェクトを出力 DataFrame に返す`results`必要があります。
**注記**  
AWS Clean Rooms は、DataFrame オブジェクトを結果レシーバーの S3 バケットに自動的に書き込みます。

1. これで次の作業に進むことができます。

   1. このユーザースクリプトを S3 に保存します。詳細については、「[ユーザースクリプトと仮想環境を S3 に保存する](store-artifacts-in-s3.md)」を参照してください。

   1. ユーザースクリプトに必要な追加のライブラリをサポートするオプションの仮想環境を作成します。詳細については、「[仮想環境の作成 (オプション)](create-virtual-environment.md)」を参照してください。

**Example 例 1**  

```
# File name: my_analysis.py

def entrypoint(context):
    try:
        # Access Spark session
        spark = context['sparkSession']

        # Access input tables
        input_table1 = context['referencedTables']['table1_name']
        input_table2 = context['referencedTables']['table2_name']

        # Example data processing operations
        output_df1 = input_table1.select("column1", "column2")
        output_df2 = input_table2.join(input_table1, "join_key")
        output_df3 = input_table1.groupBy("category").count()
    
        # Return results - each key creates a separate output folder
        return {
            "results": {
                "output1": output_df1,        # Creates output1/ folder
                "output2": output_df2,        # Creates output2/ folder
                "analysis_summary": output_df3 # Creates analysis_summary/ folder
            }
        }
   
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        raise e
```
この例のフォルダ構造は次のとおりです。  

```
analysis_results/
│
├── output1/ # Basic selected columns
│ ├── part-00000.parquet
│ └── _SUCCESS
│
├── output2/ # Joined data
│ ├── part-00000.parquet
│ └── _SUCCESS
│
└── analysis_summary/ # Aggregated results
├── part-00000.parquet
└── _SUCCESS
```

**Example 例 2**  

```
def entrypoint(context):
    try:
        # Get DataFrames from context
        emp_df = context['referencedTables']['employees']
        dept_df = context['referencedTables']['departments']

        # Apply Transformations
        emp_dept_df = emp_df.join(
            dept_df,
            on="dept_id",
            how="left"
        ).select(
            "emp_id",
            "name",
            "salary",
            "dept_name"
        )

        # Return Dataframes
        return {
            "results": {
                "outputTable": emp_dept_df
            }
        }

    except Exception as e:
        print(f"Error in entrypoint function: {str(e)}")
        raise e
```