There are more AWS SDK examples available in the AWS Doc SDK Examples
Getting started with Amazon EMR
The following code example shows how to:
Create an EC2 key pair
Set up storage and prepare your application
Clean up resources
- Bash
-
- AWS CLI with Bash script
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Sample developer tutorials
repository. #!/bin/bash # EMR Getting Started Tutorial Script # This script automates the steps in the Amazon EMR Getting Started tutorial set -euo pipefail # Security: Set strict mode and trap errors trap 'handle_error "Script interrupted or command failed"' ERR # Set up logging with secure permissions LOG_FILE="emr-tutorial.log" touch "$LOG_FILE" chmod 600 "$LOG_FILE" exec > >(tee -a "$LOG_FILE") 2>&1 echo "Starting Amazon EMR Getting Started Tutorial Script" echo "Logging to $LOG_FILE" # Function to handle errors handle_error() { echo "ERROR: $1" echo "Resources created so far:" if [ -n "${BUCKET_NAME:-}" ]; then echo "- S3 Bucket: $BUCKET_NAME"; fi if [ -n "${CLUSTER_ID:-}" ]; then echo "- EMR Cluster: $CLUSTER_ID"; fi echo "Attempting to clean up resources..." cleanup exit 1 } # Function to clean up resources cleanup() { echo "" echo "===========================================" echo "CLEANUP IN PROGRESS" echo "===========================================" echo "Starting cleanup process..." # Terminate EMR cluster if it exists if [ -n "${CLUSTER_ID:-}" ]; then echo "Terminating EMR cluster: $CLUSTER_ID" aws emr terminate-clusters --cluster-ids "$CLUSTER_ID" 2>/dev/null || true echo "Waiting for cluster to terminate..." aws emr wait cluster-terminated --cluster-id "$CLUSTER_ID" 2>/dev/null || true echo "Cluster terminated successfully." fi # Delete S3 bucket and contents if it exists and is not shared if [ -n "${BUCKET_NAME:-}" ] && [ "${BUCKET_IS_SHARED:-false}" != "true" ]; then echo "Deleting S3 bucket contents: $BUCKET_NAME" aws s3 rm "s3://$BUCKET_NAME" --recursive 2>/dev/null || true echo "Deleting S3 bucket: $BUCKET_NAME" aws s3 rb "s3://$BUCKET_NAME" 2>/dev/null || true fi # Remove temporary key pair file if created by this script if [ -f "${KEY_NAME_FILE:-}" ]; then rm -f "$KEY_NAME_FILE" echo "Removed temporary key pair file." fi echo "Cleanup completed." } # Validate AWS CLI is installed and configured if ! command -v aws &> /dev/null; then handle_error "AWS CLI is not installed" fi # Test AWS credentials if ! aws sts get-caller-identity > /dev/null 2>&1; then handle_error "AWS credentials are not configured or invalid" fi # Generate a random identifier for S3 bucket RANDOM_ID=$(openssl rand -hex 6) # Check for shared prereq bucket PREREQ_BUCKET=$(aws cloudformation describe-stacks --stack-name tutorial-prereqs-bucket \ --query 'Stacks[0].Outputs[?OutputKey==`BucketName`].OutputValue' --output text 2>/dev/null || true) if [ -n "$PREREQ_BUCKET" ] && [ "$PREREQ_BUCKET" != "None" ]; then BUCKET_NAME="$PREREQ_BUCKET" BUCKET_IS_SHARED=true echo "Using shared bucket: $BUCKET_NAME" else BUCKET_IS_SHARED=false BUCKET_NAME="emr-${RANDOM_ID}" fi echo "Using bucket name: $BUCKET_NAME" # Create S3 bucket with security best practices echo "Creating S3 bucket: $BUCKET_NAME" aws s3 mb "s3://$BUCKET_NAME" --region "${AWS_REGION:-us-east-1}" || handle_error "Failed to create S3 bucket" # Tag the bucket aws s3api put-bucket-tagging --bucket "$BUCKET_NAME" \ --tagging 'TagSet=[{Key=project,Value=doc-smith},{Key=tutorial,Value=emr-gs}]' # Enable bucket versioning for safety aws s3api put-bucket-versioning --bucket "$BUCKET_NAME" --versioning-configuration Status=Enabled || true # Block public access to bucket aws s3api put-public-access-block --bucket "$BUCKET_NAME" \ --public-access-block-configuration \ "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true" || true # Enable encryption on bucket aws s3api put-bucket-encryption --bucket "$BUCKET_NAME" \ --server-side-encryption-configuration '{ "Rules": [{ "ApplyServerSideEncryptionByDefault": { "SSEAlgorithm": "AES256" } }] }' || true echo "S3 bucket created successfully with security best practices." # Create PySpark script echo "Creating PySpark script: health_violations.py" cat > health_violations.py << 'EOL' import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://emr-tutorial-bucket/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://emr-tutorial-bucket/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri) EOL # Secure the script file chmod 600 health_violations.py # Upload PySpark script to S3 echo "Uploading PySpark script to S3" aws s3 cp health_violations.py "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload PySpark script" echo "PySpark script uploaded successfully." # Download and prepare sample data echo "Downloading sample data" curl -sS -o food_establishment_data.zip "https://docs.aws.amazon.com/emr/latest/ManagementGuide/samples/food_establishment_data.zip" || handle_error "Failed to download sample data" # Verify downloaded file if [ ! -f food_establishment_data.zip ] || [ ! -s food_establishment_data.zip ]; then handle_error "Downloaded file is empty or missing" fi unzip -o food_establishment_data.zip || handle_error "Failed to unzip sample data" echo "Sample data downloaded and extracted successfully." # Secure the sample data file chmod 600 food_establishment_data.csv # Upload sample data to S3 echo "Uploading sample data to S3" aws s3 cp food_establishment_data.csv "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload sample data" echo "Sample data uploaded successfully." # Clean up sensitive local files rm -f food_establishment_data.zip health_violations.py # Create IAM default roles for EMR echo "Creating IAM default roles for EMR" aws emr create-default-roles 2>/dev/null || true echo "IAM default roles created successfully." # Check if EC2 key pair exists echo "Checking for EC2 key pair" KEY_PAIRS=$(aws ec2 describe-key-pairs --query "KeyPairs[*].KeyName" --output text 2>/dev/null || true) if [ -z "$KEY_PAIRS" ]; then echo "No EC2 key pairs found. Creating a new key pair..." KEY_NAME="emr-tutorial-key-${RANDOM_ID}" KEY_NAME_FILE="${KEY_NAME}.pem" aws ec2 create-key-pair --key-name "$KEY_NAME" \ --tag-specifications 'ResourceType=key-pair,Tags=[{Key=project,Value=doc-smith},{Key=tutorial,Value=emr-gs}]' \ --query "KeyMaterial" --output text > "$KEY_NAME_FILE" chmod 400 "$KEY_NAME_FILE" echo "Created new key pair: $KEY_NAME" else # Use the first available key pair KEY_NAME=$(echo "$KEY_PAIRS" | awk '{print $1}') echo "Using existing key pair: $KEY_NAME" fi # Launch EMR cluster with security best practices echo "Launching EMR cluster with Spark" CLUSTER_RESPONSE=$(aws emr create-cluster \ --name "EMR Tutorial Cluster" \ --release-label emr-6.10.0 \ --applications Name=Spark \ --ec2-attributes KeyName="$KEY_NAME" \ --instance-type m5.xlarge \ --instance-count 3 \ --use-default-roles \ --log-uri "s3://$BUCKET_NAME/logs/" \ --ebs-root-volume-size 100 \ --tags Key=project,Value=doc-smith Key=tutorial,Value=emr-gs \ --security-configuration "EMR-Tutorial-SecurityConfig" 2>/dev/null || true) # Check for errors in the response if echo "$CLUSTER_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to create EMR cluster: $CLUSTER_RESPONSE" fi # Extract cluster ID using jq if available, otherwise use alternative parsing if command -v jq &> /dev/null; then CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | jq -r '.ClusterId // empty') else CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | grep -o '"ClusterId"[[:space:]]*:[[:space:]]*"[^"]*' | grep -o 'j-[A-Z0-9]*' || true) fi if [ -z "$CLUSTER_ID" ] || [ "$CLUSTER_ID" == "null" ]; then handle_error "Failed to extract cluster ID from response: $CLUSTER_RESPONSE" fi echo "EMR cluster created with ID: $CLUSTER_ID" # Wait for cluster to be ready echo "Waiting for cluster to be ready (this may take several minutes)..." aws emr wait cluster-running --cluster-id "$CLUSTER_ID" || handle_error "Cluster failed to reach running state" # Check if cluster is in WAITING state CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) if [ "$CLUSTER_STATE" != "WAITING" ]; then echo "Waiting for cluster to reach WAITING state..." WAIT_COUNT=0 MAX_WAIT=120 while [ "$CLUSTER_STATE" != "WAITING" ]; do if [ $WAIT_COUNT -ge $MAX_WAIT ]; then handle_error "Cluster did not reach WAITING state within timeout period" fi sleep 30 CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) echo "Current cluster state: $CLUSTER_STATE" # Check for error states if [[ "$CLUSTER_STATE" == "TERMINATED_WITH_ERRORS" || "$CLUSTER_STATE" == "TERMINATED" ]]; then handle_error "Cluster entered error state: $CLUSTER_STATE" fi WAIT_COUNT=$((WAIT_COUNT + 1)) done fi echo "Cluster is now in WAITING state and ready to accept work." # Submit Spark application as a step echo "Submitting Spark application as a step" STEP_RESPONSE=$(aws emr add-steps \ --cluster-id "$CLUSTER_ID" \ --steps Type=Spark,Name="Health Violations Analysis",ActionOnFailure=CONTINUE,Args=["s3://$BUCKET_NAME/health_violations.py","--data_source","s3://$BUCKET_NAME/food_establishment_data.csv","--output_uri","s3://$BUCKET_NAME/results/"]) # Check for errors in the response if echo "$STEP_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to submit step: $STEP_RESPONSE" fi # Extract step ID using appropriate method if command -v jq &> /dev/null; then STEP_ID=$(echo "$STEP_RESPONSE" | jq -r '.StepIds[0] // empty') else STEP_ID=$(echo "$STEP_RESPONSE" | grep -o 's-[A-Z0-9]*' | head -1 || true) fi if [ -z "$STEP_ID" ] || [ "$STEP_ID" == "null" ]; then echo "Full step response: $STEP_RESPONSE" handle_error "Failed to extract valid step ID from response" fi echo "Step submitted with ID: $STEP_ID" # Wait for step to complete with timeout echo "Waiting for step to complete (this may take several minutes)..." aws emr wait step-complete --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" || handle_error "Step failed to complete" # Check step status STEP_STATE=$(aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --query "Step.Status.State" --output text) if [ "$STEP_STATE" != "COMPLETED" ]; then handle_error "Step did not complete successfully. Final state: $STEP_STATE" fi echo "Step completed successfully." # View results echo "Listing output files in S3" aws s3 ls "s3://$BUCKET_NAME/results/" || handle_error "Failed to list output files" # Download results echo "Downloading results file" RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-[0-9]*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "No result file found with pattern 'part-[0-9]*.csv'. Trying to find any CSV file..." RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-.*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "Listing all files in results directory:" aws s3 ls "s3://$BUCKET_NAME/results/" handle_error "No result file found in the output directory" fi fi aws s3 cp "s3://$BUCKET_NAME/results/$RESULT_FILE" ./results.csv --sse AES256 || handle_error "Failed to download results file" chmod 600 ./results.csv echo "Results downloaded to results.csv" echo "Top 10 establishments with the most red violations:" cat results.csv # Display SSH connection information echo "" echo "To connect to the cluster via SSH, use the following command:" echo "aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file ${KEY_NAME_FILE:-./${KEY_NAME}.pem}" # Display summary of created resources echo "" echo "===========================================" echo "RESOURCES CREATED" echo "===========================================" echo "- S3 Bucket: $BUCKET_NAME" echo "- EMR Cluster: $CLUSTER_ID" echo "- Results file: results.csv" if [ -f "${KEY_NAME_FILE:-}" ]; then echo "- EC2 Key Pair: $KEY_NAME (saved to ${KEY_NAME_FILE})" fi # Perform cleanup cleanup echo "Script completed successfully."-
For API details, see the following topics in AWS CLI Command Reference.
-
Getting started with Amazon EC2
Getting started with Amazon ElastiCache