Lernen Sie die Grundlagen von AWS Control Tower mit einem SDK kennen AWS - AWS SDK-Codebeispiele

Weitere AWS SDK-Beispiele sind im Repo AWS Doc SDK Examples GitHub verfügbar.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Lernen Sie die Grundlagen von AWS Control Tower mit einem SDK kennen AWS

Die folgenden Code-Beispiele veranschaulichen Folgendes:

  • Landezonen auflisten.

  • Baselines auflisten, aktivieren, abrufen, zurücksetzen und deaktivieren.

  • Steuerelemente auflisten, aktivieren, abrufen und deaktivieren.

.NET
SDK für .NET (v4)
Anmerkung

Es gibt noch mehr dazu GitHub. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel- einrichten und ausführen.

Führen Sie ein interaktives Szenario durch, in dem AWS Control Tower Funktionen demonstriert werden.

using Amazon.ControlCatalog; using Amazon.ControlTower; using Amazon.ControlTower.Model; using Amazon.Organizations; using Amazon.Organizations.Model; using Amazon.SecurityToken; using Amazon.SecurityToken.Model; using ControlTowerActions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace ControlTowerBasics; /// <summary> /// Scenario class for AWS Control Tower basics. /// </summary> public class ControlTowerBasics { public static bool isInteractive = true; public static ILogger logger = null!; public static IAmazonOrganizations? orgClient = null; public static IAmazonSecurityTokenService? stsClient = null; public static ControlTowerWrapper? wrapper = null; private static string? ouArn; private static bool useLandingZone = false; /// <summary> /// Main entry point for the AWS Control Tower basics scenario. /// </summary> /// <param name="args">Command line arguments.</param> public static async Task Main(string[] args) { using var host = Host.CreateDefaultBuilder(args) .ConfigureServices((_, services) => services.AddAWSService<IAmazonControlTower>() .AddAWSService<IAmazonControlCatalog>() .AddAWSService<IAmazonOrganizations>() .AddAWSService<IAmazonSecurityTokenService>() .AddTransient<ControlTowerWrapper>() ) .Build(); logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<ControlTowerBasics>(); wrapper = host.Services.GetRequiredService<ControlTowerWrapper>(); orgClient = host.Services.GetRequiredService<IAmazonOrganizations>(); stsClient = host.Services.GetRequiredService<IAmazonSecurityTokenService>(); await RunScenario(); } /// <summary> /// Runs the example scenario. /// </summary> public static async Task RunScenario() { Console.WriteLine(new string('-', 88)); Console.WriteLine("\tWelcome to the AWS Control Tower with ControlCatalog example scenario."); Console.WriteLine(new string('-', 88)); Console.WriteLine("This demo will walk you through working with AWS Control Tower for landing zones,"); Console.WriteLine("managing baselines, and working with controls."); try { var accountId = (await stsClient!.GetCallerIdentityAsync(new GetCallerIdentityRequest())).Account; Console.WriteLine($"\nAccount ID: {accountId}"); Console.WriteLine("\nSome demo operations require the use of a landing zone."); Console.WriteLine("You can use an existing landing zone or opt out of these operations in the demo."); Console.WriteLine("For instructions on how to set up a landing zone,"); Console.WriteLine("see https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html"); // List available landing zones var landingZones = await wrapper!.ListLandingZonesAsync(); if (landingZones.Count > 0) { Console.WriteLine("\nAvailable Landing Zones:"); for (int i = 0; i < landingZones.Count; i++) { Console.WriteLine($"{i + 1}. {landingZones[i].Arn}"); } Console.Write($"\nDo you want to use the first landing zone in the list ({landingZones[0].Arn})? (y/n): "); if (GetUserConfirmation()) { useLandingZone = true; Console.WriteLine($"Using landing zone: {landingZones[0].Arn}"); ouArn = await SetupOrganizationAsync(); } } // Managing Baselines Console.WriteLine("\nManaging Baselines:"); var baselines = await wrapper.ListBaselinesAsync(); Console.WriteLine("\nListing available Baselines:"); BaselineSummary? controlTowerBaseline = null; foreach (var baseline in baselines) { if (baseline.Name == "AWSControlTowerBaseline") controlTowerBaseline = baseline; Console.WriteLine($" - {baseline.Name}"); } EnabledBaselineSummary? identityCenterBaseline = null; string? baselineArn = null; if (useLandingZone && ouArn != null) { Console.WriteLine("\nListing enabled baselines:"); var enabledBaselines = await wrapper.ListEnabledBaselinesAsync(); foreach (var baseline in enabledBaselines) { if (baseline.BaselineIdentifier.Contains("baseline/LN25R72TTG6IGPTQ")) identityCenterBaseline = baseline; Console.WriteLine($" - {baseline.BaselineIdentifier}"); } if (controlTowerBaseline != null) { Console.Write("\nDo you want to enable the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine("\nEnabling Control Tower Baseline."); var icBaselineArn = identityCenterBaseline?.Arn; baselineArn = await wrapper.EnableBaselineAsync(ouArn, controlTowerBaseline.Arn, "4.0", icBaselineArn ?? ""); var alreadyEnabled = false; if (baselineArn != null) { Console.WriteLine($"Enabled baseline ARN: {baselineArn}"); } else { // Find the enabled baseline foreach (var enabled in enabledBaselines) { if (enabled.BaselineIdentifier == controlTowerBaseline.Arn) { baselineArn = enabled.Arn; break; } } alreadyEnabled = true; Console.WriteLine("No change, the selected baseline was already enabled."); } if (baselineArn != null) { Console.Write("\nDo you want to reset the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"\nResetting Control Tower Baseline: {baselineArn}"); var operationId = await wrapper.ResetEnabledBaselineAsync(baselineArn); Console.WriteLine($"Reset baseline operation id: {operationId}"); } Console.Write("\nDo you want to disable the Control Tower Baseline? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"Disabling baseline ARN: {baselineArn}"); var operationId = await wrapper.DisableBaselineAsync(baselineArn); Console.WriteLine($"Disabled baseline operation id: {operationId}"); if (alreadyEnabled) { Console.WriteLine($"\nRe-enabling Control Tower Baseline: {baselineArn}"); // Re-enable the Control Tower baseline if it was originally enabled. await wrapper.EnableBaselineAsync(ouArn, controlTowerBaseline.Arn, "4.0", icBaselineArn ?? ""); } } } } } } // Managing Controls Console.WriteLine("\nManaging Controls:"); var controls = await wrapper.ListControlsAsync(); Console.WriteLine("\nListing first 5 available Controls:"); for (int i = 0; i < Math.Min(5, controls.Count); i++) { Console.WriteLine($"{i + 1}. {controls[i].Name} - {controls[i].Arn}"); } if (useLandingZone && ouArn != null) { var enabledControls = await wrapper.ListEnabledControlsAsync(ouArn); Console.WriteLine("\nListing enabled controls:"); for (int i = 0; i < enabledControls.Count; i++) { Console.WriteLine($"{i + 1}. {enabledControls[i].ControlIdentifier}"); } // Find first non-enabled control var enabledControlArns = enabledControls.Select(c => c.Arn).ToHashSet(); var controlArn = controls.FirstOrDefault(c => !enabledControlArns.Contains(c.Arn))?.Arn; if (controlArn != null) { Console.Write($"\nDo you want to enable the control {controlArn}? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine($"\nEnabling control: {controlArn}"); var operationId = await wrapper.EnableControlAsync(controlArn, ouArn); if (operationId != null) { Console.WriteLine($"Enabled control with operation id: {operationId}"); Console.Write("\nDo you want to disable the control? (y/n): "); if (GetUserConfirmation()) { Console.WriteLine("\nDisabling the control..."); var disableOpId = await wrapper.DisableControlAsync(controlArn, ouArn); Console.WriteLine($"Disable operation ID: {disableOpId}"); } } } } } Console.WriteLine("\nThis concludes the example scenario."); Console.WriteLine("Thanks for watching!"); Console.WriteLine(new string('-', 88)); } catch (Exception ex) { logger.LogError(ex, "An error occurred during the Control Tower scenario."); Console.WriteLine($"An error occurred: {ex.Message}"); } } /// <summary> /// Sets up AWS Organizations and creates or finds a Sandbox OU. /// </summary> /// <returns>The ARN of the Sandbox organizational unit.</returns> private static async Task<string> SetupOrganizationAsync() { Console.WriteLine("\nChecking organization status..."); try { var orgResponse = await orgClient!.DescribeOrganizationAsync(new DescribeOrganizationRequest()); var orgId = orgResponse.Organization.Id; Console.WriteLine($"Account is part of organization: {orgId}"); } catch (AWSOrganizationsNotInUseException) { Console.WriteLine("No organization found. Creating a new organization..."); var createResponse = await orgClient!.CreateOrganizationAsync(new CreateOrganizationRequest { FeatureSet = OrganizationFeatureSet.ALL }); var orgId = createResponse.Organization.Id; Console.WriteLine($"Created new organization: {orgId}"); } // Look for Sandbox OU var roots = await orgClient.ListRootsAsync(new ListRootsRequest()); var rootId = roots.Roots[0].Id; Console.WriteLine("Checking for Sandbox OU..."); var ous = await orgClient.ListOrganizationalUnitsForParentAsync(new ListOrganizationalUnitsForParentRequest { ParentId = rootId }); var sandboxOu = ous.OrganizationalUnits.FirstOrDefault(ou => ou.Name == "Sandbox"); if (sandboxOu == null) { Console.WriteLine("Creating Sandbox OU..."); var createOuResponse = await orgClient.CreateOrganizationalUnitAsync(new CreateOrganizationalUnitRequest { ParentId = rootId, Name = "Sandbox" }); sandboxOu = createOuResponse.OrganizationalUnit; Console.WriteLine($"Created new Sandbox OU: {sandboxOu.Id}"); } else { Console.WriteLine($"Found existing Sandbox OU: {sandboxOu.Id}"); } return sandboxOu.Arn; } /// <summary> /// Gets user confirmation by waiting for input or returning true if not interactive. /// </summary> /// <returns>True if user enters 'y' or if isInteractive is false, otherwise false.</returns> private static bool GetUserConfirmation() { return Console.ReadLine()?.ToLower() == "y" || !isInteractive; } }

Wrapper-Methoden, die vom Szenario aufgerufen werden, um Aurora-Aktionen zu verwalten.

using Amazon.ControlCatalog; using Amazon.ControlCatalog.Model; using Amazon.ControlTower; using Amazon.ControlTower.Model; using ValidationException = Amazon.ControlTower.Model.ValidationException; namespace ControlTowerActions; /// <summary> /// Methods to perform AWS Control Tower actions. /// </summary> public class ControlTowerWrapper { private readonly IAmazonControlTower _controlTowerService; private readonly IAmazonControlCatalog _controlCatalogService; /// <summary> /// Constructor for the wrapper class containing AWS Control Tower actions. /// </summary> /// <param name="controlTowerService">The AWS Control Tower client object.</param> /// <param name="controlCatalogService">The AWS Control Catalog client object.</param> public ControlTowerWrapper(IAmazonControlTower controlTowerService, IAmazonControlCatalog controlCatalogService) { _controlTowerService = controlTowerService; _controlCatalogService = controlCatalogService; } /// <summary> /// List the AWS Control Tower landing zones for an account. /// </summary> /// <returns>A list of LandingZoneSummary objects.</returns> public async Task<List<LandingZoneSummary>> ListLandingZonesAsync() { try { var landingZones = new List<LandingZoneSummary>(); var landingZonesPaginator = _controlTowerService.Paginators.ListLandingZones(new ListLandingZonesRequest()); await foreach (var response in landingZonesPaginator.Responses) { landingZones.AddRange(response.LandingZones); } return landingZones; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list landing zones. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all baselines. /// </summary> /// <returns>A list of baseline summaries.</returns> public async Task<List<BaselineSummary>> ListBaselinesAsync() { try { var baselines = new List<BaselineSummary>(); var baselinesPaginator = _controlTowerService.Paginators.ListBaselines(new ListBaselinesRequest()); await foreach (var response in baselinesPaginator.Responses) { baselines.AddRange(response.Baselines); } return baselines; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list baselines. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all enabled baselines. /// </summary> /// <returns>A list of enabled baseline summaries.</returns> public async Task<List<EnabledBaselineSummary>> ListEnabledBaselinesAsync() { try { var enabledBaselines = new List<EnabledBaselineSummary>(); var enabledBaselinesPaginator = _controlTowerService.Paginators.ListEnabledBaselines(new ListEnabledBaselinesRequest()); await foreach (var response in enabledBaselinesPaginator.Responses) { enabledBaselines.AddRange(response.EnabledBaselines); } return enabledBaselines; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list enabled baselines. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Enable a baseline for the specified target. /// </summary> /// <param name="targetIdentifier">The ARN of the target.</param> /// <param name="baselineIdentifier">The identifier of baseline to enable.</param> /// <param name="baselineVersion">The version of baseline to enable.</param> /// <param name="identityCenterBaseline">The identifier of identity center baseline if it is enabled.</param> /// <returns>The enabled baseline ARN or null if already enabled.</returns> public async Task<string?> EnableBaselineAsync(string targetIdentifier, string baselineIdentifier, string baselineVersion, string identityCenterBaseline) { try { var parameters = new List<EnabledBaselineParameter> { new EnabledBaselineParameter { Key = "IdentityCenterEnabledBaselineArn", Value = identityCenterBaseline } }; var request = new EnableBaselineRequest { BaselineIdentifier = baselineIdentifier, BaselineVersion = baselineVersion, TargetIdentifier = targetIdentifier, Parameters = parameters }; var response = await _controlTowerService.EnableBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return response.Arn; } catch (ValidationException ex) when (ex.Message.Contains("already enabled")) { Console.WriteLine("Baseline is already enabled for this target"); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't enable baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Disable a baseline for a specific target and wait for the operation to complete. /// </summary> /// <param name="enabledBaselineIdentifier">The identifier of the baseline to disable.</param> /// <returns>The operation ID or null if there was a conflict.</returns> public async Task<string?> DisableBaselineAsync(string enabledBaselineIdentifier) { try { var request = new DisableBaselineRequest { EnabledBaselineIdentifier = enabledBaselineIdentifier }; var response = await _controlTowerService.DisableBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (ConflictException ex) { Console.WriteLine($"Conflict disabling baseline: {ex.Message}. Skipping disable step."); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't disable baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Reset an enabled baseline for a specific target. /// </summary> /// <param name="enabledBaselineIdentifier">The identifier of the enabled baseline to reset.</param> /// <returns>The operation ID.</returns> public async Task<string> ResetEnabledBaselineAsync(string enabledBaselineIdentifier) { try { var request = new ResetEnabledBaselineRequest { EnabledBaselineIdentifier = enabledBaselineIdentifier }; var response = await _controlTowerService.ResetEnabledBaselineAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetBaselineOperationAsync(operationId); Console.WriteLine($"Baseline operation status: {status}"); if (status == BaselineOperationStatus.SUCCEEDED || status == BaselineOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Target not found, unable to reset enabled baseline."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't reset enabled baseline. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Get the status of a baseline operation. /// </summary> /// <param name="operationId">The ID of the baseline operation.</param> /// <returns>The operation status.</returns> public async Task<BaselineOperationStatus> GetBaselineOperationAsync(string operationId) { try { var request = new GetBaselineOperationRequest { OperationIdentifier = operationId }; var response = await _controlTowerService.GetBaselineOperationAsync(request); return response.BaselineOperation.Status; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Operation not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't get baseline operation status. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List enabled controls for a target organizational unit. /// </summary> /// <param name="targetIdentifier">The target organizational unit identifier.</param> /// <returns>A list of enabled control summaries.</returns> public async Task<List<EnabledControlSummary>> ListEnabledControlsAsync(string targetIdentifier) { try { var request = new ListEnabledControlsRequest { TargetIdentifier = targetIdentifier }; var enabledControls = new List<EnabledControlSummary>(); var enabledControlsPaginator = _controlTowerService.Paginators.ListEnabledControls(request); await foreach (var response in enabledControlsPaginator.Responses) { enabledControls.AddRange(response.EnabledControls); } return enabledControls; } catch (Amazon.ControlTower.Model.ResourceNotFoundException ex) when (ex.Message.Contains("not registered with AWS Control Tower")) { Console.WriteLine("AWS Control Tower must be enabled to work with enabling controls."); return new List<EnabledControlSummary>(); } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't list enabled controls. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Enable a control for a specified target. /// </summary> /// <param name="controlArn">The ARN of the control to enable.</param> /// <param name="targetIdentifier">The identifier of the target (e.g., OU ARN).</param> /// <returns>The operation ID or null if already enabled.</returns> public async Task<string?> EnableControlAsync(string controlArn, string targetIdentifier) { try { Console.WriteLine(controlArn); Console.WriteLine(targetIdentifier); var request = new EnableControlRequest { ControlIdentifier = controlArn, TargetIdentifier = targetIdentifier }; var response = await _controlTowerService.EnableControlAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetControlOperationAsync(operationId); Console.WriteLine($"Control operation status: {status}"); if (status == ControlOperationStatus.SUCCEEDED || status == ControlOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ValidationException ex) when (ex.Message.Contains("already enabled")) { Console.WriteLine("Control is already enabled for this target"); return null; } catch (Amazon.ControlTower.Model.ResourceNotFoundException ex) when (ex.Message.Contains("not registered with AWS Control Tower")) { Console.WriteLine("AWS Control Tower must be enabled to work with enabling controls."); return null; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't enable control. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Disable a control for a specified target. /// </summary> /// <param name="controlArn">The ARN of the control to disable.</param> /// <param name="targetIdentifier">The identifier of the target (e.g., OU ARN).</param> /// <returns>The operation ID.</returns> public async Task<string> DisableControlAsync(string controlArn, string targetIdentifier) { try { var request = new DisableControlRequest { ControlIdentifier = controlArn, TargetIdentifier = targetIdentifier }; var response = await _controlTowerService.DisableControlAsync(request); var operationId = response.OperationIdentifier; // Wait for operation to complete while (true) { var status = await GetControlOperationAsync(operationId); Console.WriteLine($"Control operation status: {status}"); if (status == ControlOperationStatus.SUCCEEDED || status == ControlOperationStatus.FAILED) { break; } await Task.Delay(30000); // Wait 30 seconds } return operationId; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Control not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't disable control. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// Get the status of a control operation. /// </summary> /// <param name="operationId">The ID of the control operation.</param> /// <returns>The operation status.</returns> public async Task<ControlOperationStatus> GetControlOperationAsync(string operationId) { try { var request = new GetControlOperationRequest { OperationIdentifier = operationId }; var response = await _controlTowerService.GetControlOperationAsync(request); return response.ControlOperation.Status; } catch (Amazon.ControlTower.Model.ResourceNotFoundException) { Console.WriteLine("Operation not found."); throw; } catch (AmazonControlTowerException ex) { Console.WriteLine($"Couldn't get control operation status. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } /// <summary> /// List all controls in the Control Tower control catalog. /// </summary> /// <returns>A list of control summaries.</returns> public async Task<List<ControlSummary>> ListControlsAsync() { try { var controls = new List<ControlSummary>(); var controlsPaginator = _controlCatalogService.Paginators.ListControls(new Amazon.ControlCatalog.Model.ListControlsRequest()); await foreach (var response in controlsPaginator.Responses) { controls.AddRange(response.Controls); } return controls; } catch (AmazonControlCatalogException ex) { Console.WriteLine($"Couldn't list controls. Here's why: {ex.ErrorCode}: {ex.Message}"); throw; } } }
Python
SDK für Python (Boto3)
Anmerkung

Es gibt noch mehr dazu GitHub. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS -Code-Beispiel- einrichten und ausführen.

Führen Sie ein interaktives Szenario durch, in dem AWS Control Tower Funktionen demonstriert werden.

class ControlTowerScenario: IDENTITY_CENTER_BASELINE = "baseline/LN25R72TTG6IGPTQ" stack_name = "" def __init__( self, controltower_wrapper: ControlTowerWrapper, org_client: boto3.client ): """ :param controltower_wrapper: An instance of the ControlTowerWrapper class. :param org_client: A Boto3 Organization client. """ self.controltower_wrapper = controltower_wrapper self.org_client = org_client self.stack = None self.ou_id = None self.ou_arn = None self.account_id = None self.landing_zone_id = None self.use_landing_zone = False def run_scenario(self) -> None: print("-" * 88) print( "\tWelcome to the AWS Control Tower with ControlCatalog example scenario." ) print("-" * 88) print( "This demo will walk you through working with AWS Control Tower for landing zones,\n" "managing baselines, and working with controls." ) self.account_id = boto3.client("sts").get_caller_identity()["Account"] print( "Some demo operations require the use of a landing zone. " "\nYou can use an existing landing zone or opt out of these operations in the demo." "\nFor instructions on how to set up a landing zone, " "\nsee https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html" ) # List available landing zones landing_zones = self.controltower_wrapper.list_landing_zones() if landing_zones: print("\nAvailable Landing Zones:") for i, lz in enumerate(landing_zones, 1): print(f"{i} {lz['arn']})") # Ask if user wants to use the first landing zone in the list if q.ask( f"Do you want to use the first landing zone in the list ({landing_zones[0]['arn']})? (y/n) ", q.is_yesno, ): self.use_landing_zone = True self.landing_zone_id = landing_zones[0]["arn"] print(f"Using landing zone ID: {self.landing_zone_id})") # Set up organization and get Sandbox OU ID. sandbox_ou_id = self.setup_organization() # Store the OU ID for use in the CloudFormation template. self.ou_id = sandbox_ou_id elif q.ask( f"Do you want to use a different existing Landing Zone for this demo? (y/n) ", q.is_yesno, ): self.use_landing_zone = True self.landing_zone_id = q.ask("Enter landing zone id: ", q.non_empty) # Set up organization and get Sandbox OU ID. sandbox_ou_id = self.setup_organization() # Store the OU ID for use in the CloudFormation template. self.ou_id = sandbox_ou_id # List and Enable Baseline. print("\nManaging Baselines:") control_tower_baseline = None identity_center_baseline = None baselines = self.controltower_wrapper.list_baselines() print("\nListing available Baselines:") for baseline in baselines: if baseline["name"] == "AWSControlTowerBaseline": control_tower_baseline = baseline print(f"{baseline['name']}") if self.use_landing_zone: print("\nListing enabled baselines:") enabled_baselines = self.controltower_wrapper.list_enabled_baselines() for baseline in enabled_baselines: # If the Identity Center baseline is enabled, the identifier must be used for other baselines. if self.IDENTITY_CENTER_BASELINE in baseline["baselineIdentifier"]: identity_center_baseline = baseline print(f"{baseline['baselineIdentifier']}") if q.ask( f"Do you want to enable the Control Tower Baseline? (y/n) ", q.is_yesno, ): print("\nEnabling Control Tower Baseline.") ic_baseline_arn = ( identity_center_baseline["arn"] if identity_center_baseline else None ) baseline_arn = self.controltower_wrapper.enable_baseline( self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "4.0" ) if baseline_arn: print(f"Enabled baseline ARN: {baseline_arn}") else: # Find the enabled baseline so we can reset it. for enabled_baseline in enabled_baselines: if ( enabled_baseline["baselineIdentifier"] == control_tower_baseline["arn"] ): baseline_arn = enabled_baseline["arn"] print("No change, the selected baseline was already enabled.") if q.ask( f"Do you want to reset the Control Tower Baseline? (y/n) ", q.is_yesno, ): print(f"\nResetting Control Tower Baseline. {baseline_arn}") operation_id = self.controltower_wrapper.reset_enabled_baseline( baseline_arn ) print(f"\nReset baseline operation id {operation_id}.") if baseline_arn and q.ask( f"Do you want to disable the Control Tower Baseline? (y/n) ", q.is_yesno, ): print(f"Disabling baseline ARN: {baseline_arn}") operation_id = self.controltower_wrapper.disable_baseline( baseline_arn ) print(f"\nDisabled baseline operation id {operation_id}.") # Re-enable the baseline for the next step. print("\nEnabling Control Tower Baseline.") self.controltower_wrapper.enable_baseline( self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "4.0", ) # List and Enable Controls. print("\nManaging Controls:") controls = self.controltower_wrapper.list_controls() print("\nListing first 5 available Controls:") for i, control in enumerate(controls[:5], 1): print(f"{i}. {control['Name']} - {control['Arn']}") if self.use_landing_zone: target_ou = self.ou_arn enabled_controls = self.controltower_wrapper.list_enabled_controls( target_ou ) print("\nListing enabled controls:") for i, control in enumerate(enabled_controls, 1): print(f"{i}. {control['controlIdentifier']}") # Enable first non-enabled control as an example. enabled_control_arns = [control["arn"] for control in enabled_controls] control_arn = next( control["Arn"] for control in controls if control["Arn"] not in enabled_control_arns ) if control_arn and q.ask( f"Do you want to enable the control {control_arn}? (y/n) ", q.is_yesno, ): print(f"\nEnabling control: {control_arn}") operation_id = self.controltower_wrapper.enable_control( control_arn, target_ou ) if operation_id: print(f"Enabled control with operation id {operation_id}") if control_arn and q.ask( f"Do you want to disable the control? (y/n) ", q.is_yesno, ): print("\nDisabling the control...") operation_id = self.controltower_wrapper.disable_control( control_arn, target_ou ) print(f"Disable operation ID: {operation_id}") print("\nThis concludes the example scenario.") print("Thanks for watching!") print("-" * 88) def setup_organization(self): """ Checks if the current account is part of an organization and creates one if needed. Also ensures a Sandbox OU exists and returns its ID. :return: The ID of the Sandbox OU """ print("\nChecking organization status...") try: # Check if account is part of an organization org_response = self.org_client.describe_organization() org_id = org_response["Organization"]["Id"] print(f"Account is part of organization: {org_id}") except ClientError as error: if error.response["Error"]["Code"] == "AWSOrganizationsNotInUseException": print("No organization found. Creating a new organization...") try: create_response = self.org_client.create_organization( FeatureSet="ALL" ) org_id = create_response["Organization"]["Id"] print(f"Created new organization: {org_id}") # Wait for organization to be available. waiter = self.org_client.get_waiter("organization_active") waiter.wait( Organization=org_id, WaiterConfig={"Delay": 5, "MaxAttempts": 12}, ) except ClientError as create_error: logger.error( "Couldn't create organization. Here's why: %s: %s", create_error.response["Error"]["Code"], create_error.response["Error"]["Message"], ) raise else: logger.error( "Couldn't describe organization. Here's why: %s: %s", error.response["Error"]["Code"], error.response["Error"]["Message"], ) raise # Look for Sandbox OU. sandbox_ou_id = None paginator = self.org_client.get_paginator( "list_organizational_units_for_parent" ) try: # Get root ID first. roots = self.org_client.list_roots()["Roots"] if not roots: raise ValueError("No root found in organization") root_id = roots[0]["Id"] # Search for existing Sandbox OU. print("Checking for Sandbox OU...") for page in paginator.paginate(ParentId=root_id): for ou in page["OrganizationalUnits"]: if ou["Name"] == "Sandbox": sandbox_ou_id = ou["Id"] self.ou_arn = ou["Arn"] print(f"Found existing Sandbox OU: {sandbox_ou_id}") break if sandbox_ou_id: break # Create Sandbox OU if it doesn't exist. if not sandbox_ou_id: print("Creating Sandbox OU...") create_ou_response = self.org_client.create_organizational_unit( ParentId=root_id, Name="Sandbox" ) sandbox_ou_id = create_ou_response["OrganizationalUnit"]["Id"] print(f"Created new Sandbox OU: {sandbox_ou_id}") # Wait for OU to be available. waiter = self.org_client.get_waiter("organizational_unit_active") waiter.wait( OrganizationalUnitId=sandbox_ou_id, WaiterConfig={"Delay": 5, "MaxAttempts": 12}, ) except ClientError as error: logger.error( "Couldn't set up Sandbox OU. Here's why: %s: %s", error.response["Error"]["Code"], error.response["Error"]["Message"], ) raise return sandbox_ou_id if __name__ == "__main__": try: org = boto3.client("organizations") control_tower_wrapper = ControlTowerWrapper.from_client() scenario = ControlTowerScenario(control_tower_wrapper, org) scenario.run_scenario() except Exception: logging.exception("Something went wrong with the scenario.") class ControlTowerWrapper: """Encapsulates AWS Control Tower and Control Catalog functionality.""" def __init__( self, controltower_client: boto3.client, controlcatalog_client: boto3.client ): """ :param controltower_client: A Boto3 Amazon ControlTower client. :param controlcatalog_client: A Boto3 Amazon ControlCatalog client. """ self.controltower_client = controltower_client self.controlcatalog_client = controlcatalog_client @classmethod def from_client(cls): controltower_client = boto3.client("controltower") controlcatalog_client = boto3.client("controlcatalog") return cls(controltower_client, controlcatalog_client) def list_baselines(self): """ Lists all baselines. :return: List of baselines. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_baselines") baselines = [] for page in paginator.paginate(): baselines.extend(page["baselines"]) return baselines except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list baselines. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def enable_baseline( self, target_identifier: str, identity_center_baseline: str, baseline_identifier: str, baseline_version: str, ): """ Enables a baseline for the specified target if it's not already enabled. :param target_identifier: The ARN of the target. :param baseline_identifier: The identifier of baseline to enable. :param identity_center_baseline: The identifier of identity center baseline if it is enabled. :param baseline_version: The version of baseline to enable. :return: The enabled baseline ARN or None if already enabled. :raises ClientError: If enabling the baseline fails for reasons other than it being already enabled. """ try: response = self.controltower_client.enable_baseline( baselineIdentifier=baseline_identifier, baselineVersion=baseline_version, targetIdentifier=target_identifier, parameters=[ { "key": "IdentityCenterEnabledBaselineArn", "value": identity_center_baseline, } ], ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return response["arn"] except ClientError as err: if err.response["Error"]["Code"] == "ValidationException": if "already enabled" in err.response["Error"]["Message"]: print("Baseline is already enabled for this target") return None else: print( "Unable to enable baseline due to validation exception: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) logger.error( "Couldn't enable baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_controls(self): """ Lists all controls in the Control Tower control catalog. :return: List of controls. :raises ClientError: If the listing operation fails. """ try: paginator = self.controlcatalog_client.get_paginator("list_controls") controls = [] for page in paginator.paginate(): controls.extend(page["Controls"]) return controls except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list controls. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def enable_control(self, control_arn: str, target_identifier: str): """ Enables a control for a specified target. :param control_arn: The ARN of the control to enable. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: The operation ID. :raises ClientError: If enabling the control fails. """ try: print(control_arn) print(target_identifier) response = self.controltower_client.enable_control( controlIdentifier=control_arn, targetIdentifier=target_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_control_operation(operation_id) print(f"Control operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if ( err.response["Error"]["Code"] == "ValidationException" and "already enabled" in err.response["Error"]["Message"] ): logger.info("Control is already enabled for this target") return None elif ( err.response["Error"]["Code"] == "ResourceNotFoundException" and "not registered with AWS Control Tower" in err.response["Error"]["Message"] ): logger.error("Control Tower must be enabled to work with controls.") return None logger.error( "Couldn't enable control. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_control_operation(self, operation_id: str): """ Gets the status of a control operation. :param operation_id: The ID of the control operation. :return: The operation status. :raises ClientError: If getting the operation status fails. """ try: response = self.controltower_client.get_control_operation( operationIdentifier=operation_id ) return response["controlOperation"]["status"] except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Operation not found.") else: logger.error( "Couldn't get control operation status. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_baseline_operation(self, operation_id: str): """ Gets the status of a baseline operation. :param operation_id: The ID of the baseline operation. :return: The operation status. :raises ClientError: If getting the operation status fails. """ try: response = self.controltower_client.get_baseline_operation( operationIdentifier=operation_id ) return response["baselineOperation"]["status"] except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Operation not found.") else: logger.error( "Couldn't get baseline operation status. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def disable_control(self, control_arn: str, target_identifier: str): """ Disables a control for a specified target. :param control_arn: The ARN of the control to disable. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: The operation ID. :raises ClientError: If disabling the control fails. """ try: response = self.controltower_client.disable_control( controlIdentifier=control_arn, targetIdentifier=target_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_control_operation(operation_id) print(f"Control operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Control not found.") else: logger.error( "Couldn't disable control. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_landing_zones(self): """ Lists all landing zones. :return: List of landing zones. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_landing_zones") landing_zones = [] for page in paginator.paginate(): landing_zones.extend(page["landingZones"]) return landing_zones except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) else: logger.error( "Couldn't list landing zones. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_enabled_baselines(self): """ Lists all enabled baselines. :return: List of enabled baselines. :raises ClientError: If the listing operation fails. """ try: paginator = self.controltower_client.get_paginator("list_enabled_baselines") enabled_baselines = [] for page in paginator.paginate(): enabled_baselines.extend(page["enabledBaselines"]) return enabled_baselines except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Target not found.") else: logger.error( "Couldn't list enabled baselines. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def reset_enabled_baseline(self, enabled_baseline_identifier: str): """ Resets an enabled baseline for a specific target. :param enabled_baseline_identifier: The identifier of the enabled baseline to reset. :return: The operation ID. :raises ClientError: If resetting the baseline fails. """ try: response = self.controltower_client.reset_enabled_baseline( enabledBaselineIdentifier=enabled_baseline_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return operation_id except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.error("Target not found.") else: logger.error( "Couldn't reset enabled baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def disable_baseline(self, enabled_baseline_identifier: str): """ Disables a baseline for a specific target and waits for the operation to complete. :param enabled_baseline_identifier: The identifier of the baseline to disable. :return: The operation ID. :raises ClientError: If disabling the baseline fails. """ try: response = self.controltower_client.disable_baseline( enabledBaselineIdentifier=enabled_baseline_identifier ) operation_id = response["operationIdentifier"] while True: status = self.get_baseline_operation(operation_id) print(f"Baseline operation status: {status}") if status in ["SUCCEEDED", "FAILED"]: break time.sleep(30) return response["operationIdentifier"] except ClientError as err: if err.response["Error"]["Code"] == "ConflictException": print( f"Conflict disabling baseline: {err.response['Error']['Message']}. Skipping disable step." ) return None else: logger.error( "Couldn't disable baseline. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def list_enabled_controls(self, target_identifier: str): """ Lists all enabled controls for a specific target. :param target_identifier: The identifier of the target (e.g., OU ARN). :return: List of enabled controls. :raises ClientError: If the listing operation fails. """ enabled_controls = [] try: paginator = self.controltower_client.get_paginator("list_enabled_controls") for page in paginator.paginate(targetIdentifier=target_identifier): enabled_controls.extend(page["enabledControls"]) return enabled_controls except ClientError as err: if err.response["Error"]["Code"] == "AccessDeniedException": logger.error( "Access denied. Please ensure you have the necessary permissions." ) return enabled_controls elif ( err.response["Error"]["Code"] == "ResourceNotFoundException" and "not registered with AWS Control Tower" in err.response["Error"]["Message"] ): logger.error("Control Tower must be enabled to work with controls.") return enabled_controls else: logger.error( "Couldn't list enabled controls. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise