

# Using Amazon SageMaker Unified Studio Library for Python


The Amazon SageMaker Unified Studio library is an open source library for interacting with Amazon SageMaker Unified Studio resources. With this library, you can access resources such as domains, projects, connections, and databases, all in one place with minimal code. The following examples demonstrate how to use the library in local and remote sessions.

**Note**  
For IAM-based domains, the Amazon SageMaker Unified Studio Library for Python is supported only when using Space Distribution Image version 2.11 or later, or 3.6 or later, in JupyterLab Notebooks or the Code Editor. Earlier versions (for example, 2.9 or 3.2) do not support the Amazon SageMaker Unified Studio Library for Python.

# Using ClientConfig


If you use `ClientConfig` to supply credentials or change the AWS Region name, pass the `ClientConfig` object when initializing any other Amazon SageMaker Studio objects, such as `Domain` or `Project`. If you use a non-production endpoint for an AWS service, you can also supply it in the `ClientConfig`. Note: In a SageMaker space, the Amazon DataZone endpoint is fetched from the metadata JSON file by default.

```
from sagemaker_studio import ClientConfig, Project
conf = ClientConfig(region="eu-west-1")
proj = Project(config=conf)
```

# Domain


`Domain` can be initialized using the following command.

```
from sagemaker_studio import Domain
dom = Domain()
```

If you are not using the Amazon SageMaker Studio library within the Amazon SageMaker Unified Studio JupyterLab IDE, you will need to provide the ID of the domain you want to use.

```
dom = Domain(id="123456")
```

## Domain Properties


A `Domain` object has several string properties that can provide information about the domain that you are using.

```
dom.id
dom.root_domain_unit_id
dom.name
dom.domain_execution_role
dom.status
dom.portal_url
```

# Project


`Project` can be initialized using the following command.

```
from sagemaker_studio import Project
proj = Project()
```

If you are not using the Amazon SageMaker Studio library within the Amazon SageMaker Unified Studio JupyterLab IDE, you will need to provide either the ID or name of the project you would like to use and the domain ID of the project.

```
proj = Project(name="my_proj_name", domain_id="123456")
```

## Project properties


A `Project` object has several string properties that can provide information about the project that you are using.

```
proj.id
proj.name
proj.domain_id
proj.project_status
proj.domain_unit_id
proj.project_profile_id
proj.user_id
```

### IAM Role ARN


To get the project IAM role ARN, access the `iam_role` field. This returns the IAM role ARN of the default IAM connection within your project.

```
proj.iam_role
```

### AWS KMS Key ARN


If you are using an AWS KMS key within your project, you can retrieve the `kms_key_arn` field.

```
proj.kms_key_arn
```

# S3 Path


A `Project` object has an `s3` property that you can use to access the various S3 paths that exist within your project.

```
# S3 path of project root directory
proj.s3.root
# S3 path of datalake consumer Glue DB directory (requires DataLake environment)
proj.s3.datalake_consumer_glue_db
# S3 path of Athena workgroup directory (requires DataLake environment)
proj.s3.datalake_athena_workgroup
# S3 path of workflows output directory (requires Workflows environment)
proj.s3.workflow_output_directory
# S3 path of workflows temp storage directory (requires Workflows environment)
proj.s3.workflow_temp_storage
# S3 path of EMR EC2 log destination directory (requires EMR EC2 environment)
proj.s3.emr_ec2_log_destination
# S3 path of EMR EC2 certificates directory (requires EMR EC2 environment)
proj.s3.emr_ec2_certificates
# S3 path of EMR EC2 log bootstrap directory (requires EMR EC2 environment)
proj.s3.emr_ec2_log_bootstrap
```

## Other Environment S3 Paths


You can also access the S3 path of a different environment by providing an environment ID.

```
proj.s3.environment_path(environment_id="env_1234")
```

# Connections


You can retrieve a list of connections for a project, or you can retrieve a single connection by providing its name.

```
proj_connections: List[Connection] = proj.connections
proj_redshift_conn = proj.connection("my_redshift_connection_name")
```

Each `Connection` object has several properties that can provide information about the connection.

```
proj_redshift_conn.name
proj_redshift_conn.id
proj_redshift_conn.physical_endpoints[0].host
proj_redshift_conn.iam_role
```

# Retrieving an AWS client with the SDK for Python (Boto3)

You can retrieve an SDK for Python (Boto3) AWS client initialized with the connection's credentials.

**Example**  
The following example shows how to create a Redshift client by calling `create_client()` on a Redshift connection.  

```
redshift_connection: Connection = proj.connection("project.redshift")
redshift_client = redshift_connection.create_client()
```

Some connections are directly associated with an AWS service and default to that service's client if no service name is specified. These connections are listed in the following table.


| Connection Type | AWS Service Name | 
| --- | --- | 
| ATHENA | athena | 
| DYNAMODB | dynamodb | 
| REDSHIFT | redshift | 
| S3 | s3 | 
| S3_FOLDER | s3 | 

For other connection types, you must specify an AWS service name.

**Example**  
See the following example for details.  

```
iam_connection: Connection = proj.connection("project.iam")
glue_client = iam_connection.create_client("glue")
```

# Connection data


To retrieve all properties of a `Connection`, access the `data` field to get a `ConnectionData` object. `ConnectionData` fields can be accessed using dot notation (for example, `conn_data.top_level_field`). To retrieve further nested data within `ConnectionData`, access it as a dictionary, for example: `conn_data.top_level_field["nested_field"]`.

```
conn_data: ConnectionData = proj_redshift_conn.data
red_temp_dir = conn_data.redshiftTempDir
lineage_sync = conn_data.lineageSync
lineage_job_id = lineage_sync["lineageJobId"]
spark_conn = proj.connection("my_spark_glue_connection_name")
id = spark_conn.id
env_id = spark_conn.environment_id
glue_conn = spark_conn.data.glue_connection_name
workers = spark_conn.data.number_of_workers
glue_version = spark_conn.data.glue_version
# Fetching tracking server ARN and tracking server name from an MLFlow connection
ml_flow_conn = proj.connection('<my_ml_flow_connection_name>')
tracking_server_arn = ml_flow_conn.data.tracking_server_arn
tracking_server_name = ml_flow_conn.data.tracking_server_name
```

# Secrets


Retrieve the secret (username, password, and other connection-related metadata) for a connection using the following property.

```
snowflake_connection: Connection = proj.connection("project.snowflake")
secret = snowflake_connection.secret
```

Depending on the connection type, the secret can be either a dictionary containing credentials or a single string.
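
As a minimal sketch of handling both cases (the key names below are illustrative assumptions, not guaranteed fields), you can branch on the secret's type before using it:

```
if isinstance(secret, dict):
    # Dictionary secrets expose individual credential fields
    username = secret.get("username")
    password = secret.get("password")
else:
    # String secrets hold a single value, such as a password or token
    token = secret
```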

# Catalogs, databases, and tables


If your `Connection` is of the `LAKEHOUSE` or `IAM` type, you can retrieve catalogs, databases, and tables within a project.

## Catalogs


If your `Connection` is of the `LAKEHOUSE` or `IAM` type, you can retrieve a list of catalogs, or a single catalog by providing its ID.

```
# List catalogs available to the project's default connection
conn_catalogs: List[Catalog] = proj.connection().catalogs
# Retrieve the default catalog
my_default_catalog: Catalog = proj.connection().catalog()
# Retrieve a specific catalog by ID
my_catalog: Catalog = proj.connection().catalog("1234567890:catalog1/sub_catalog")
# List catalogs for a named Lakehouse connection
proj.connection("<lakehouse_connection_name>").catalogs
```

Each `Catalog` object has several properties that can provide information about the catalog.

```
my_catalog.name
my_catalog.id
my_catalog.type
my_catalog.spark_catalog_name
my_catalog.resource_arn
```

## Databases


You can retrieve a list of databases or a single database within a catalog by providing its name.

```
my_catalog: Catalog  # a Catalog retrieved as shown in the previous section
catalog_dbs: List[Database] = my_catalog.databases
my_db: Database = my_catalog.database("my_db")
```

Each `Database` object has several properties that can provide information about the database.

```
my_db.name
my_db.catalog_id
my_db.location_uri
my_db.project_id
my_db.domain_id
```

## Tables


You can also retrieve a list of tables or a specific table within a `Database`.

```
my_db_tables: List[Table] = my_db.tables
my_table: Table = my_db.table("my_table")
```

Each `Table` object has several properties that can provide information about the table.

```
my_table.name
my_table.database_name
my_table.catalog_id
my_table.location
```

You can also retrieve a list of the columns within a table. `Column` contains the column name and the data type of the column.

```
my_table_columns: List[Column] = my_table.columns
col_0: Column = my_table_columns[0]
col_0.name
col_0.type
```

# Utility Methods


The Amazon SageMaker Unified Studio SDK provides utility modules for common data operations including SQL execution, DataFrame operations, and Spark session management.

# SQL Utilities


The SQL utilities module provides a simple interface for executing SQL queries against various database engines within Amazon SageMaker Unified Studio. When no connection is specified, queries are executed locally using DuckDB.

## Supported Database Engines


The following database engines are supported:
+ Amazon Athena
+ Amazon Redshift
+ MySQL
+ PostgreSQL
+ Snowflake
+ Google BigQuery
+ Amazon DynamoDB
+ Microsoft SQL Server
+ DuckDB (default when no connection specified)

## Basic Usage


Import the SQL utilities:

```
from sagemaker_studio import sqlutils
```

### Execute SQL with DuckDB (No Connection)


When no connection is specified, queries are executed locally using DuckDB:

```
# Simple SELECT query
result = sqlutils.sql("SELECT 1 as test_column")
result

# Query with literal values
result = sqlutils.sql("SELECT * FROM table WHERE id = 123")
```

### Execute SQL with Project Connections


Use existing project connections by specifying either connection name or ID:

```
# Using connection name
result = sqlutils.sql(
    "SELECT * FROM my_table",
    connection_name="my_athena_connection"
)

# Using connection ID
result = sqlutils.sql(
    "SELECT * FROM my_table",
    connection_id="conn_12345"
)
```

## Examples by Database Engine


### Amazon Athena


```
# Query Athena using project connection with parameters
result = sqlutils.sql(
    """
    SELECT customer_id, order_date, total_amount
    FROM orders
    WHERE order_date >= :start_date
    """,
    parameters={"start_date": "2024-01-01"},
    connection_name="project.athena"
)

# Create external table in Athena
sqlutils.sql(
    """
    CREATE EXTERNAL TABLE sales_data (
        customer_id bigint,
        order_date date,
        amount decimal(10,2)
    )
    LOCATION 's3://my-bucket/sales-data/'
    """,
    connection_name="project.athena"
)

# Insert data using Create Table As Select (CTAS)
sqlutils.sql(
    """
    CREATE TABLE monthly_sales AS
    SELECT
        DATE_TRUNC('month', order_date) as month,
        SUM(amount) as total_sales
    FROM sales_data
    GROUP BY DATE_TRUNC('month', order_date)
    """,
    connection_name="project.athena"
)
```

### Amazon Redshift


```
# Query Redshift with parameters
result = sqlutils.sql(
    """
    SELECT product_name, category, price
    FROM products
    WHERE category = :category
    AND price > :min_price
    """,
    parameters={"category": "Electronics", "min_price": 100},
    connection_name="project.redshift"
)

# Create table in Redshift
sqlutils.sql(
    """
    CREATE TABLE customer_summary (
        customer_id INTEGER PRIMARY KEY,
        total_orders INTEGER,
        total_spent DECIMAL(10,2),
        last_order_date DATE
    )
    """,
    connection_name="project.redshift"
)

# Insert aggregated data
sqlutils.sql(
    """
    INSERT INTO customer_summary
    SELECT
        customer_id,
        COUNT(*) as total_orders,
        SUM(amount) as total_spent,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
    """,
    connection_name="project.redshift"
)

# Update existing records
sqlutils.sql(
    """
    UPDATE products
    SET price = price * 1.1
    WHERE category = 'Electronics'
    """,
    connection_name="project.redshift"
)
```

## Advanced Usage


### Working with DataFrames


The `sql` function returns a pandas DataFrame for SELECT queries and a row count for DML operations:

```
import pandas as pd

# Execute query and get DataFrame
df = sqlutils.sql("SELECT * FROM sales_data", connection_name="redshift_conn")

# Use pandas operations
summary = df.groupby('region')['sales'].sum()
print(summary)

# Save to file
df.to_csv('sales_report.csv', index=False)

# DML operations return row counts
rows_affected = sqlutils.sql(
    "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 123",
    connection_name="redshift_conn"
)
print(f"Updated {rows_affected} inventory records")
```

### Parameterized Queries


Use parameters to safely pass values to queries:

```
# Dictionary parameters (recommended)
result = sqlutils.sql(
    "SELECT * FROM orders WHERE customer_id = :customer_id AND status = :status",
    parameters={"customer_id": 12345, "status": "completed"},
    connection_name="redshift_connection"
)

# Athena with named parameters
result = sqlutils.sql(
    "SELECT * FROM products WHERE category = :category AND price > :min_price",
    parameters={"category": "Electronics", "min_price": 100},
    connection_name="athena_connection"
)
```

### Getting Database Engine


You can also get the underlying SQLAlchemy engine for advanced operations:

```
# Get engine for a connection
engine = sqlutils.get_engine(connection_name="redshift_connection")

# Use engine directly with pandas
import pandas as pd

df = pd.read_sql("SELECT * FROM large_table LIMIT 1000", engine)
```
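
For operations beyond pandas, you can also use the engine directly with SQLAlchemy. The following is a minimal sketch; the table name is an assumption:

```
from sqlalchemy import text

# Run a statement directly against the connection's engine
with engine.connect() as connection:
    row_count = connection.execute(text("SELECT COUNT(*) FROM sales_data")).scalar()
    print(f"sales_data has {row_count} rows")
```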

### DuckDB Features


When using DuckDB (no connection specified), you get additional capabilities:

#### Python Integration


```
# DuckDB can access Python variables directly
import pandas as pd

my_df = pd.DataFrame({'id': [1, 2, 3], 'name': ['A', 'B', 'C']})

result = sqlutils.sql("SELECT * FROM my_df WHERE id > 1")
```

## Notes

+ SELECT queries return pandas DataFrames for easy data manipulation; DML statements return affected row counts
+ DuckDB is automatically configured with Amazon S3 credentials from the environment, so you can query data in Amazon S3 directly (see the sketch after this list)
+ Connection credentials are managed through Amazon SageMaker Unified Studio project connections
+ The module handles connection pooling and cleanup automatically
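
For example, because DuckDB is configured with Amazon S3 credentials from the environment, you can query objects in Amazon S3 without specifying a connection. The bucket and prefix below are hypothetical:

```
# Query Parquet files in S3 directly with DuckDB (no connection specified)
result = sqlutils.sql(
    "SELECT COUNT(*) AS row_count FROM read_parquet('s3://my-bucket/sales-data/*.parquet')"
)
print(result)
```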

# DataFrame Utilities


Read from and write to catalog tables using pandas DataFrames with automatic format detection and database management.

Supported catalog types:
+ AwsDataCatalog
+ S3CatalogTables

## Basic Usage


Import the DataFrame utilities:

```
from sagemaker_studio import dataframeutils
```

## Reading from Catalog Tables


Required Inputs:
+ database (str): Database name within the catalog
+ table (str): Table name

Optional Parameters:
+ catalog (str): Catalog identifier (defaults to AwsDataCatalog if not specified)
+ format (str): Data format - auto-detects from table metadata, falls back to parquet
+ `**kwargs`: Additional arguments
  + For AwsDataCatalog, kwargs can include columns, chunked, and so on
  + For S3 Tables, kwargs can include limit, row_filter, selected_fields, and so on

```
import pandas as pd

# Read from AwsDataCatalog
df = pd.read_catalog_table(
    database="my_database",
    table="my_table"
)

# Read from S3 Tables
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
)
```

### Usage with optional parameters


```
import pandas as pd

# Read from AwsDataCatalog by explicitly specifying catalogID and format
df = pd.read_catalog_table(
    database="my_database",
    table="my_table",
    catalog="123456789012",
    format="parquet"
)

# Read from AwsDataCatalog by explicitly specifying catalogID, format, and additional args -> columns
df = pd.read_catalog_table(
    database="my_database",
    table="my_table",
    catalog="123456789012",
    format="parquet",
    columns=['<column_name_1>', '<column_name_2>']
)

# Read from S3 Tables with additional args -> limit
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
   limit=500
)

# Read from S3 Tables with additional args -> selected_fields
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
   selected_fields=['<field_name_1>', '<field_name_2>']
)
```

## Writing to Catalog Tables


Required Inputs:
+ database (str): Database name within the catalog
+ table (str): Table name

Optional Parameters:
+ catalog (str): Catalog identifier (defaults to AwsDataCatalog if not specified)
+ format (str): Data format used for AwsDataCatalog (default: parquet)
+ path (str): Custom Amazon S3 path for writing to AwsDataCatalog (auto-determined if not provided)
+ `**kwargs`: Additional arguments

Path Resolution Priority - Amazon S3 path is determined in this order:
+ User-provided path parameter
+ Existing database location + table name
+ Existing table location
+ Project default Amazon S3 location

```
import pandas as pd

# Create sample DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [10.5, 20.3, 15.7]
})

# Write to AwsDataCatalog
df.to_catalog_table(
    database="my_database",
    table="my_table"
)

# Write to S3 Table Catalog
df.to_catalog_table(
    database="my_database",
    table="my_table",
    catalog="s3tablescatalog/my_s3_tables_catalog"
)
```

### Usage with optional parameters


```
# Write to AwsDataCatalog with csv format
df.to_catalog_table(
    database="my_database",
    table="my_table",
    format="csv"
)

# Write to AwsDataCatalog at user specified s3 path
df.to_catalog_table(
    database="my_database",
    table="my_table",
    path="s3://my-bucket/custom/path/"
)

# Write to AwsDataCatalog with additional argument -> compression
df.to_catalog_table(
    database="my_database",
    table="my_table",
    compression='gzip'
)
```

# Spark Utilities


The Spark utilities module provides a simple interface for working with Spark Connect sessions and managing Spark configurations for various data sources within Amazon SageMaker Unified Studio. When no connection is specified, a Spark Connect session is created using the default Amazon Athena Spark connection.

## Basic Usage


Import the Spark utilities:

```
from sagemaker_studio import sparkutils
```

## Initialize Spark Session


Supported connection types:
+ Spark Connect

Optional Parameters:
+ connection_name (str): Name of the connection to execute the query against (e.g., "my_redshift_connection")

When no connection is specified, a default Amazon Athena Spark session is created:

```
# Default session
spark = sparkutils.init()

# Session with specific connection
spark = sparkutils.init(connection_name="my_spark_connection")
```

## Working with Spark Options


Supported connection types:
+ Amazon DocumentDB
+ Amazon DynamoDB
+ Amazon Redshift
+ Aurora MySQL
+ Aurora PostgreSQL
+ Azure SQL
+ Google BigQuery
+ Microsoft SQL Server
+ MySQL
+ PostgreSQL
+ Oracle
+ Snowflake

Required Inputs:
+ connection_name (str): Name of the connection to get Spark options for (e.g., "my_redshift_connection")

Get formatted Spark options for connecting to data sources:

```
# Get options for Redshift connection
options = sparkutils.get_spark_options("my_redshift_connection")
```

## Examples by Operation Type


### Reading and Writing Data


```
# Create sample DataFrame
df_to_write = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    ["id", "name"]
)

# Get spark options for Redshift connection
spark_options = sparkutils.get_spark_options("my_redshift_connection")

# Write DataFrame using JDBC
df_to_write.write \
    .format("jdbc") \
    .options(**spark_options) \
    .option("dbtable", "sample_table") \
    .save()

# Read DataFrame using JDBC
df_to_read = spark.read \
    .format('jdbc') \
    .options(**spark_options) \
    .option('dbtable', 'sample_table') \
    .load()

# Display results
df_to_read.show()
```

## Notes

+ Spark sessions are automatically configured for Amazon Athena spark compute
+ Connection credentials are managed through Amazon SageMaker Unified Studio project connections
+ The module handles session management and cleanup automatically
+ Spark options are formatted appropriately for each supported data source
+ When get_spark_options is used in EMR-S or EMR on EC2 compute and the connection has EnforceSSL enabled, the formatted Spark options will not include the sslrootcert value, so it must be passed explicitly (see the sketch following this list)
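
As a minimal sketch of that case (the certificate path below is an assumption), add the value to the returned options before reading or writing:

```
# On EMR compute with EnforceSSL enabled, supply sslrootcert explicitly
spark_options = sparkutils.get_spark_options("my_redshift_connection")
spark_options["sslrootcert"] = "/path/to/redshift-ca-bundle.pem"  # assumed certificate location

df = spark.read \
    .format("jdbc") \
    .options(**spark_options) \
    .option("dbtable", "sample_table") \
    .load()
```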

# Execution APIs


Execution APIs provide the ability to start an execution that runs a notebook headlessly within the same user space or on remote compute.

## Local Execution APIs


Use the following APIs to start, stop, get, or list executions within the user's space.

### StartExecution


You can start a notebook execution headlessly within the same user space.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(overrides={
            "execution": {
                "local": True,
            }
        })
sagemaker_studio_api = SageMakerStudioAPI(config)

result = sagemaker_studio_api.execution_client.start_execution(
    execution_name="my-execution",
    input_config={"notebook_config": {
        "input_path": "src/folder2/test.ipynb"}},
    execution_type="NOTEBOOK",
    output_config={"notebook_config": {
        "output_formats": ["NOTEBOOK", "HTML"]
    }}
)
print(result)
```

### GetExecution


You can retrieve details about a local execution using the `GetExecution` API.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2", overrides={
            "execution": {
                "local": True,
            }
        })
sagemaker_studio_api = SageMakerStudioAPI(config)

get_response = sagemaker_studio_api.execution_client.get_execution(execution_id="asdf-3b998be2-02dd-42af-8802-593d48d04daa")
print(get_response)
```

### ListExecutions


You can use the `ListExecutions` API to list all the executions that ran in the user's space.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2", overrides={
            "execution": {
                "local": True,
            }
        })
sagemaker_studio_api = SageMakerStudioAPI(config)

list_executions_response = sagemaker_studio_api.execution_client.list_executions(status="COMPLETED")
print(list_executions_response)
```

### StopExecution


You can use the `StopExecution` API to stop an execution that's running in the user space.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2", overrides={
            "execution": {
                "local": True,
            }
        })
sagemaker_studio_api = SageMakerStudioAPI(config)

stop_response = sagemaker_studio_api.execution_client.stop_execution(execution_id="asdf-3b998be2-02dd-42af-8802-593d48d04daa")
print(stop_response)
```

## Remote Execution APIs


Use the following APIs to start, stop, get, or list executions running on remote compute.

### StartExecution


You can start a notebook execution headlessly on a remote compute specified in the `StartExecution` request.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2")
sagemaker_studio_api = SageMakerStudioAPI(config)

result = sagemaker_studio_api.execution_client.start_execution(
    execution_name="my-execution",
    execution_type="NOTEBOOK",
    input_config={"notebook_config": {"input_path": "src/folder2/test.ipynb"}},
    output_config={"notebook_config": {"output_formats": ["NOTEBOOK", "HTML"]}},
    termination_condition={"max_runtime_in_seconds": 9000},
    compute={
        "instance_type": "ml.c5.xlarge",
        "image_details": {
            # provide either ecr_uri or (image_name and image_version)
            "image_name": "sagemaker-distribution-embargoed-loadtest",
            "image_version": "2.2",
            "ecr_uri": "123456123456.dkr.ecr.us-west-2.amazonaws.com/ImageName:latest",
        }
    }
)
print(result)
```

### GetExecution


You can retrieve details about an execution running on remote compute using the `GetExecution` API.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2")
sagemaker_studio_api = SageMakerStudioAPI(config)

get_response = sagemaker_studio_api.execution_client.get_execution(execution_id="asdf-3b998be2-02dd-42af-8802-593d48d04daa")
print(get_response)
```

### ListExecutions


You can use the `ListExecutions` API to list all the headless executions that ran on remote compute.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2")
sagemaker_studio_api = SageMakerStudioAPI(config)

list_executions_response = sagemaker_studio_api.execution_client.list_executions(status="COMPLETED")
print(list_executions_response)
```

### StopExecution


You can use the `StopExecution` API to stop an execution that's running on remote compute.

```
from sagemaker_studio.sagemaker_studio_api import SageMakerStudioAPI
from sagemaker_studio import ClientConfig

config = ClientConfig(region="us-west-2")
sagemaker_studio_api = SageMakerStudioAPI(config)

stop_response = sagemaker_studio_api.execution_client.stop_execution(execution_id="asdf-3b998be2-02dd-42af-8802-593d48d04daa")
print(stop_response)
```