文档 AWS SDK 示例 GitHub 存储库中还有更多 S AWS DK 示例
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
学习 AWS Control Tower 使用 AWS SDK 的基础知识
以下代码示例演示了如何:
列出着陆区。
列出、启用、获取、重置和禁用基准。
列出、启用、获取和禁用控件。
- .NET
-
- 适用于 .NET 的 SDK (v4)
-
注意
还有更多相关信息 GitHub。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 运行演示 AWS Control Tower 功能的交互式场景。
using Amazon.ControlCatalog; using Amazon.ControlTower; using Amazon.ControlTower.Model; using Amazon.Organizations; using Amazon.Organizations.Model; using Amazon.SecurityToken; using Amazon.SecurityToken.Model; using ControlTowerActions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace ControlTowerBasics; /// <summary> /// Scenario class for AWS Control Tower basics. /// </summary> public class ControlTowerBasics { public static bool isInteractive = true; public static ILogger logger = null!; public static IAmazonOrganizations? orgClient = null; public static IAmazonSecurityTokenService? stsClient = null; public static ControlTowerWrapper? wrapper = null; private static string? ouArn; private static bool useLandingZone = false; /// <summary> /// Main entry point for the AWS Control Tower basics scenario. /// </summary> /// <param name="args">Command line arguments.</param> public static async Task Main(string[] args) { using var host = Host.CreateDefaultBuilder(args) .ConfigureServices((_, services) => services.AddAWSService<IAmazonControlTower>() .AddAWSService<IAmazonControlCatalog>() .AddAWSService<IAmazonOrganizations>() .AddAWSService<IAmazonSecurityTokenService>() .AddTransient<ControlTowerWrapper>() ) .Build(); logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<ControlTowerBasics>(); wrapper = host.Services.GetRequiredService<ControlTowerWrapper>(); orgClient = host.Services.GetRequiredService<IAmazonOrganizations>(); stsClient = host.Services.GetRequiredService<IAmazonSecurityTokenService>(); await RunScenario(); } /// <summary> /// Runs the example scenario. /// </summary> public static async Task RunScenario() { Console.WriteLine(new string('-', 88)); Console.WriteLine("\tWelcome to the AWS Control Tower with ControlCatalog example scenario."); Console.WriteLine(new string('-', 88)); Console.WriteLine("This demo will walk you through working with AWS Control Tower for landing zones,"); Console.WriteLine("managing baselines, and working with controls."); try { var accountId = (await stsClient!.GetCallerIdentityAsync(new GetCallerIdentityRequest())).Account; Console.WriteLine($"\nAccount ID: {accountId}"); Console.WriteLine("\nSome demo operations require the use of a landing zone."); Console.WriteLine("You can use an existing landing zone or opt out of these operations in the demo."); Console.WriteLine("For instructions on how to set up a landing zone,"); Console.WriteLine("see https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html"); // List available landing zones var landingZones = await wrapper!.ListLandingZonesAsync(); if (landingZones.Count > 0) { Console.WriteLine("\nAvailable Landing Zones:"); for (int i = 0; i < landingZones.Count; i++) { Console.WriteLine($"{i + 1}. {landingZones[i].Arn}"); } Console.Write($"\nDo you want to use the first landing zone in the list ({landingZones[0].Arn})? (y/n): "); if (GetUserConfirmation()) { useLandingZone = true; Console.WriteLine($"Using landing zone: {landingZones[0].Arn}"); ouArn = await SetupOrganizationAsync(); } } // Managing Baselines Console.WriteLine("\nManaging Baselines:"); var baselines = await wrapper.ListBaselinesAsync(); Console.WriteLine("\nListing available Baselines:"); BaselineSummary? controlTowerBaseline = null; foreach (var baseline in baselines) { if (baseline.Name == "AWSControlTowerBaseline") controlTowerBaseline = baseline; Console.WriteLine($" - {baseline.Name}"); } EnabledBaselineSummary? identityCenterBaseline = null; string? baselineArn = null; if (useLandingZone && ouArn != null) { Console.WriteLine("\nListing enabled baselines:"); var enabledBaselines = await wrapper.ListEnabledBaselinesAsync(); foreach (var baseline in enabledBaselines) { if (baseline.BaselineIdentifier.Contains("baseline/LN25R72TTG6IGPTQ")) identityCenterBaseline = baseline; Console.WriteLine($" - {baseline.BaselineIdentifier}"); } if (controlTowerBaseline != null) { Console.Write("\nDo you want to enable the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine("\nEnabling Control Tower Baseline."); var icBaselineArn = identityCenterBaseline?.Arn; baselineArn = await wrapper.EnableBaselineAsync(ouArn, controlTowerBaseline.Arn, "4.0", icBaselineArn ?? ""); var alreadyEnabled = false; if (baselineArn != null) { Console.WriteLine($"Enabled baseline ARN: {baselineArn}"); } else { // Find the enabled baseline foreach (var enabled in enabledBaselines) { if (enabled.BaselineIdentifier == controlTowerBaseline.Arn) { baselineArn = enabled.Arn; break; } } alreadyEnabled = true; Console.WriteLine("No change, the selected baseline was already enabled."); } if (baselineArn != null) { Console.Write("\nDo you want to reset the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"\nResetting Control Tower Baseline: {baselineArn}"); var operationId = await wrapper.ResetEnabledBaselineAsync(baselineArn); Console.WriteLine($"Reset baseline operation id: {operationId}"); } Console.Write("\nDo you want to disable the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"Disabling baseline ARN: {baselineArn}"); var operationId = await wrapper.DisableBaselineAsync(baselineArn); Console.WriteLine($"Disabled baseline operation id: {operationId}"); if (alreadyEnabled) { Console.WriteLine($"\nRe-enabling Control Tower Baseline: {baselineArn}"); // Re-enable the Control Tower baseline if it was originally enabled. await wrapper.EnableBaselineAsync(ouArn, controlTowerBaseline.Arn, "4.0", icBaselineArn ?? ""); } } } } } } // Managing Controls Console.WriteLine("\nManaging Controls:"); var controls = await wrapper.ListControlsAsync(); Console.WriteLine("\nListing first 5 available Controls:"); for (int i = 0; i < Math.Min(5, controls.Count); i++) { Console.WriteLine($"{i + 1}. {controls[i].Name} - {controls[i].Arn}"); } if (useLandingZone && ouArn != null) { var enabledControls = await wrapper.ListEnabledControlsAsync(ouArn); Console.WriteLine("\nListing enabled controls:"); for (int i = 0; i < enabledControls.Count; i++) { Console.WriteLine($"{i + 1}. {enabledControls[i].ControlIdentifier}"); } // Find first non-enabled control var enabledControlArns = enabledControls.Select(c => c.Arn).ToHashSet(); var controlArn = controls.FirstOrDefault(c => !enabledControlArns.Contains(c.Arn))?.Arn; if (controlArn != null) { Console.Write($"\nDo you want to enable the control {controlArn}? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"\nEnabling control: {controlArn}"); var operationId = await wrapper.EnableControlAsync(controlArn, ouArn); if (operationId != null) { Console.WriteLine($"Enabled control with operation id: {operationId}"); Console.Write("\nDo you want to disable the control? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine("\nDisabling the control..."); var disableOpId = await wrapper.DisableControlAsync(controlArn, ouArn); Console.WriteLine($"Disable operation ID: {disableOpId}"); } } } } } Console.WriteLine("\nThis concludes the example scenario."); Console.WriteLine("Thanks for watching!"); Console.WriteLine(new string('-', 88)); } catch (Exception ex) { logger.LogError(ex, "An error occurred during the Control Tower scenario."); Console.WriteLine($"An error occurred: {ex.Message}"); } } /// <summary> /// Sets up AWS Organizations and creates or finds a Sandbox OU. /// </summary> /// <returns>The ARN of the Sandbox organizational unit.</returns> private static async Task<string> SetupOrganizationAsync() { Console.WriteLine("\nChecking organization status..."); try { var orgResponse = await orgClient!.DescribeOrganizationAsync(new DescribeOrganizationRequest()); var orgId = orgResponse.Organization.Id; Console.WriteLine($"Account is part of organization: {orgId}"); } catch (AWSOrganizationsNotInUseException) { Console.WriteLine("No organization found. Creating a new organization..."); var createResponse = await orgClient!.CreateOrganizationAsync(new CreateOrganizationRequest { FeatureSet = OrganizationFeatureSet.ALL }); var orgId = createResponse.Organization.Id; Console.WriteLine($"Created new organization: {orgId}"); } // Look for Sandbox OU var roots = await orgClient.ListRootsAsync(new ListRootsRequest()); var rootId = roots.Roots[0].Id; Console.WriteLine("Checking for Sandbox OU..."); var ous = await orgClient.ListOrganizationalUnitsForParentAsync(new ListOrganizationalUnitsForParentRequest { ParentId = rootId }); var sandboxOu = ous.OrganizationalUnits.FirstOrDefault(ou => ou.Name == "Sandbox"); if (sandboxOu == null) { Console.WriteLine("Creating Sandbox OU..."); var createOuResponse = await orgClient.CreateOrganizationalUnitAsync(new CreateOrganizationalUnitRequest { ParentId = rootId, Name = "Sandbox" }); sandboxOu = createOuResponse.OrganizationalUnit; Console.WriteLine($"Created new Sandbox OU: {sandboxOu.Id}"); } else { Console.WriteLine($"Found existing Sandbox OU: {sandboxOu.Id}"); } return sandboxOu.Arn; } /// <summary> /// Gets user confirmation by waiting for input or returning true if not interactive. /// </summary> /// <returns>True if user enters 'y' or if isInteractive is false, otherwise false.</returns> private static bool GetUserConfirmation() { return Console.ReadLine()?.ToLower() == "y" || !isInteractive; } }
场景调用以管理 Aurora 操作的包装程序方法。
using Amazon.ControlCatalog; using Amazon.ControlCatalog.Model; using Amazon.ControlTower; using Amazon.ControlTower.Model; using ValidationException = Amazon.ControlTower.Model.ValidationException; namespace ControlTowerActions; /// <summary> /// Methods to perform AWS Control Tower actions. /// </summary> public class ControlTowerWrapper { private readonly IAmazonControlTower _controlTowerService; private readonly IAmazonControlCatalog _controlCatalogService; /// <summary> /// Constructor for the wrapper class containing AWS Control Tower actions. /// </summary> /// <param name="controlTowerService">The AWS Control Tower client object.</param> /// <param name="controlCatalogService">The AWS Control Catalog client object.</param> public ControlTowerWrapper(IAmazonControlTower controlTowerService, IAmazonControlCatalog controlCatalogService) { _controlTowerService = controlTowerService; _controlCatalogService = controlCatalogService; } /// <summary> /// List the AWS Control Tower landing zones for an account. /// </summary> /// <returns>A list of LandingZoneSummary objects.</returns> public async Task<List<LandingZoneSummary>> ListLandingZonesAsync() { try { var landingZones = new List<LandingZoneSummary>(); var landingZonesPaginator = _controlTowerService.Paginators.ListLandingZones(new ListLandingZonesRequest()); await foreach (var response in landingZonesPaginator.Responses) { landingZones.AddRange(response.LandingZones); } return landingZones; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list landing zones. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all baselines. /// </summary> /// <returns>A list of baseline summaries.</returns> public async Task<List<BaselineSummary>> ListBaselinesAsync() { try { var baselines = new List<BaselineSummary>(); var baselinesPaginator = _controlTowerService.Paginators.ListBaselines(new ListBaselinesRequest()); await foreach (var response in baselinesPaginator.Responses) { baselines.AddRange(response.Baselines); } return baselines; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list baselines. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all enabled baselines. /// </summary> /// <returns>A list of enabled baseline summaries.</returns> public async Task<List<EnabledBaselineSummary>> ListEnabledBaselinesAsync() { try { var enabledBaselines = new List<EnabledBaselineSummary>(); var enabledBaselinesPaginator = _controlTowerService.Paginators.ListEnabledBaselines(new ListEnabledBaselinesRequest()); await foreach (var response in enabledBaselinesPaginator.Responses) { enabledBaselines.AddRange(response.EnabledBaselines); } return enabledBaselines; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list enabled baselines. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Enable a baseline for the specified target. /// </summary> /// <param name="targetIdentifier">The ARN of the target.</param> /// <param name="baselineIdentifier">The identifier of baseline to enable.</param> /// <param name="baselineVersion">The version of baseline to enable.</param> /// <param name="identityCenterBaseline">The identifier of identity center baseline if it is enabled.</param> /// <returns>The enabled baseline ARN or null if already enabled.</returns> public async Task<string?> EnableBaselineAsync(string targetIdentifier, string baselineIdentifier, string baselineVersion, string identityCenterBaseline) { try { var parameters = new List<EnabledBaselineParameter> { new EnabledBaselineParameter { Key = "IdentityCenterEnabledBaselineArn", Value = identityCenterBaseline } }; var request = new EnableBaselineRequest { BaselineIdentifier = baselineIdentifier, BaselineVersion = baselineVersion, TargetIdentifier = targetIdentifier, Parameters = parameters }; var response = await _controlTowerService.EnableBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return response.Arn; } catch (ValidationException ex) when (ex.Message.Contains("already enabled")) { Console.WriteLine("Baseline is already enabled for this target"); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't enable baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Disable a baseline for a specific target and wait for the operation to complete. /// </summary> /// <param name="enabledBaselineIdentifier">The identifier of the baseline to disable.</param> /// <returns>The operation ID or null if there was a conflict.</returns> public async Task<string?> DisableBaselineAsync(string enabledBaselineIdentifier) { try { var request = new DisableBaselineRequest { EnabledBaselineIdentifier = enabledBaselineIdentifier }; var response = await _controlTowerService.DisableBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (ConflictException ex) { Console.WriteLine($"Conflict disabling baseline: {ex.Message}. Skipping disable step."); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't disable baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Reset an enabled baseline for a specific target. /// </summary> /// <param name="enabledBaselineIdentifier">The identifier of the enabled baseline to reset.</param> /// <returns>The operation ID.</returns> public async Task<string> ResetEnabledBaselineAsync(string enabledBaselineIdentifier) { try { var request = new ResetEnabledBaselineRequest { EnabledBaselineIdentifier = enabledBaselineIdentifier }; var response = await _controlTowerService.ResetEnabledBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Target not found, unable to reset enabled baseline."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't reset enabled baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Get the status of a baseline operation. /// </summary> /// <param name="operationId">The ID of the baseline operation.</param> /// <returns>The operation status.</returns> public async Task<BaselineOperationStatus> GetBaselineOperationAsync(string operationId) { try { var request = new GetBaselineOperationRequest { OperationIdentifier = operationId }; var response = await _controlTowerService.GetBaselineOperationAsync(request); return response.BaselineOperation.Status; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Operation not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't get baseline operation status. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List enabled controls for a target organizational unit. /// </summary> /// <param name="targetIdentifier">The target organizational unit identifier.</param> /// <returns>A list of enabled control summaries.</returns> public async Task<List<EnabledControlSummary>> ListEnabledControlsAsync(string targetIdentifier) { try { var request = new ListEnabledControlsRequest { TargetIdentifier = targetIdentifier }; var enabledControls = new List<EnabledControlSummary>(); var enabledControlsPaginator = _controlTowerService.Paginators.ListEnabledControls(request); await foreach (var response in enabledControlsPaginator.Responses) { enabledControls.AddRange(response.EnabledControls); } return enabledControls; } catch (Amazon.ControlTower.Model.ResourceNotFoundException ex) when (ex.Message.Contains("not registered with AWS Control Tower")) { Console.WriteLine("AWS Control Tower must be enabled to work with enabling controls."); return new List<EnabledControlSummary>(); } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list enabled controls. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Enable a control for a specified target. /// </summary> /// <param name="controlArn">The ARN of the control to enable.</param> /// <param name="targetIdentifier">The identifier of the target (e.g., OU ARN).</param> /// <returns>The operation ID or null if already enabled.</returns> public async Task<string?> EnableControlAsync(string controlArn, string targetIdentifier) { try { Console.WriteLine(controlArn); Console.WriteLine(targetIdentifier); var request = new EnableControlRequest { ControlIdentifier = controlArn, TargetIdentifier = targetIdentifier }; var response = await _controlTowerService.EnableControlAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetControlOperationAsync(operationId); Console.WriteLine($"Control operation status: {status}"); if (status == ControlOperationStatus.SUCCEEDED || status == ControlOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ValidationException ex) when (ex.Message.Contains("already enabled")) { Console.WriteLine("Control is already enabled for this target"); return null; } catch (Amazon.ControlTower.Model.ResourceNotFoundException ex) when (ex.Message.Contains("not registered with AWS Control Tower")) { Console.WriteLine("AWS Control Tower must be enabled to work with enabling controls."); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't enable control. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Disable a control for a specified target. /// </summary> /// <param name="controlArn">The ARN of the control to disable.</param> /// <param name="targetIdentifier">The identifier of the target (e.g., OU ARN).</param> /// <returns>The operation ID.</returns> public async Task<string> DisableControlAsync(string controlArn, string targetIdentifier) { try { var request = new DisableControlRequest { ControlIdentifier = controlArn, TargetIdentifier = targetIdentifier }; var response = await _controlTowerService.DisableControlAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetControlOperationAsync(operationId); Console.WriteLine($"Control operation status: {status}"); if (status == ControlOperationStatus.SUCCEEDED || status == ControlOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Control not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't disable control. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Get the status of a control operation. /// </summary> /// <param name="operationId">The ID of the control operation.</param> /// <returns>The operation status.</returns> public async Task<ControlOperationStatus> GetControlOperationAsync(string operationId) { try { var request = new GetControlOperationRequest { OperationIdentifier = operationId }; var response = await _controlTowerService.GetControlOperationAsync(request); return response.ControlOperation.Status; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Operation not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't get control operation status. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all controls in the Control Tower control catalog. /// </summary> /// <returns>A list of control summaries.</returns> public async Task<List<ControlSummary>> ListControlsAsync() { try { var controls = new List<ControlSummary>(); var controlsPaginator = _controlCatalogService.Paginators.ListControls(new Amazon.ControlCatalog.Model.ListControlsRequest()); await foreach (var response in controlsPaginator.Responses) { controls.AddRange(response.Controls); } return controls; } catch (AmazonControlCatalogException ex) { Console.WriteLine($"Couldn't list controls. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } }
- Python
-
- 适用于 Python 的 SDK (Boto3)
-
注意
还有更多相关信息 GitHub。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 运行演示 AWS Control Tower 功能的交互式场景。
class ControlTowerScenario: IDENTITY_CENTER_BASELINE = "baseline/LN25R72TTG6IGPTQ" stack_name = "" def __init__( self, controltower_wrapper: ControlTowerWrapper, org_client: boto3.client ): """ :param controltower_wrapper: An instance of the ControlTowerWrapper class. :param org_client: A Boto3 Organization client. """ self.controltower_wrapper = controltower_wrapper self.org_client = org_client self.stack = None self.ou_id = None self.ou_arn = None self.account_id = None self.landing_zone_id = None self.use_landing_zone = False def run_scenario(self) -> None: print("-" * 88) print( "\tWelcome to the AWS Control Tower with ControlCatalog example scenario." ) print("-" * 88) print( "This demo will walk you through working with AWS Control Tower for landing zones,\n" "managing baselines, and working with controls." ) self.account_id = boto3.client("sts").get_caller_identity()["Account"] print( "Some demo operations require the use of a landing zone. " "\nYou can use an existing landing zone or opt out of these operations in the demo." "\nFor instructions on how to set up a landing zone, " "\nsee https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html" ) # List available landing zones landing_zones = self.controltower_wrapper.list_landing_zones() if landing_zones: print("\nAvailable Landing Zones:") for i, lz in enumerate(landing_zones, 1): print(f"{i} {lz['arn']})") # Ask if user wants to use the first landing zone in the list if q.ask( f"Do you want to use the first landing zone in the list ({landing_zones[0]['arn']})? (y/n) ", q.is_yesno, ): self.use_landing_zone = True self.landing_zone_id = landing_zones[0]["arn"] print(f"Using landing zone ID: {self.landing_zone_id})") # Set up organization and get Sandbox OU ID. sandbox_ou_id = self.setup_organization() # Store the OU ID for use in the CloudFormation template. self.ou_id = sandbox_ou_id elif q.ask( f"Do you want to use a different existing Landing Zone for this demo? (y/n) ", q.is_yesno, ): self.use_landing_zone = True self.landing_zone_id = q.ask("Enter landing zone id: ", q.non_empty) # Set up organization and get Sandbox OU ID. sandbox_ou_id = self.setup_organization() # Store the OU ID for use in the CloudFormation template. self.ou_id = sandbox_ou_id # List and Enable Baseline. print("\nManaging Baselines:") control_tower_baseline = None identity_center_baseline = None baselines = self.controltower_wrapper.list_baselines() print("\nListing available Baselines:") for baseline in baselines: if baseline["name"] == "AWSControlTowerBaseline": control_tower_baseline = baseline print(f"{baseline['name']}") if self.use_landing_zone: print("\nListing enabled baselines:") enabled_baselines = self.controltower_wrapper.list_enabled_baselines() for baseline in enabled_baselines: # If the Identity Center baseline is enabled, the identifier must be used for other baselines. if self.IDENTITY_CENTER_BASELINE in baseline["baselineIdentifier"]: identity_center_baseline = baseline print(f"{baseline['baselineIdentifier']}") if q.ask( f"Do you want to enable the Control Tower Baseline? (y/n) ", q.is_yesno, ): print("\nEnabling Control Tower Baseline.") ic_baseline_arn = ( identity_center_baseline["arn"] if identity_center_baseline else None ) baseline_arn = self.controltower_wrapper.enable_baseline( self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "4.0" ) if baseline_arn: print(f"Enabled baseline ARN: {baseline_arn}") else: # Find the enabled baseline so we can reset it. for enabled_baseline in enabled_baselines: if ( enabled_baseline["baselineIdentifier"] == control_tower_baseline["arn"] ): baseline_arn = enabled_baseline["arn"] print("No change, the selected baseline was already enabled.") if q.ask( f"Do you want to reset the Control Tower Baseline? (y/n) ", q.is_yesno, ): print(f"\nResetting Control Tower Baseline. {baseline_arn}") operation_id = self.controltower_wrapper.reset_enabled_baseline( baseline_arn ) print(f"\nReset baseline operation id {operation_id}.") if baseline_arn and q.ask( f"Do you want to disable the Control Tower Baseline? (y/n) ", q.is_yesno, ): print(f"Disabling baseline ARN: {baseline_arn}") operation_id = self.controltower_wrapper.disable_baseline( baseline_arn ) print(f"\nDisabled baseline operation id {operation_id}.") # Re-enable the baseline for the next step. print("\nEnabling Control Tower Baseline.") self.controltower_wrapper.enable_baseline( self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "4.0", ) # List and Enable Controls. print("\nManaging Controls:") controls = self.controltower_wrapper.list_controls() print("\nListing first 5 available Controls:") for i, control in enumerate(controls[:5], 1): print(f"{i}. {control['Name']} - {control['Arn']}") if self.use_landing_zone: target_ou = self.ou_arn enabled_controls = self.controltower_wrapper.list_enabled_controls( target_ou ) print("\nListing enabled controls:") for i, control in enumerate(enabled_controls, 1): print(f"{i}. {control['controlIdentifier']}") # Enable first non-enabled control as an example. enabled_control_arns = [control["arn"] for control in enabled_controls] control_arn = next( control["Arn"] for control in controls if control["Arn"] not in enabled_control_arns ) if control_arn and q.ask( f"Do you want to enable the control {control_arn}? (y/n) ", q.is_yesno, ): print(f"\nEnabling control: {control_arn}") operation_id = self.controltower_wrapper.enable_control( control_arn, target_ou ) if operation_id: print(f"Enabled control with operation id {operation_id}") if control_arn and q.ask( f"Do you want to disable the control? (y/n) ", q.is_yesno, ): print("\nDisabling the control...") operation_id = self.controltower_wrapper.disable_control( control_arn, target_ou ) print(f"Disable operation ID: {operation_id}") print("\nThis concludes the example scenario.") print("Thanks for watching!") print("-" * 88) def setup_organization(self): """ Checks if the current account is part of an organization and creates one if needed. Also ensures a Sandbox OU exists and returns its ID. :return: The ID of the Sandbox OU """ print("\nChecking organization status...") try: # Check if account is part of an organization org_response = self.org_client.describe_organization() org_id = org_response["Organization"]["Id"] print(f"Account is part of organization: {org_id}") except ClientError as error: if error.response["Error"]["Code"] == "AWSOrganizationsNotInUseException": print("No organization found. Creating a new organization...") try: create_response = self.org_client.create_organization( FeatureSet="ALL" ) org_id = create_response["Organization"]["Id"] print(f"Created new organization: {org_id}") # Wait for organization to be available. waiter = self.org_client.get_waiter("organization_active") waiter.wait( Organization=org_id, WaiterConfig={"Delay": 5, "MaxAttempts": 12}, ) except ClientError as create_error: logger.error( "Couldn't create organization. Here's why: %s: %s", create_error.response["Error"]["Code"], create_error.response["Error"]["Message"], ) raise else: logger.error( "Couldn't describe organization. Here's why: %s: %s", error.response["Error"]["Code"], error.response["Error"]["Message"], ) raise # Look for Sandbox OU. sandbox_ou_id = None paginator = self.org_client.get_paginator( "list_organizational_units_for_parent" ) try: # Get root ID first. roots = self.org_client.list_roots()["Roots"] if not roots: raise ValueError("No root found in organization") root_id = roots[0]["Id"] # Search for existing Sandbox OU. print("Checking for Sandbox OU...") for page in paginator.paginate(ParentId=root_id): for ou in page["OrganizationalUnits"]: if ou["Name"] == "Sandbox": sandbox_ou_id = ou["Id"] self.ou_arn = ou["Arn"] print(f"Found existing Sandbox OU: {sandbox_ou_id}") break if sandbox_ou_id: break # Create Sandbox OU if it doesn't exist. if not sandbox_ou_id: print("Creating Sandbox OU...") create_ou_response = self.org_client.create_organizational_unit( ParentId=root_id, Name="Sandbox" ) sandbox_ou_id = create_ou_response["OrganizationalUnit"]["Id"] print(f"Created new Sandbox OU: {sandbox_ou_id}") # Wait for OU to be available. waiter = self.org_client.get_waiter("organizational_unit_active") waiter.wait( OrganizationalUnitId=sandbox_ou_id, WaiterConfig={"Delay": 5, "MaxAttempts": 12}, ) except ClientError as error: logger.error( "Couldn't set up Sandbox OU. Here's why: %s: %s", error.response["Error"]["Code"], error.response["Error"]["Message"], ) raise return sandbox_ou_id if __name__ == "__main__": try: org = boto3.client("organizations") control_tower_wrapper = ControlTowerWrapper.from_client() scenario = ControlTowerScenario(control_tower_wrapper, org) scenario.run_scenario() except Exception: logging.exception("Something went wrong with the scenario.") class ControlTowerWrapper: """Encapsulates AWS Control Tower and Control Catalog functionality.""" def __init__( self, controltower_client: boto3.client, controlcatalog_client: boto3.client ): """ :param controltower_client: A Boto3 Amazon ControlTower client. :param controlcatalog_client: A Boto3 Amazon ControlCatalog client. """ self.controltower_client = controltower_client self.controlcatalog_client = controlcatalog_client @classmethod def from_client(cls): controltower_client = boto3.client("controltower") controlcatalog_client = boto3.client("controlcatalog") return cls(controltower_client, controlcatalog_client) def list_baselines(self): """ Lists all baselines. :return: List of baselines. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_baselines") baselines = [] for page in paginator.paginate(): baselines.extend(page["baselines"]) return baselines except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list baselines. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def enable_baseline( self, target_identifier: str, identity_center_baseline: str, baseline_identifier: str, baseline_version: str, ): """ Enables a baseline for the specified target if it's not already enabled. :param target_identifier: The ARN of the target. :param baseline_identifier: The identifier of baseline to enable. :param identity_center_baseline: The identifier of identity center baseline if it is enabled. :param baseline_version: The version of baseline to enable. :return: The enabled baseline ARN or None if already enabled. :raises ClientError: If enabling the baseline fails for reasons other than it being already enabled. """ try: response = self.controltower_client.enable_baseline( baselineIdentifier=baseline_identifier, baselineVersion=baseline_version, targetIdentifier=target_identifier, parameters=[ { "key": "IdentityCenterEnabledBaselineArn", "value": identity_center_baseline, } ], ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return response["arn"] except ClientError as err: if err.response["Error"]["Code"] == "ValidationException": if "already enabled" in err.response["Error"]["Message"]: print("Baseline is already enabled for this target") return None else: print( "Unable to enable baseline due to validation exception: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) logger.error( "Couldn't enable baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_controls(self): """ Lists all controls in the Control Tower control catalog. :return: List of controls. :raises ClientError: If the listing operation fails. """ try: paginator = self.controlcatalog_client.get_paginator("list_controls") controls = [] for page in paginator.paginate(): controls.extend(page["Controls"]) return controls except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list controls. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def enable_control(self, control_arn: str, target_identifier: str): """ Enables a control for a specified target. :param control_arn: The ARN of the control to enable. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: The operation ID. :raises ClientError: If enabling the control fails. """ try: print(control_arn) print(target_identifier) response = self.controltower_client.enable_control( controlIdentifier=control_arn, targetIdentifier=target_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_control_operation(operation_id) print(f"Control operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if ( err.response["Error"]["Code"] == "ValidationException" and "already enabled" in err.response["Error"]["Message"] ): logger.info("Control is already enabled for this target") return None elif ( err.response["Error"]["Code"] == "ResourceNotFoundException" and "not registered with AWS Control Tower" in err.response["Error"]["Message"] ): logger.error("Control Tower must be enabled to work with controls.") return None logger.error( "Couldn't enable control. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_control_operation(self, operation_id: str): """ Gets the status of a control operation. :param operation_id: The ID of the control operation. :return: The operation status. :raises ClientError: If getting the operation status fails. """ try: response = self.controltower_client.get_control_operation( operationIdentifier=operation_id ) return response["controlOperation"]["status"] except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Operation not found.") else: logger.error( "Couldn't get control operation status. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_baseline_operation(self, operation_id: str): """ Gets the status of a baseline operation. :param operation_id: The ID of the baseline operation. :return: The operation status. :raises ClientError: If getting the operation status fails. """ try: response = self.controltower_client.get_baseline_operation( operationIdentifier=operation_id ) return response["baselineOperation"]["status"] except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Operation not found.") else: logger.error( "Couldn't get baseline operation status. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def disable_control(self, control_arn: str, target_identifier: str): """ Disables a control for a specified target. :param control_arn: The ARN of the control to disable. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: The operation ID. :raises ClientError: If disabling the control fails. """ try: response = self.controltower_client.disable_control( controlIdentifier=control_arn, targetIdentifier=target_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_control_operation(operation_id) print(f"Control operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Control not found.") else: logger.error( "Couldn't disable control. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_landing_zones(self): """ Lists all landing zones. :return: List of landing zones. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_landing_zones") landing_zones = [] for page in paginator.paginate(): landing_zones.extend(page["landingZones"]) return landing_zones except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list landing zones. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_enabled_baselines(self): """ Lists all enabled baselines. :return: List of enabled baselines. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_enabled_baselines") enabled_baselines = [] for page in paginator.paginate(): enabled_baselines.extend(page["enabledBaselines"]) return enabled_baselines except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Target not found.") else: logger.error( "Couldn't list enabled baselines. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def reset_enabled_baseline(self, enabled_baseline_identifier: str): """ Resets an enabled baseline for a specific target. :param enabled_baseline_identifier: The identifier of the enabled baseline to reset. :return: The operation ID. :raises ClientError: If resetting the baseline fails. """ try: response = self.controltower_client.reset_enabled_baseline( enabledBaselineIdentifier=enabled_baseline_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Target not found.") else: logger.error( "Couldn't reset enabled baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def disable_baseline(self, enabled_baseline_identifier: str): """ Disables a baseline for a specific target and waits for the operation to complete. :param enabled_baseline_identifier: The identifier of the baseline to disable. :return: The operation ID. :raises ClientError: If disabling the baseline fails. """ try: response = self.controltower_client.disable_baseline( enabledBaselineIdentifier=enabled_baseline_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return response["operationIdentifier"] except ClientError as err: if err.response["Error"]["Code"] == "ConflictException": print( f"Conflict disabling baseline: {err.response['Error']['Message']}. Skipping disable step." ) return None else: logger.error( "Couldn't disable baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_enabled_controls(self, target_identifier: str): """ Lists all enabled controls for a specific target. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: List of enabled controls. :raises ClientError: If the listing operation fails. """ enabled_controls = [] try: paginator = self.controltower_client.get_paginator("list_enabled_controls") for page in paginator.paginate(targetIdentifier=target_identifier): enabled_controls.extend(page["enabledControls"]) return enabled_controls except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) return enabled_controls elif ( err.response["Error"]["Code"] == "ResourceNotFoundException" and "not registered with AWS Control Tower" in err.response["Error"]["Message"] ): logger.error("Control Tower must be enabled to work with controls.") return enabled_controls else: logger.error( "Couldn't list enabled controls. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的以下主题。
-
你好 AWS Control Tower
操作