文件 AWS 開發套件範例 GitHub 儲存庫中有更多可用的 AWS SDK 範例
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用適用於 Python 的 SDK (Boto3) 的 Amazon S3 控制範例
下列程式碼範例示範如何使用 適用於 Python (Boto3) 的 AWS SDK 搭配 Amazon S3 Control 執行動作和實作常見案例。
基本概念是程式碼範例,這些範例說明如何在服務內執行基本操作。
Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然動作會告訴您如何呼叫個別服務函數,但您可以在其相關情境中查看內容中的動作。
每個範例均包含完整原始碼的連結,您可在連結中找到如何設定和執行內容中程式碼的相關指示。
開始使用
下列程式碼範例示範如何開始使用 Amazon S3 Control。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise-
如需 API 詳細資訊,請參閱《AWS SDK for Python (Boto3) API 參考》中的 ListJobs。
-
基本概念
下列程式碼範例示範如何學習 Amazon S3 Control 的核心操作。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 了解 S3 批次基本概念案例。
class S3BatchWrapper: """Wrapper class for managing S3 Batch Operations.""" def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None: """ Initializes the S3BatchWrapper with AWS service clients. :param s3_client: A Boto3 Amazon S3 client. This client provides low-level access to AWS S3 services. :param s3control_client: A Boto3 Amazon S3 Control client. This client provides low-level access to AWS S3 Control services. :param sts_client: A Boto3 AWS STS client. This client provides low-level access to AWS STS services. """ self.s3_client = s3_client self.s3control_client = s3control_client self.sts_client = sts_client # Get region from the client for bucket creation logic self.region_name = self.s3_client.meta.region_name def get_account_id(self) -> str: """ Get AWS account ID. Returns: str: AWS account ID """ return self.sts_client.get_caller_identity()["Account"] def create_bucket(self, bucket_name: str) -> None: """ Create an S3 bucket. Args: bucket_name (str): Name of the bucket to create Raises: ClientError: If bucket creation fails """ try: if self.region_name and self.region_name != 'us-east-1': self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': self.region_name } ) else: self.s3_client.create_bucket(Bucket=bucket_name) print(f"Created bucket: {bucket_name}") except ClientError as e: print(f"Error creating bucket: {e}") raise def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str: """ Upload files to S3 bucket including manifest file. Args: bucket_name (str): Target bucket name file_names (list): List of file names to upload Returns: str: ETag of the manifest file Raises: ClientError: If file upload fails """ try: for file_name in file_names: if file_name != "job-manifest.csv": content = f"Content for {file_name}" self.s3_client.put_object( Bucket=bucket_name, Key=file_name, Body=content.encode('utf-8') ) print(f"Uploaded {file_name} to {bucket_name}") manifest_content = "" for file_name in file_names: if file_name != "job-manifest.csv": manifest_content += f"{bucket_name},{file_name}\n" manifest_response = self.s3_client.put_object( Bucket=bucket_name, Key="job-manifest.csv", Body=manifest_content.encode('utf-8') ) print(f"Uploaded manifest file to {bucket_name}") print(f"Manifest content:\n{manifest_content}") return manifest_response['ETag'].strip('"') except ClientError as e: print(f"Error uploading files: {e}") raise def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]: """ Check for any failure reasons of a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID Returns: list: List of failure reasons Raises: ClientError: If checking job failure reasons fails """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) if 'FailureReasons' in response['Job']: for reason in response['Job']['FailureReasons']: print(f"- {reason}") return response['Job'].get('FailureReasons', []) except ClientError as e: print(f"Error checking job failure reasons: {e}") raise def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready') -> bool: """ Wait for a job to reach the desired status. Args: job_id (str): ID of the batch job account_id (str): AWS account ID desired_status (str): Target status to wait for Returns: bool: True if desired status is reached, False otherwise Raises: ClientError: If checking job status fails """ print(f"Waiting for job to become {desired_status}...") max_attempts = 60 attempt = 0 while attempt < max_attempts: try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status == desired_status: return True if current_status == 'Suspended': print("Job is in Suspended state, can proceed with activation") return True if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']: print(f"Job is in {current_status} state, cannot reach {desired_status} status") if 'FailureReasons' in response['Job']: print("Failure reasons:") for reason in response['Job']['FailureReasons']: print(f"- {reason}") return False time.sleep(20) attempt += 1 except ClientError as e: print(f"Error checking job status: {e}") raise print(f"Timeout waiting for job to become {desired_status}") return False def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None: """ Clean up all resources created during the scenario. Args: bucket_name (str): Name of the bucket to clean up file_names (list): List of files to delete Raises: ClientError: If cleanup fails """ try: for file_name in file_names: self.s3_client.delete_object(Bucket=bucket_name, Key=file_name) print(f"Deleted {file_name}") response = self.s3_client.list_objects_v2( Bucket=bucket_name, Prefix='batch-op-reports/' ) if 'Contents' in response: for obj in response['Contents']: self.s3_client.delete_object( Bucket=bucket_name, Key=obj['Key'] ) print(f"Deleted {obj['Key']}") self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Deleted bucket {bucket_name}") except ClientError as e: print(f"Error in cleanup: {e}") raise-
如需 API 詳細資訊,請參閱《適用於 Python (Boto3) 的AWS SDK API 參考》中的下列主題。
-
動作
以下程式碼範例顯示如何使用 CreateJob。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise-
如需 API 詳細資訊,請參閱《AWS SDK for Python (Boto3) API 參考》中的 CreateJob。
-
以下程式碼範例顯示如何使用 DeleteJobTagging。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise-
如需 API 詳細資訊,請參閱《適用於 AWS Python (Boto3) 的 SDK API 參考》中的 DeleteJobTagging。
-
以下程式碼範例顯示如何使用 DescribeJob。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise-
如需 API 的詳細資訊,請參閱《適用於 Python (Boto3) 的AWS 開發套件 API 參考》中的 DescribeJob。
-
以下程式碼範例顯示如何使用 GetJobTagging。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise-
如需 API 詳細資訊,請參閱《適用於 AWS Python (Boto3) 的 SDK API 參考》中的 GetJobTagging。
-
以下程式碼範例顯示如何使用 PutJobTagging。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise-
如需 API 詳細資訊,請參閱《適用於 AWS Python (Boto3) 的 SDK API 參考》中的 PutJobTagging。
-
以下程式碼範例顯示如何使用 UpdateJobPriority。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return-
如需 API 詳細資訊,請參閱《適用於 AWS Python (Boto3) 的 SDK API 參考》中的 UpdateJobPriority。
-
以下程式碼範例顯示如何使用 UpdateJobStatus。
- 適用於 Python 的 SDK (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫
中設定和執行。 def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise-
如需 API 詳細資訊,請參閱《適用於 AWS Python (Boto3) 的 SDK API 參考》中的 UpdateJobStatus。
-