

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 사용자 스크립트 생성
<a name="create-user-script"></a>

사용자 스크립트에는 진입점 함수(즉, 핸들러)가 포함되어야 합니다. 유효한 Python 파일 이름으로 사용자 스크립트 파일의 이름을 지정할 수 있습니다.

다음 절차에서는 사용자 스크립트를 생성하여 PySpark 분석의 핵심 기능을 정의하는 방법을 설명합니다.

**사전 조건**
+ PySpark 1.0(Python 3.11 및 Spark 3.5.3에 해당)
+ Amazon S3의 데이터 세트는 정의한 Spark 세션에서 구성된 테이블 연결로만 읽을 수 있습니다.
+ 코드가 Amazon S3 및를 직접 호출할 수 없습니다. AWS Glue
+ 코드가 네트워크 호출을 수행할 수 없음

**사용자 스크립트를 생성하려면**

1. 원하는 텍스트 편집기 또는 통합 개발 환경(IDE)을 엽니다.

   Python 파일을 지원하는 모든 텍스트 편집기 또는 IDE(예: Visual Studio Code, PyCharm 또는 Notepad\+\+)를 사용할 수 있습니다.

1. 선택한 이름으로 새 Python 파일을 생성합니다(예: **my\_analysis.py**).

1. 컨텍스트 객체 파라미터를 수락하는 진입점 함수를 정의합니다.

   ```
   def entrypoint(context)
   ```

   `context` 객체 파라미터는 필수 Spark 구성 요소, 참조 테이블 및 분석 파라미터에 대한 액세스를 제공하는 사전입니다. OTA 업데이트는 다음을 포함합니다.

   를 통한 Spark 세션 액세스 `context['sparkSession']`

   를 통해 참조된 테이블 `context['referencedTables']`

   를 통한 분석 파라미터`context['analysisParameters']`(템플릿에 파라미터가 정의된 경우)

1. 진입점 함수의 결과를 정의합니다.

   ```
   return results
   ```

   는 파일 이름의 결과 사전이 포함된 객체를 출력 DataFrame에 반환해야 `results` 합니다.
**참고**  
AWS Clean Rooms 는 결과 수신기의 S3 버킷에 DataFrame 객체를 자동으로 기록합니다.

1. 이제 다음에 대한 준비가 되었습니다.

   1. 이 사용자 스크립트를 S3에 저장합니다. 자세한 내용은 [S3에 사용자 스크립트 및 가상 환경 저장](store-artifacts-in-s3.md) 단원을 참조하십시오.

   1. 사용자 스크립트에 필요한 추가 라이브러리를 지원하도록 선택적 가상 환경을 생성합니다. 자세한 내용은 [가상 환경 생성(선택 사항)](create-virtual-environment.md) 단원을 참조하십시오.

**Example 예제 1.**  

```
# File name: my_analysis.py

def entrypoint(context):
    try:
        # Access Spark session
        spark = context['sparkSession']

        # Access input tables
        input_table1 = context['referencedTables']['table1_name']
        input_table2 = context['referencedTables']['table2_name']

        # Example data processing operations
        output_df1 = input_table1.select("column1", "column2")
        output_df2 = input_table2.join(input_table1, "join_key")
        output_df3 = input_table1.groupBy("category").count()
    
        # Return results - each key creates a separate output folder
        return {
            "results": {
                "output1": output_df1,        # Creates output1/ folder
                "output2": output_df2,        # Creates output2/ folder
                "analysis_summary": output_df3 # Creates analysis_summary/ folder
            }
        }
   
    except Exception as e:
        print(f"Error in main function: {str(e)}")
        raise e
```
이 예제의 폴더 구조는 다음과 같습니다.  

```
analysis_results/
│
├── output1/ # Basic selected columns
│ ├── part-00000.parquet
│ └── _SUCCESS
│
├── output2/ # Joined data
│ ├── part-00000.parquet
│ └── _SUCCESS
│
└── analysis_summary/ # Aggregated results
├── part-00000.parquet
└── _SUCCESS
```

**Example 예제 2.**  

```
def entrypoint(context):
    try:
        # Get DataFrames from context
        emp_df = context['referencedTables']['employees']
        dept_df = context['referencedTables']['departments']

        # Apply Transformations
        emp_dept_df = emp_df.join(
            dept_df,
            on="dept_id",
            how="left"
        ).select(
            "emp_id",
            "name",
            "salary",
            "dept_name"
        )

        # Return Dataframes
        return {
            "results": {
                "outputTable": emp_dept_df
            }
        }

    except Exception as e:
        print(f"Error in entrypoint function: {str(e)}")
        raise e
```