View a markdown version of this page

Bash 스크립트와 AWS CLI 함께를 사용하는 Amazon EMR 예제 - AWS SDK 코드 예제

Doc AWS SDK 예제 GitHub 리포지토리에서 더 많은 SDK 예제를 사용할 수 있습니다. AWS

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Bash 스크립트와 AWS CLI 함께를 사용하는 Amazon EMR 예제

다음 코드 예제에서는 Amazon EMR에서 Bash 스크립트 AWS Command Line Interface 와 함께를 사용하여 작업을 수행하고 일반적인 시나리오를 구현하는 방법을 보여줍니다.

시나리오는 동일한 서비스 내에서 또는 다른 AWS 서비스와 결합된 상태에서 여러 함수를 직접적으로 호출하여 특정 태스크를 수행하는 방법을 보여주는 코드 예제입니다.

각 예시에는 전체 소스 코드에 대한 링크가 포함되어 있으며, 여기에서 컨텍스트에 맞춰 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있습니다.

시나리오

다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • EC2 키 페어 생성

  • 스토리지 설정 및 애플리케이션 준비

  • 리소스 정리

AWS CLI Bash 스크립트 사용
참고

GitHub에 더 많은 내용이 있습니다. 샘플 개발자 튜토리얼 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 배워보세요.

#!/bin/bash # EMR Getting Started Tutorial Script # This script automates the steps in the Amazon EMR Getting Started tutorial set -euo pipefail # Security: Set strict mode and trap errors trap 'handle_error "Script interrupted or command failed"' ERR # Set up logging with secure permissions LOG_FILE="emr-tutorial.log" touch "$LOG_FILE" chmod 600 "$LOG_FILE" exec > >(tee -a "$LOG_FILE") 2>&1 echo "Starting Amazon EMR Getting Started Tutorial Script" echo "Logging to $LOG_FILE" # Function to handle errors handle_error() { echo "ERROR: $1" echo "Resources created so far:" if [ -n "${BUCKET_NAME:-}" ]; then echo "- S3 Bucket: $BUCKET_NAME"; fi if [ -n "${CLUSTER_ID:-}" ]; then echo "- EMR Cluster: $CLUSTER_ID"; fi echo "Attempting to clean up resources..." cleanup exit 1 } # Function to clean up resources cleanup() { echo "" echo "===========================================" echo "CLEANUP IN PROGRESS" echo "===========================================" echo "Starting cleanup process..." # Terminate EMR cluster if it exists if [ -n "${CLUSTER_ID:-}" ]; then echo "Terminating EMR cluster: $CLUSTER_ID" aws emr terminate-clusters --cluster-ids "$CLUSTER_ID" 2>/dev/null || true echo "Waiting for cluster to terminate..." aws emr wait cluster-terminated --cluster-id "$CLUSTER_ID" 2>/dev/null || true echo "Cluster terminated successfully." fi # Delete S3 bucket and contents if it exists and is not shared if [ -n "${BUCKET_NAME:-}" ] && [ "${BUCKET_IS_SHARED:-false}" != "true" ]; then echo "Deleting S3 bucket contents: $BUCKET_NAME" aws s3 rm "s3://$BUCKET_NAME" --recursive 2>/dev/null || true echo "Deleting S3 bucket: $BUCKET_NAME" aws s3 rb "s3://$BUCKET_NAME" 2>/dev/null || true fi # Remove temporary key pair file if created by this script if [ -f "${KEY_NAME_FILE:-}" ]; then rm -f "$KEY_NAME_FILE" echo "Removed temporary key pair file." fi echo "Cleanup completed." } # Validate AWS CLI is installed and configured if ! command -v aws &> /dev/null; then handle_error "AWS CLI is not installed" fi # Test AWS credentials if ! aws sts get-caller-identity > /dev/null 2>&1; then handle_error "AWS credentials are not configured or invalid" fi # Generate a random identifier for S3 bucket RANDOM_ID=$(openssl rand -hex 6) # Check for shared prereq bucket PREREQ_BUCKET=$(aws cloudformation describe-stacks --stack-name tutorial-prereqs-bucket \ --query 'Stacks[0].Outputs[?OutputKey==`BucketName`].OutputValue' --output text 2>/dev/null || true) if [ -n "$PREREQ_BUCKET" ] && [ "$PREREQ_BUCKET" != "None" ]; then BUCKET_NAME="$PREREQ_BUCKET" BUCKET_IS_SHARED=true echo "Using shared bucket: $BUCKET_NAME" else BUCKET_IS_SHARED=false BUCKET_NAME="emr-${RANDOM_ID}" fi echo "Using bucket name: $BUCKET_NAME" # Create S3 bucket with security best practices echo "Creating S3 bucket: $BUCKET_NAME" aws s3 mb "s3://$BUCKET_NAME" --region "${AWS_REGION:-us-east-1}" || handle_error "Failed to create S3 bucket" # Tag the bucket aws s3api put-bucket-tagging --bucket "$BUCKET_NAME" \ --tagging 'TagSet=[{Key=project,Value=doc-smith},{Key=tutorial,Value=emr-gs}]' # Enable bucket versioning for safety aws s3api put-bucket-versioning --bucket "$BUCKET_NAME" --versioning-configuration Status=Enabled || true # Block public access to bucket aws s3api put-public-access-block --bucket "$BUCKET_NAME" \ --public-access-block-configuration \ "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true" || true # Enable encryption on bucket aws s3api put-bucket-encryption --bucket "$BUCKET_NAME" \ --server-side-encryption-configuration '{ "Rules": [{ "ApplyServerSideEncryptionByDefault": { "SSEAlgorithm": "AES256" } }] }' || true echo "S3 bucket created successfully with security best practices." # Create PySpark script echo "Creating PySpark script: health_violations.py" cat > health_violations.py << 'EOL' import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://emr-tutorial-bucket/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://emr-tutorial-bucket/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri) EOL # Secure the script file chmod 600 health_violations.py # Upload PySpark script to S3 echo "Uploading PySpark script to S3" aws s3 cp health_violations.py "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload PySpark script" echo "PySpark script uploaded successfully." # Download and prepare sample data echo "Downloading sample data" curl -sS -o food_establishment_data.zip "https://docs.aws.amazon.com/emr/latest/ManagementGuide/samples/food_establishment_data.zip" || handle_error "Failed to download sample data" # Verify downloaded file if [ ! -f food_establishment_data.zip ] || [ ! -s food_establishment_data.zip ]; then handle_error "Downloaded file is empty or missing" fi unzip -o food_establishment_data.zip || handle_error "Failed to unzip sample data" echo "Sample data downloaded and extracted successfully." # Secure the sample data file chmod 600 food_establishment_data.csv # Upload sample data to S3 echo "Uploading sample data to S3" aws s3 cp food_establishment_data.csv "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload sample data" echo "Sample data uploaded successfully." # Clean up sensitive local files rm -f food_establishment_data.zip health_violations.py # Create IAM default roles for EMR echo "Creating IAM default roles for EMR" aws emr create-default-roles 2>/dev/null || true echo "IAM default roles created successfully." # Check if EC2 key pair exists echo "Checking for EC2 key pair" KEY_PAIRS=$(aws ec2 describe-key-pairs --query "KeyPairs[*].KeyName" --output text 2>/dev/null || true) if [ -z "$KEY_PAIRS" ]; then echo "No EC2 key pairs found. Creating a new key pair..." KEY_NAME="emr-tutorial-key-${RANDOM_ID}" KEY_NAME_FILE="${KEY_NAME}.pem" aws ec2 create-key-pair --key-name "$KEY_NAME" \ --tag-specifications 'ResourceType=key-pair,Tags=[{Key=project,Value=doc-smith},{Key=tutorial,Value=emr-gs}]' \ --query "KeyMaterial" --output text > "$KEY_NAME_FILE" chmod 400 "$KEY_NAME_FILE" echo "Created new key pair: $KEY_NAME" else # Use the first available key pair KEY_NAME=$(echo "$KEY_PAIRS" | awk '{print $1}') echo "Using existing key pair: $KEY_NAME" fi # Launch EMR cluster with security best practices echo "Launching EMR cluster with Spark" CLUSTER_RESPONSE=$(aws emr create-cluster \ --name "EMR Tutorial Cluster" \ --release-label emr-6.10.0 \ --applications Name=Spark \ --ec2-attributes KeyName="$KEY_NAME" \ --instance-type m5.xlarge \ --instance-count 3 \ --use-default-roles \ --log-uri "s3://$BUCKET_NAME/logs/" \ --ebs-root-volume-size 100 \ --tags Key=project,Value=doc-smith Key=tutorial,Value=emr-gs \ --security-configuration "EMR-Tutorial-SecurityConfig" 2>/dev/null || true) # Check for errors in the response if echo "$CLUSTER_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to create EMR cluster: $CLUSTER_RESPONSE" fi # Extract cluster ID using jq if available, otherwise use alternative parsing if command -v jq &> /dev/null; then CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | jq -r '.ClusterId // empty') else CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | grep -o '"ClusterId"[[:space:]]*:[[:space:]]*"[^"]*' | grep -o 'j-[A-Z0-9]*' || true) fi if [ -z "$CLUSTER_ID" ] || [ "$CLUSTER_ID" == "null" ]; then handle_error "Failed to extract cluster ID from response: $CLUSTER_RESPONSE" fi echo "EMR cluster created with ID: $CLUSTER_ID" # Wait for cluster to be ready echo "Waiting for cluster to be ready (this may take several minutes)..." aws emr wait cluster-running --cluster-id "$CLUSTER_ID" || handle_error "Cluster failed to reach running state" # Check if cluster is in WAITING state CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) if [ "$CLUSTER_STATE" != "WAITING" ]; then echo "Waiting for cluster to reach WAITING state..." WAIT_COUNT=0 MAX_WAIT=120 while [ "$CLUSTER_STATE" != "WAITING" ]; do if [ $WAIT_COUNT -ge $MAX_WAIT ]; then handle_error "Cluster did not reach WAITING state within timeout period" fi sleep 30 CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) echo "Current cluster state: $CLUSTER_STATE" # Check for error states if [[ "$CLUSTER_STATE" == "TERMINATED_WITH_ERRORS" || "$CLUSTER_STATE" == "TERMINATED" ]]; then handle_error "Cluster entered error state: $CLUSTER_STATE" fi WAIT_COUNT=$((WAIT_COUNT + 1)) done fi echo "Cluster is now in WAITING state and ready to accept work." # Submit Spark application as a step echo "Submitting Spark application as a step" STEP_RESPONSE=$(aws emr add-steps \ --cluster-id "$CLUSTER_ID" \ --steps Type=Spark,Name="Health Violations Analysis",ActionOnFailure=CONTINUE,Args=["s3://$BUCKET_NAME/health_violations.py","--data_source","s3://$BUCKET_NAME/food_establishment_data.csv","--output_uri","s3://$BUCKET_NAME/results/"]) # Check for errors in the response if echo "$STEP_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to submit step: $STEP_RESPONSE" fi # Extract step ID using appropriate method if command -v jq &> /dev/null; then STEP_ID=$(echo "$STEP_RESPONSE" | jq -r '.StepIds[0] // empty') else STEP_ID=$(echo "$STEP_RESPONSE" | grep -o 's-[A-Z0-9]*' | head -1 || true) fi if [ -z "$STEP_ID" ] || [ "$STEP_ID" == "null" ]; then echo "Full step response: $STEP_RESPONSE" handle_error "Failed to extract valid step ID from response" fi echo "Step submitted with ID: $STEP_ID" # Wait for step to complete with timeout echo "Waiting for step to complete (this may take several minutes)..." aws emr wait step-complete --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" || handle_error "Step failed to complete" # Check step status STEP_STATE=$(aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --query "Step.Status.State" --output text) if [ "$STEP_STATE" != "COMPLETED" ]; then handle_error "Step did not complete successfully. Final state: $STEP_STATE" fi echo "Step completed successfully." # View results echo "Listing output files in S3" aws s3 ls "s3://$BUCKET_NAME/results/" || handle_error "Failed to list output files" # Download results echo "Downloading results file" RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-[0-9]*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "No result file found with pattern 'part-[0-9]*.csv'. Trying to find any CSV file..." RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-.*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "Listing all files in results directory:" aws s3 ls "s3://$BUCKET_NAME/results/" handle_error "No result file found in the output directory" fi fi aws s3 cp "s3://$BUCKET_NAME/results/$RESULT_FILE" ./results.csv --sse AES256 || handle_error "Failed to download results file" chmod 600 ./results.csv echo "Results downloaded to results.csv" echo "Top 10 establishments with the most red violations:" cat results.csv # Display SSH connection information echo "" echo "To connect to the cluster via SSH, use the following command:" echo "aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file ${KEY_NAME_FILE:-./${KEY_NAME}.pem}" # Display summary of created resources echo "" echo "===========================================" echo "RESOURCES CREATED" echo "===========================================" echo "- S3 Bucket: $BUCKET_NAME" echo "- EMR Cluster: $CLUSTER_ID" echo "- Results file: results.csv" if [ -f "${KEY_NAME_FILE:-}" ]; then echo "- EC2 Key Pair: $KEY_NAME (saved to ${KEY_NAME_FILE})" fi # Perform cleanup cleanup echo "Script completed successfully."