

# Utility Methods


The Amazon SageMaker Unified Studio SDK provides utility modules for common data operations including SQL execution, DataFrame operations, and Spark session management.

# SQL Utilities


The SQL utilities module provides a simple interface for executing SQL queries against various database engines within Amazon SageMaker Unified Studio. When no connection is specified, queries are executed locally using DuckDB.

## Supported Database Engines


The following database engines are supported:
+ Amazon Athena
+ Amazon Redshift
+ MySQL
+ PostgreSQL
+ Snowflake
+ Google BigQuery
+ Amazon DynamoDB
+ Microsoft SQL Server
+ DuckDB (default when no connection specified)

## Basic Usage


Import the SQL utilities:

```
from sagemaker_studio import sqlutils
```

### Execute SQL with DuckDB (No Connection)


When no connection is specified, queries are executed locally using DuckDB:

```
# Simple SELECT query
result = sqlutils.sql("SELECT 1 AS test_column")
print(result)

# Query with literal values
result = sqlutils.sql("SELECT * FROM my_table WHERE id = 123")

### Execute SQL with Project Connections


Use existing project connections by specifying either connection name or ID:

```
# Using connection name
result = sqlutils.sql(
    "SELECT * FROM my_table",
    connection_name="my_athena_connection"
)

# Using connection ID
result = sqlutils.sql(
    "SELECT * FROM my_table",
    connection_id="conn_12345"
)
```

## Examples by Database Engine


### Amazon Athena


```
# Query Athena using project connection with parameters
result = sqlutils.sql(
    """
    SELECT customer_id, order_date, total_amount
    FROM orders
    WHERE order_date >= :start_date
    """,
    parameters={"start_date": "2024-01-01"},
    connection_name="project.athena"
)

# Create external table in Athena
sqlutils.sql(
    """
    CREATE EXTERNAL TABLE sales_data (
        customer_id bigint,
        order_date date,
        amount decimal(10,2)
    )
    LOCATION 's3://my-bucket/sales-data/'
    """,
    connection_name="project.athena"
)

# Insert data using Create Table As Select (CTAS)
sqlutils.sql(
    """
    CREATE TABLE monthly_sales AS
    SELECT
        DATE_TRUNC('month', order_date) as month,
        SUM(amount) as total_sales
    FROM sales_data
    GROUP BY DATE_TRUNC('month', order_date)
    """,
    connection_name="project.athena"
)
```

### Amazon Redshift


```
# Query Redshift with parameters
result = sqlutils.sql(
    """
    SELECT product_name, category, price
    FROM products
    WHERE category = :category
    AND price > :min_price
    """,
    parameters={"category": "Electronics", "min_price": 100},
    connection_name="project.redshift"
)

# Create table in Redshift
sqlutils.sql(
    """
    CREATE TABLE customer_summary (
        customer_id INTEGER PRIMARY KEY,
        total_orders INTEGER,
        total_spent DECIMAL(10,2),
        last_order_date DATE
    )
    """,
    connection_name="project.redshift"
)

# Insert aggregated data
sqlutils.sql(
    """
    INSERT INTO customer_summary
    SELECT
        customer_id,
        COUNT(*) as total_orders,
        SUM(amount) as total_spent,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
    """,
    connection_name="project.redshift"
)

# Update existing records
sqlutils.sql(
    """
    UPDATE products
    SET price = price * 1.1
    WHERE category = 'Electronics'
    """,
    connection_name="project.redshift"
)
```

## Advanced Usage


### Working with DataFrames


The `sql` function returns a pandas DataFrame for SELECT queries and an affected-row count for DML operations:

```
import pandas as pd

# Execute query and get DataFrame
df = sqlutils.sql("SELECT * FROM sales_data", connection_name="redshift_conn")

# Use pandas operations
summary = df.groupby('region')['sales'].sum()
print(summary)

# Save to file
df.to_csv('sales_report.csv', index=False)

# DML operations return row counts
rows_affected = sqlutils.sql(
    "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 123",
    connection_name="redshift_conn"
)
print(f"Updated {rows_affected} inventory records")
```

### Parameterized Queries


Use parameters to safely pass values to queries:

```
# Dictionary parameters (recommended)
result = sqlutils.sql(
    "SELECT * FROM orders WHERE customer_id = :customer_id AND status = :status",
    parameters={"customer_id": 12345, "status": "completed"},
    connection_name="redshift_connection"
)

# Athena with named parameters
result = sqlutils.sql(
    "SELECT * FROM products WHERE category = :category AND price > :min_price",
    parameters={"category": "Electronics", "min_price": 100},
    connection_name="athena_connection"
)
```
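The `:name` placeholders follow the standard named-parameter style used by Python database drivers, which bind values separately from the SQL text instead of interpolating them into the string. As a local illustration of the same mechanism, here is the built-in sqlite3 module standing in for a project connection (sqlite3 is not part of the SDK; it simply accepts the same placeholder style):

```python
import sqlite3

# Local stand-in for a project connection: sqlite3 uses the same :name
# placeholder style, so binding behaves like the sqlutils examples above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (:customer_id, :status)",
    [
        {"customer_id": 12345, "status": "completed"},
        {"customer_id": 12345, "status": "pending"},
        {"customer_id": 99, "status": "completed"},
    ],
)

# The driver binds the values; they are never spliced into the SQL text,
# which is what makes parameterized queries safe against injection.
rows = conn.execute(
    "SELECT customer_id, status FROM orders"
    " WHERE customer_id = :customer_id AND status = :status",
    {"customer_id": 12345, "status": "completed"},
).fetchall()
# rows == [(12345, 'completed')]
```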

### Getting Database Engine


You can also get the underlying SQLAlchemy engine for advanced operations:

```
# Get engine for a connection
engine = sqlutils.get_engine(connection_name="redshift_connection")

# Use engine directly with pandas
import pandas as pd

df = pd.read_sql("SELECT * FROM large_table LIMIT 1000", engine)
```
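`pd.read_sql` accepts either a SQLAlchemy engine or a DB-API connection, so the pattern above can be tried locally without a live cluster. A minimal sketch with an in-memory sqlite3 database substituting for the Redshift engine (the table contents here are illustrative):

```python
import sqlite3

import pandas as pd

# In-memory sqlite3 connection stands in for the engine returned by
# sqlutils.get_engine(...); pandas accepts both.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE large_table (id INTEGER, value REAL)")
conn.executemany("INSERT INTO large_table VALUES (?, ?)", [(1, 1.5), (2, 2.5)])

# Same call shape as with the real engine
df = pd.read_sql("SELECT * FROM large_table LIMIT 1000", conn)
# df has columns ['id', 'value'] and 2 rows
```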

### DuckDB Features


When using DuckDB (no connection specified), you get additional capabilities:

#### Python Integration


```
# DuckDB can access Python variables directly
import pandas as pd

my_df = pd.DataFrame({'id': [1, 2, 3], 'name': ['A', 'B', 'C']})

result = sqlutils.sql("SELECT * FROM my_df WHERE id > 1")
```

### Notes

+ SELECT queries return pandas DataFrames for easy data manipulation; DML statements return affected-row counts
+ DuckDB is automatically configured with Amazon S3 credentials from the environment
+ Connection credentials are managed through Amazon SageMaker Unified Studio project connections
+ The module handles connection pooling and cleanup automatically

# DataFrame Utilities


Read from and write to catalog tables using pandas DataFrames with automatic format detection and database management.

Supported catalog types:
+ AwsDataCatalog
+ S3CatalogTables

## Basic Usage


Import the DataFrame utilities:

```
from sagemaker_studio import dataframeutils
```

## Reading from Catalog Tables


Required Inputs:
+ database (str): Database name within the catalog
+ table (str): Table name

Optional Parameters:
+ catalog (str): Catalog identifier (defaults to AwsDataCatalog if not specified)
+ format (str): Data format - auto-detects from table metadata, falls back to parquet
+ `**kwargs`: Additional arguments
  + for AwsDataCatalog, kwargs can include columns, chunked, etc.
  + for S3 Tables, kwargs can include limit, row_filter, selected_fields, etc.

```
import pandas as pd

# Read from AwsDataCatalog
df = pd.read_catalog_table(
    database="my_database",
    table="my_table"
)

# Read from S3 Tables
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
)
```

### Usage with optional parameters


```
import pandas as pd

# Read from AwsDataCatalog by explicitly specifying catalogID and format
df = pd.read_catalog_table(
    database="my_database",
    table="my_table",
    catalog="123456789012",
    format="parquet"
)

# Read from AwsDataCatalog by explicitly specifying catalogID, format, and additional args -> columns
df = pd.read_catalog_table(
    database="my_database",
    table="my_table",
    catalog="123456789012",
    format="parquet",
    columns=['<column_name_1>', '<column_name_2>']
)

# Read from S3 Tables with additional args -> limit
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
   limit=500
)

# Read from S3 Tables with additional args -> selected_fields
df = pd.read_catalog_table(
   database="my_database",
   table="my_table",
   catalog="s3tablescatalog/my_s3_tables_catalog",
   selected_fields=['<field_name_1>', '<field_name_2>']
)
```

## Writing to Catalog Tables


Required Inputs:
+ database (str): Database name within the catalog
+ table (str): Table name

Optional Parameters:
+ catalog (str): Catalog identifier (defaults to AwsDataCatalog if not specified)
+ format (str): Data format used for AwsDataCatalog (default: parquet)
+ path (str): Custom Amazon S3 path for writing to AwsDataCatalog (auto-determined if not provided)
+ `**kwargs`: Additional arguments

Path Resolution Priority - Amazon S3 path is determined in this order:
+ User-provided path parameter
+ Existing database location + table name
+ Existing table location
+ Project default Amazon S3 location
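The fallback order above can be sketched as a simple chain. This is a hypothetical helper for illustration only; the SDK's internal logic may differ:

```python
def resolve_write_path(user_path=None, database_location=None, table=None,
                       table_location=None, project_default=None):
    """Hypothetical sketch of the documented priority order."""
    if user_path:
        return user_path                                      # 1. explicit path
    if database_location and table:
        return database_location.rstrip("/") + "/" + table + "/"  # 2. db location + table
    if table_location:
        return table_location                                 # 3. existing table location
    return project_default                                    # 4. project default

# With no explicit path, the database location plus the table name wins
path = resolve_write_path(
    database_location="s3://my-bucket/db",
    table="my_table",
    project_default="s3://project-bucket/default/",
)
# path == 's3://my-bucket/db/my_table/'
```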

```
import pandas as pd

# Create sample DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'value': [10.5, 20.3, 15.7]
})

# Write to AwsDataCatalog
df.to_catalog_table(
    database="my_database",
    table="my_table"
)

# Write to S3 Table Catalog
df.to_catalog_table(
    database="my_database",
    table="my_table",
    catalog="s3tablescatalog/my_s3_tables_catalog"
)
```

### Usage with optional parameters


```
# Write to AwsDataCatalog with csv format
df.to_catalog_table(
    database="my_database",
    table="my_table",
    format="csv"
)

# Write to AwsDataCatalog at user specified s3 path
df.to_catalog_table(
    database="my_database",
    table="my_table",
    path="s3://my-bucket/custom/path/"
)

# Write to AwsDataCatalog with additional argument -> compression
df.to_catalog_table(
    database="my_database",
    table="my_table",
    compression='gzip'
)
```

# Spark Utilities


The Spark utilities module provides a simple interface for working with Spark Connect sessions and managing Spark configurations for various data sources within Amazon SageMaker Unified Studio. When no connection is specified, a Spark Connect session is created using the default Amazon Athena Spark connection.

## Basic Usage


Import the Spark utilities:

```
from sagemaker_studio import sparkutils
```

## Initialize Spark Session


Supported connection types:
+ Spark Connect

Optional Parameters:
+ connection_name (str): Name of the connection to create the session against (e.g., "my_spark_connection")

When no connection is specified, a default Amazon Athena Spark session is created:

```
# Default session
spark = sparkutils.init()

# Session with specific connection
spark = sparkutils.init(connection_name="my_spark_connection")
```

## Working with Spark Options


Supported connection types:
+ Amazon DocumentDB
+ Amazon DynamoDB
+ Amazon Redshift
+ Aurora MySQL
+ Aurora PostgreSQL
+ Azure SQL
+ Google BigQuery
+ Microsoft SQL Server
+ MySQL
+ PostgreSQL
+ Oracle
+ Snowflake

Required Inputs:
+ connection_name (str): Name of the connection to get Spark options for (e.g., "my_redshift_connection")

Get formatted Spark options for connecting to data sources:

```
# Get options for Redshift connection
options = sparkutils.get_spark_options("my_redshift_connection")
```

## Examples by Operation Type


### Reading and Writing Data


```
# Create sample DataFrame
df_to_write = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    ["id", "name"]
)

# Get spark options for Redshift connection
spark_options = sparkutils.get_spark_options("my_redshift_connection")

# Write DataFrame using JDBC
df_to_write.write \
    .format("jdbc") \
    .options(**spark_options) \
    .option("dbtable", "sample_table") \
    .save()

# Read DataFrame using JDBC
df_to_read = spark.read \
    .format('jdbc') \
    .options(**spark_options) \
    .option('dbtable', 'sample_table') \
    .load()

# Display results
df_to_read.show()
```

## Notes

+ Spark sessions are automatically configured for Amazon Athena Spark compute
+ Connection credentials are managed through Amazon SageMaker Unified Studio project connections
+ The module handles session management and cleanup automatically
+ Spark options are formatted appropriately for each supported data source
+ When get_spark_options is used in EMR-S or EMR-on-EC2 compute and the connection has EnforceSSL enabled, the formatted Spark options will not include the sslrootcert value, so it must be passed explicitly
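On EMR compute with EnforceSSL enabled, the missing certificate path can be supplied by adding it to the options dictionary before the options are passed to Spark. A minimal sketch with an illustrative options dictionary; the keys shown and the certificate path are hypothetical, not actual SDK output:

```python
# Illustrative shape only; real options come from
# sparkutils.get_spark_options("my_redshift_connection")
spark_options = {
    "url": "jdbc:redshift://my-cluster.example.com:5439/dev",
    "user": "analyst",
    "password": "<secret>",
}

# On EMR-S / EMR-on-EC2 with EnforceSSL, sslrootcert is absent from the
# formatted options and must be added explicitly (hypothetical path)
spark_options.setdefault("sslrootcert", "/etc/pki/redshift-ca.crt")
```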