Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Crear un script de usuario
El script de usuario debe contener una función de punto de entrada (en otras palabras, un controlador). Puede asignar un nombre a su archivo de script de usuario con cualquier nombre de archivo de Python válido.
El siguiente procedimiento describe cómo crear un script de usuario para definir la funcionalidad principal del PySpark análisis.
Requisitos previos
-
PySpark 1.0 (corresponde a Python 3.9 y Python 3.11 y Spark 3.5.2)
-
Los conjuntos de datos de Amazon S3 solo se pueden leer como asociaciones de tablas configuradas en la sesión de Spark que defina.
-
Su código no puede llamar directamente a Amazon S3 y AWS Glue
-
Su código no puede realizar llamadas de red
Para crear un script de usuario
-
Abra el editor de texto o el entorno de desarrollo integrado (IDE) de su elección.
Puede usar cualquier editor de texto o IDE (como Visual Studio Code o Notepad++) que admita archivos de Python. PyCharm
-
Cree un nuevo archivo de Python con el nombre que desee (por ejemplo,
my_analysis.py
). -
Defina una función de punto de entrada que acepte un parámetro de objeto de contexto.
def entrypoint(context)
El parámetro
context
objeto es un diccionario que proporciona acceso a los componentes esenciales de Spark y a las tablas referenciadas. Contiene el acceso a la sesión de Spark para ejecutar las operaciones de Spark y las tablas a las que se hace referencia:El acceso a las sesiones de Spark está disponible a través de
context['sparkSession']
Las tablas de referencia están disponibles en
context['referencedTables']
-
Defina los resultados de la función de punto de entrada:
return results
results
Debe devolver un objeto que contenga un diccionario de resultados con nombres de archivos a una salida. DataFramenota
AWS Clean Rooms escribe automáticamente los DataFrame objetos en el depósito S3 del receptor de resultados.
-
Ya puede hacer lo siguiente:
-
Guarde este script de usuario en S3. Para obtener más información, consulte Almacenamiento de un script de usuario y un entorno virtual en S3.
-
Cree el entorno virtual opcional para admitir cualquier biblioteca adicional que necesite su script de usuario. Para obtener más información, consulte Crear un entorno virtual (opcional).
-
ejemplo Ejemplo 1
# File name: my_analysis.py def entrypoint(context): try: # Access Spark session spark = context['sparkSession'] # Access input tables input_table1 = context['referencedTables']['table1_name'] input_table2 = context['referencedTables']['table2_name'] # Example data processing operations output_df1 = input_table1.select("column1", "column2") output_df2 = input_table2.join(input_table1, "join_key") output_df3 = input_table1.groupBy("category").count() # Return results - each key creates a separate output folder return { "results": { "output1": output_df1, # Creates output1/ folder "output2": output_df2, # Creates output2/ folder "analysis_summary": output_df3 # Creates analysis_summary/ folder } } except Exception as e: print(f"Error in main function: {str(e)}") raise e
La estructura de carpetas de este ejemplo es la siguiente:
analysis_results/ │ ├── output1/ # Basic selected columns │ ├── part-00000.parquet │ └── _SUCCESS │ ├── output2/ # Joined data │ ├── part-00000.parquet │ └── _SUCCESS │ └── analysis_summary/ # Aggregated results ├── part-00000.parquet └── _SUCCESS
ejemplo Ejemplo 2
def entrypoint(context): try: # Get DataFrames from context emp_df = context['referencedTables']['employees'] dept_df = context['referencedTables']['departments'] # Apply Transformations emp_dept_df = emp_df.join( dept_df, on="dept_id", how="left" ).select( "emp_id", "name", "salary", "dept_name" ) # Return Dataframes return { "results": { "outputTable": emp_dept_df } } except Exception as e: print(f"Error in entrypoint function: {str(e)}") raise e