Esempi di Amazon S3 Control con SDK per Python (Boto3) - AWS Esempi di codice SDK

Sono disponibili altri esempi AWS SDK nel repository AWS Doc SDK Examples. GitHub

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi di Amazon S3 Control con SDK per Python (Boto3)

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando il controllo AWS SDK per Python (Boto3) con Amazon S3.

Nozioni di base: esempi di codice che mostrano come eseguire le operazioni essenziali all’interno di un servizio.

Le azioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Sebbene le azioni mostrino come richiamare le singole funzioni del servizio, è possibile visualizzarle contestualizzate negli scenari correlati.

Ogni esempio include un link al codice sorgente completo, in cui vengono fornite le istruzioni su come configurare ed eseguire il codice nel contesto.

Nozioni di base

L’esempio di codice seguente mostra come iniziare a utilizzare Amazon S3 Control.

SDK per Python (Boto3)
Nota

C'è altro su. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise
  • Per i dettagli sull'API, consulta ListJobs AWSSDK for Python (Boto3) API Reference.

Nozioni di base

L’esempio di codice seguente mostra come imparare a utilizzare le operazioni di base per Amazon S3 Control.

SDK per Python (Boto3)
Nota

C'è di più su. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

Scopri lo scenario di base di S3 Batch.

class S3BatchWrapper: """Wrapper class for managing S3 Batch Operations.""" def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None: """ Initializes the S3BatchWrapper with AWS service clients. :param s3_client: A Boto3 Amazon S3 client. This client provides low-level access to AWS S3 services. :param s3control_client: A Boto3 Amazon S3 Control client. This client provides low-level access to AWS S3 Control services. :param sts_client: A Boto3 AWS STS client. This client provides low-level access to AWS STS services. """ self.s3_client = s3_client self.s3control_client = s3control_client self.sts_client = sts_client # Get region from the client for bucket creation logic self.region_name = self.s3_client.meta.region_name def get_account_id(self) -> str: """ Get AWS account ID. Returns: str: AWS account ID """ return self.sts_client.get_caller_identity()["Account"] def create_bucket(self, bucket_name: str) -> None: """ Create an S3 bucket. Args: bucket_name (str): Name of the bucket to create Raises: ClientError: If bucket creation fails """ try: if self.region_name and self.region_name != 'us-east-1': self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': self.region_name } ) else: self.s3_client.create_bucket(Bucket=bucket_name) print(f"Created bucket: {bucket_name}") except ClientError as e: print(f"Error creating bucket: {e}") raise def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str: """ Upload files to S3 bucket including manifest file. Args: bucket_name (str): Target bucket name file_names (list): List of file names to upload Returns: str: ETag of the manifest file Raises: ClientError: If file upload fails """ try: for file_name in file_names: if file_name != "job-manifest.csv": content = f"Content for {file_name}" self.s3_client.put_object( Bucket=bucket_name, Key=file_name, Body=content.encode('utf-8') ) print(f"Uploaded {file_name} to {bucket_name}") manifest_content = "" for file_name in file_names: if file_name != "job-manifest.csv": manifest_content += f"{bucket_name},{file_name}\n" manifest_response = self.s3_client.put_object( Bucket=bucket_name, Key="job-manifest.csv", Body=manifest_content.encode('utf-8') ) print(f"Uploaded manifest file to {bucket_name}") print(f"Manifest content:\n{manifest_content}") return manifest_response['ETag'].strip('"') except ClientError as e: print(f"Error uploading files: {e}") raise def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]: """ Check for any failure reasons of a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID Returns: list: List of failure reasons Raises: ClientError: If checking job failure reasons fails """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) if 'FailureReasons' in response['Job']: for reason in response['Job']['FailureReasons']: print(f"- {reason}") return response['Job'].get('FailureReasons', []) except ClientError as e: print(f"Error checking job failure reasons: {e}") raise def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready') -> bool: """ Wait for a job to reach the desired status. Args: job_id (str): ID of the batch job account_id (str): AWS account ID desired_status (str): Target status to wait for Returns: bool: True if desired status is reached, False otherwise Raises: ClientError: If checking job status fails """ print(f"Waiting for job to become {desired_status}...") max_attempts = 60 attempt = 0 while attempt < max_attempts: try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status == desired_status: return True if current_status == 'Suspended': print("Job is in Suspended state, can proceed with activation") return True if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']: print(f"Job is in {current_status} state, cannot reach {desired_status} status") if 'FailureReasons' in response['Job']: print("Failure reasons:") for reason in response['Job']['FailureReasons']: print(f"- {reason}") return False time.sleep(20) attempt += 1 except ClientError as e: print(f"Error checking job status: {e}") raise print(f"Timeout waiting for job to become {desired_status}") return False def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None: """ Clean up all resources created during the scenario. Args: bucket_name (str): Name of the bucket to clean up file_names (list): List of files to delete Raises: ClientError: If cleanup fails """ try: for file_name in file_names: self.s3_client.delete_object(Bucket=bucket_name, Key=file_name) print(f"Deleted {file_name}") response = self.s3_client.list_objects_v2( Bucket=bucket_name, Prefix='batch-op-reports/' ) if 'Contents' in response: for obj in response['Contents']: self.s3_client.delete_object( Bucket=bucket_name, Key=obj['Key'] ) print(f"Deleted {obj['Key']}") self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Deleted bucket {bucket_name}") except ClientError as e: print(f"Error in cleanup: {e}") raise

Azioni

Il seguente esempio di codice mostra come usare. CreateJob

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise
  • Per i dettagli sull'API, consulta CreateJob AWSSDK for Python (Boto3) API Reference.

Il seguente esempio di codice mostra come utilizzare. DeleteJobTagging

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise

Il seguente esempio di codice mostra come utilizzare. DescribeJob

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise
  • Per i dettagli sull'API, consulta DescribeJob AWSSDK for Python (Boto3) API Reference.

Il seguente esempio di codice mostra come utilizzare. GetJobTagging

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise
  • Per i dettagli sull'API, consulta GetJobTagging AWSSDK for Python (Boto3) API Reference.

Il seguente esempio di codice mostra come utilizzare. PutJobTagging

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise
  • Per i dettagli sull'API, consulta PutJobTagging AWSSDK for Python (Boto3) API Reference.

Il seguente esempio di codice mostra come utilizzare. UpdateJobPriority

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return

Il seguente esempio di codice mostra come utilizzare. UpdateJobStatus

SDK per Python (Boto3)
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise