AWS SDK가 있는 AWS Glue의 기본 사항 알아보기 - AWS Glue

AWS SDK가 있는 AWS Glue의 기본 사항 알아보기

다음 코드 예제는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • 퍼블릭 Amazon S3 버킷을 크롤링하고 CSV 형식의 메타데이터 데이터베이스를 생성하는 크롤러를 생성합니다.

  • AWS Glue Data Catalog의 데이터베이스 및 테이블에 대한 정보를 나열합니다.

  • 작업을 생성하여 S3 버킷에서 CSV 데이터를 추출하고, 데이터를 변환하며, JSON 형식의 출력을 다른 S3 버킷으로 로드합니다.

  • 작업 실행에 대한 정보를 나열하고 변환된 데이터를 확인하며 리소스를 정리합니다.

자세한 내용은 자습서: AWS Glue Studio 시작하기를 참조하세요.

.NET
SDK for .NET
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

시나리오에서 사용되는 AWS Glue 함수를 래핑하는 클래스를 생성합니다.

using System.Net;

namespace GlueActions;

/// <summary>
/// Wraps the AWS Glue operations used by the AWS Glue basics scenario.
/// Each method issues one AWS Glue request (or a paginated series of
/// requests) through the injected <see cref="IAmazonGlue"/> client.
/// </summary>
public class GlueWrapper
{
    private readonly IAmazonGlue _amazonGlue;

    /// <summary>
    /// Constructor for the AWS Glue actions wrapper.
    /// </summary>
    /// <param name="amazonGlue">The AWS Glue client used for every call made by this wrapper.</param>
    public GlueWrapper(IAmazonGlue amazonGlue)
    {
        _amazonGlue = amazonGlue;
    }

    /// <summary>
    /// Create an AWS Glue crawler.
    /// </summary>
    /// <param name="crawlerName">The name for the crawler.</param>
    /// <param name="crawlerDescription">A description of the crawler.</param>
    /// <param name="role">The AWS Identity and Access Management (IAM) role to
    /// be assumed by the crawler.</param>
    /// <param name="schedule">The schedule on which the crawler will be executed.</param>
    /// <param name="s3Path">The Amazon Simple Storage Service (Amazon S3) path that
    /// the crawler will crawl for source data.</param>
    /// <param name="dbName">The name to use for the database that will be
    /// created by the crawler.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> CreateCrawlerAsync(
        string crawlerName,
        string crawlerDescription,
        string role,
        string schedule,
        string s3Path,
        string dbName)
    {
        var s3Target = new S3Target
        {
            Path = s3Path,
        };

        var targetList = new List<S3Target>
        {
            s3Target,
        };

        var targets = new CrawlerTargets
        {
            S3Targets = targetList,
        };

        var crawlerRequest = new CreateCrawlerRequest
        {
            DatabaseName = dbName,
            Name = crawlerName,
            Description = crawlerDescription,
            Targets = targets,
            Role = role,
            Schedule = schedule,
        };

        var response = await _amazonGlue.CreateCrawlerAsync(crawlerRequest);
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Create an AWS Glue job.
    /// </summary>
    /// <param name="dbName">The AWS Glue database the job reads from. It is passed
    /// to the job as the default "--input_database" argument.</param>
    /// <param name="tableName">The table the job reads from. It is passed to the
    /// job as the default "--input_table" argument.</param>
    /// <param name="bucketUrl">The URL of the location where the job writes its
    /// output. It is passed to the job as the default "--output_bucket_url" argument.</param>
    /// <param name="jobName">The name of the job.</param>
    /// <param name="roleName">The name of the IAM role to be assumed by
    /// the job.</param>
    /// <param name="description">A description of the job.</param>
    /// <param name="scriptUrl">The URL to the script.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> CreateJobAsync(string dbName, string tableName, string bucketUrl, string jobName, string roleName, string description, string scriptUrl)
    {
        var command = new JobCommand
        {
            PythonVersion = "3",
            Name = "glueetl",
            ScriptLocation = scriptUrl,
        };

        var arguments = new Dictionary<string, string>
        {
            { "--input_database", dbName },
            { "--input_table", tableName },
            { "--output_bucket_url", bucketUrl }
        };

        var request = new CreateJobRequest
        {
            Command = command,
            DefaultArguments = arguments,
            Description = description,
            GlueVersion = "3.0",
            Name = jobName,
            NumberOfWorkers = 10,
            Role = roleName,
            WorkerType = "G.1X"
        };

        var response = await _amazonGlue.CreateJobAsync(request);
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Delete an AWS Glue crawler.
    /// </summary>
    /// <param name="crawlerName">The name of the crawler.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> DeleteCrawlerAsync(string crawlerName)
    {
        var response = await _amazonGlue.DeleteCrawlerAsync(new DeleteCrawlerRequest { Name = crawlerName });
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Delete the AWS Glue database.
    /// </summary>
    /// <param name="dbName">The name of the database.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> DeleteDatabaseAsync(string dbName)
    {
        var response = await _amazonGlue.DeleteDatabaseAsync(new DeleteDatabaseRequest { Name = dbName });
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Delete an AWS Glue job.
    /// </summary>
    /// <param name="jobName">The name of the job.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> DeleteJobAsync(string jobName)
    {
        var response = await _amazonGlue.DeleteJobAsync(new DeleteJobRequest { JobName = jobName });
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Delete a table from an AWS Glue database.
    /// </summary>
    /// <param name="dbName">The name of the database that contains the table.</param>
    /// <param name="tableName">The table to delete.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> DeleteTableAsync(string dbName, string tableName)
    {
        var response = await _amazonGlue.DeleteTableAsync(new DeleteTableRequest { Name = tableName, DatabaseName = dbName });
        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Get information about an AWS Glue crawler.
    /// </summary>
    /// <param name="crawlerName">The name of the crawler.</param>
    /// <returns>A Crawler object describing the crawler, or null when the
    /// response status is not OK.</returns>
    public async Task<Crawler?> GetCrawlerAsync(string crawlerName)
    {
        var crawlerRequest = new GetCrawlerRequest
        {
            Name = crawlerName,
        };

        var response = await _amazonGlue.GetCrawlerAsync(crawlerRequest);
        if (response.HttpStatusCode == HttpStatusCode.OK)
        {
            var databaseName = response.Crawler.DatabaseName;
            Console.WriteLine($"{crawlerName} has the database {databaseName}");
            return response.Crawler;
        }

        Console.WriteLine($"No information regarding {crawlerName} could be found.");
        return null;
    }

    /// <summary>
    /// Get information about the state of an AWS Glue crawler.
    /// </summary>
    /// <param name="crawlerName">The name of the crawler.</param>
    /// <returns>A value describing the state of the crawler.</returns>
    public async Task<CrawlerState> GetCrawlerStateAsync(string crawlerName)
    {
        var response = await _amazonGlue.GetCrawlerAsync(
            new GetCrawlerRequest { Name = crawlerName });
        return response.Crawler.State;
    }

    /// <summary>
    /// Get information about an AWS Glue database.
    /// </summary>
    /// <param name="dbName">The name of the database.</param>
    /// <returns>A Database object containing information about the database.</returns>
    public async Task<Database> GetDatabaseAsync(string dbName)
    {
        var databasesRequest = new GetDatabaseRequest
        {
            Name = dbName,
        };

        var response = await _amazonGlue.GetDatabaseAsync(databasesRequest);
        return response.Database;
    }

    /// <summary>
    /// Get information about a specific AWS Glue job run.
    /// </summary>
    /// <param name="jobName">The name of the job.</param>
    /// <param name="jobRunId">The Id of the job run.</param>
    /// <returns>A JobRun object with information about the job run.</returns>
    public async Task<JobRun> GetJobRunAsync(string jobName, string jobRunId)
    {
        var response = await _amazonGlue.GetJobRunAsync(new GetJobRunRequest { JobName = jobName, RunId = jobRunId });
        return response.JobRun;
    }

    /// <summary>
    /// Get information about all AWS Glue runs of a specific job.
    /// </summary>
    /// <param name="jobName">The name of the job.</param>
    /// <returns>A list of JobRun objects (empty when the job has no runs).</returns>
    public async Task<List<JobRun>> GetJobRunsAsync(string jobName)
    {
        var jobRuns = new List<JobRun>();

        var request = new GetJobRunsRequest
        {
            JobName = jobName,
        };

        // No need to loop over NextToken to get all the job runs--the
        // paginator does it for us behind the scenes.
        var paginatorForJobRuns = _amazonGlue.Paginators.GetJobRuns(request);

        await foreach (var response in paginatorForJobRuns.Responses)
        {
            // AWS SDK for .NET v4 may return null instead of an empty list.
            if (response.JobRuns is not null)
            {
                jobRuns.AddRange(response.JobRuns);
            }
        }

        return jobRuns;
    }

    /// <summary>
    /// Get a list of tables for an AWS Glue database.
    /// </summary>
    /// <param name="dbName">The name of the database.</param>
    /// <returns>A list of Table objects (empty when the database has no tables).</returns>
    public async Task<List<Table>> GetTablesAsync(string dbName)
    {
        var request = new GetTablesRequest { DatabaseName = dbName };
        var tables = new List<Table>();

        // Get a paginator for listing the tables.
        var tablePaginator = _amazonGlue.Paginators.GetTables(request);

        await foreach (var response in tablePaginator.Responses)
        {
            // AWS SDK for .NET v4 may return null instead of an empty list.
            if (response.TableList is not null)
            {
                tables.AddRange(response.TableList);
            }
        }

        return tables;
    }

    /// <summary>
    /// List AWS Glue jobs using a paginator.
    /// </summary>
    /// <returns>A list of AWS Glue job names. The paginator requests up to 10 names per page.</returns>
    public async Task<List<string>> ListJobsAsync()
    {
        var jobNames = new List<string>();

        var listJobsPaginator = _amazonGlue.Paginators.ListJobs(new ListJobsRequest { MaxResults = 10 });
        await foreach (var response in listJobsPaginator.Responses)
        {
            // AWS SDK for .NET v4 may return null instead of an empty list.
            if (response.JobNames is not null)
            {
                jobNames.AddRange(response.JobNames);
            }
        }

        return jobNames;
    }

    /// <summary>
    /// Start an AWS Glue crawler.
    /// </summary>
    /// <param name="crawlerName">The name of the crawler.</param>
    /// <returns>A Boolean value indicating the success of the action.</returns>
    public async Task<bool> StartCrawlerAsync(string crawlerName)
    {
        var crawlerRequest = new StartCrawlerRequest
        {
            Name = crawlerName,
        };

        var response = await _amazonGlue.StartCrawlerAsync(crawlerRequest);

        return response.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Start an AWS Glue job run.
    /// </summary>
    /// <param name="jobName">The name of the job.</param>
    /// <param name="inputDatabase">The database passed to the run as "--input_database".</param>
    /// <param name="inputTable">The table passed to the run as "--input_table".</param>
    /// <param name="bucketName">The bucket whose root, as "s3://{bucketName}/", is
    /// passed to the run as "--output_bucket_url".</param>
    /// <returns>A string representing the job run Id.</returns>
    public async Task<string> StartJobRunAsync(
        string jobName,
        string inputDatabase,
        string inputTable,
        string bucketName)
    {
        var request = new StartJobRunRequest
        {
            JobName = jobName,
            Arguments = new Dictionary<string, string>
            {
                {"--input_database", inputDatabase},
                {"--input_table", inputTable},
                {"--output_bucket_url", $"s3://{bucketName}/"}
            }
        };

        var response = await _amazonGlue.StartJobRunAsync(request);
        return response.JobRunId;
    }
}

시나리오를 실행하는 클래스를 생성합니다.

global using Amazon.Glue;
global using GlueActions;
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Logging;
global using Microsoft.Extensions.Logging.Console;
global using Microsoft.Extensions.Logging.Debug;
using Amazon.Glue.Model;
using Amazon.S3;
using Amazon.S3.Model;

namespace GlueBasics;

/// <summary>
/// Runs the AWS Glue basics scenario. It creates and runs a crawler, inspects
/// the database and tables the crawler created, creates and runs a job, lists
/// the job output, and then deletes the resources it created.
/// </summary>
public class GlueBasics
{
    private static ILogger logger = null!;
    private static IConfiguration _configuration = null!;

    // Pause between status checks while waiting for the crawler or a job run,
    // so the polling loops don't flood AWS Glue with requests.
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

    static async Task Main(string[] args)
    {
        // Set up dependency injection for AWS Glue.
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureLogging(logging =>
                logging.AddFilter("System", LogLevel.Debug)
                    .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information)
                    .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace))
            .ConfigureServices((_, services) =>
                services.AddAWSService<IAmazonGlue>()
                    .AddTransient<GlueWrapper>()
                    .AddTransient<UiWrapper>()
            )
            .Build();

        logger = LoggerFactory.Create(builder => { builder.AddConsole(); })
            .CreateLogger<GlueBasics>();

        _configuration = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("settings.json") // Load settings from .json file.
            .AddJsonFile("settings.local.json", true) // Optionally load local settings.
            .Build();

        // These values are stored in settings.json
        // Once you have run the CDK script to deploy the resources,
        // edit the file to set "BucketName", "RoleName", and "ScriptURL"
        // to the appropriate values. Also set "CrawlerName" to the name
        // you want to give the crawler when it is created.
        string bucketName = _configuration["BucketName"]!;
        string bucketUrl = _configuration["BucketUrl"]!;
        string crawlerName = _configuration["CrawlerName"]!;
        string roleName = _configuration["RoleName"]!;
        string sourceData = _configuration["SourceData"]!;
        string dbName = _configuration["DbName"]!;
        string cron = _configuration["Cron"]!;
        string scriptUrl = _configuration["ScriptURL"]!;
        string jobName = _configuration["JobName"]!;

        var wrapper = host.Services.GetRequiredService<GlueWrapper>();
        var uiWrapper = host.Services.GetRequiredService<UiWrapper>();

        uiWrapper.DisplayOverview();
        uiWrapper.PressEnter();

        // Create the crawler and wait for it to be ready.
        uiWrapper.DisplayTitle("Create AWS Glue crawler");
        Console.WriteLine("Let's begin by creating the AWS Glue crawler.");

        var crawlerDescription = "Crawler created for the AWS Glue Basics scenario.";
        var crawlerCreated = await wrapper.CreateCrawlerAsync(crawlerName, crawlerDescription, roleName, cron, sourceData, dbName);
        if (!crawlerCreated)
        {
            Console.WriteLine($"Couldn't create crawler {crawlerName}.");
            return; // Exit the application.
        }

        Console.WriteLine($"The crawler: {crawlerName} has been created. Now let's wait until it's ready.");
        await WaitForCrawlerReadyAsync(wrapper, crawlerName);
        Console.WriteLine($"The crawler {crawlerName} is now ready for use.");

        uiWrapper.DisplayTitle("Start AWS Glue crawler");
        Console.WriteLine("Now let's wait until the crawler has successfully started.");

        var crawlerStarted = await wrapper.StartCrawlerAsync(crawlerName);
        if (!crawlerStarted)
        {
            Console.WriteLine($"Couldn't start the crawler {crawlerName}.");
            return; // Exit the application.
        }

        await WaitForCrawlerReadyAsync(wrapper, crawlerName);
        Console.WriteLine($"The crawler {crawlerName} is now ready for use.");

        uiWrapper.PressEnter();

        Console.WriteLine($"\nLet's take a look at the database: {dbName}");
        var database = await wrapper.GetDatabaseAsync(dbName);

        if (database != null)
        {
            uiWrapper.DisplayTitle($"{database.Name} Details");
            Console.WriteLine($"{database.Name} created on {database.CreateTime}");
            Console.WriteLine(database.Description);
        }

        uiWrapper.PressEnter();

        var tables = await wrapper.GetTablesAsync(dbName);

        // The job reads from the first table, so without any tables there is
        // nothing for it to process.
        if (tables.Count == 0)
        {
            Console.WriteLine($"The crawler didn't create any tables in {dbName}, so there is no data for the job to process.");
            await DeleteResourcesAsync(wrapper, null, tables, dbName, crawlerName);
            return;
        }

        tables.ForEach(table =>
        {
            Console.WriteLine($"{table.Name}\tCreated: {table.CreateTime}\tUpdated: {table.UpdateTime}");
        });

        uiWrapper.PressEnter();

        uiWrapper.DisplayTitle("Create AWS Glue job");
        Console.WriteLine("Creating a new AWS Glue job.");
        var description = "An AWS Glue job created using the AWS SDK for .NET";

        var jobCreated = await wrapper.CreateJobAsync(dbName, tables[0].Name, bucketUrl, jobName, roleName, description, scriptUrl);
        if (!jobCreated)
        {
            Console.WriteLine($"Couldn't create the job {jobName}.");
            await DeleteResourcesAsync(wrapper, null, tables, dbName, crawlerName);
            return;
        }

        uiWrapper.PressEnter();

        uiWrapper.DisplayTitle("Starting AWS Glue job");
        Console.WriteLine("Starting the new AWS Glue job...");
        var jobRunId = await wrapper.StartJobRunAsync(jobName, dbName, tables[0].Name, bucketName);

        // True once the run has reached a state from which it won't progress.
        static bool IsFinished(JobRun run) =>
            run.JobRunState == "SUCCEEDED"
            || run.JobRunState == "STOPPED"
            || run.JobRunState == "FAILED"
            || run.JobRunState == "TIMEOUT"
            || run.JobRunState == "ERROR"
            || run.JobRunState == "EXPIRED";

        var jobRun = await wrapper.GetJobRunAsync(jobName, jobRunId);
        while (!IsFinished(jobRun))
        {
            await Task.Delay(PollInterval);
            jobRun = await wrapper.GetJobRunAsync(jobName, jobRunId);
        }

        Console.WriteLine($"Job run {jobRunId} finished with state {jobRun.JobRunState}.");

        uiWrapper.DisplayTitle($"Data in {bucketName}");

        // Get the list of data stored in the S3 bucket.
        using var s3Client = new AmazonS3Client();

        var response = await s3Client.ListObjectsAsync(new ListObjectsRequest { BucketName = bucketName });
        response.S3Objects?.ForEach(s3Object =>
        {
            Console.WriteLine(s3Object.Key);
        });

        uiWrapper.DisplayTitle("AWS Glue jobs");
        var jobNames = await wrapper.ListJobsAsync();
        jobNames.ForEach(name =>
        {
            Console.WriteLine(name);
        });

        uiWrapper.PressEnter();

        uiWrapper.DisplayTitle("Get AWS Glue job run information");
        Console.WriteLine("Getting information about the AWS Glue job.");
        var jobRuns = await wrapper.GetJobRunsAsync(jobName);

        jobRuns.ForEach(run =>
        {
            Console.WriteLine($"{run.JobName}\t{run.JobRunState}\t{run.CompletedOn}");
        });

        uiWrapper.PressEnter();

        uiWrapper.DisplayTitle("Deleting resources");
        await DeleteResourcesAsync(wrapper, jobName, tables, dbName, crawlerName);

        Console.WriteLine("The AWS Glue scenario has completed.");
        uiWrapper.PressEnter();
    }

    /// <summary>
    /// Poll the crawler's state until it reports READY, pausing
    /// <see cref="PollInterval"/> between checks.
    /// </summary>
    /// <param name="wrapper">The AWS Glue wrapper used to query the crawler.</param>
    /// <param name="crawlerName">The name of the crawler to wait for.</param>
    private static async Task WaitForCrawlerReadyAsync(GlueWrapper wrapper, string crawlerName)
    {
        var crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName);
        while (crawlerState != CrawlerState.READY)
        {
            await Task.Delay(PollInterval);
            crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName);
        }
    }

    /// <summary>
    /// Delete the resources created by the scenario. Each deletion is awaited
    /// in turn: the job first, then the tables, then the database that
    /// contains them, and finally the crawler.
    /// </summary>
    /// <param name="wrapper">The AWS Glue wrapper used for the deletions.</param>
    /// <param name="jobName">The job to delete, or null when no job was created.</param>
    /// <param name="tables">The tables to delete from the database.</param>
    /// <param name="dbName">The database to delete.</param>
    /// <param name="crawlerName">The crawler to delete.</param>
    private static async Task DeleteResourcesAsync(
        GlueWrapper wrapper,
        string? jobName,
        IEnumerable<Table> tables,
        string dbName,
        string crawlerName)
    {
        if (jobName is not null)
        {
            Console.WriteLine("Deleting the AWS Glue job used by the example.");
            await wrapper.DeleteJobAsync(jobName);
        }

        // Await each deletion. An async lambda passed to List.ForEach is
        // async void, so nothing would wait for it before the database is deleted.
        Console.WriteLine("Deleting the tables from the database.");
        foreach (var table in tables)
        {
            await wrapper.DeleteTableAsync(dbName, table.Name);
        }

        Console.WriteLine("Deleting the database.");
        await wrapper.DeleteDatabaseAsync(dbName);

        Console.WriteLine("Deleting the AWS Glue crawler.");
        await wrapper.DeleteCrawlerAsync(crawlerName);
    }
}

/// <summary>
/// Console helpers used by the AWS Glue basics scenario.
/// </summary>
public class UiWrapper
{
    public readonly string SepBar = new string('-', Console.WindowWidth);

    /// <summary>
    /// Show information about the scenario.
    /// </summary>
    public void DisplayOverview()
    {
        Console.Clear();
        DisplayTitle("Amazon Glue: get started with crawlers and jobs");

        Console.WriteLine("This example application does the following:");
        Console.WriteLine("\t 1. Create a crawler, pass it the IAM role and the URL to the public S3 bucket that contains the source data");
        Console.WriteLine("\t 2. Start the crawler.");
        Console.WriteLine("\t 3. Get the database created by the crawler and the tables in the database.");
        Console.WriteLine("\t 4. Create a job.");
        Console.WriteLine("\t 5. Start a job run.");
        Console.WriteLine("\t 6. Wait for the job run to complete.");
        Console.WriteLine("\t 7. Show the data stored in the bucket.");
        Console.WriteLine("\t 8. List jobs for the account.");
        Console.WriteLine("\t 9. Get job run details for the job that was run.");
        Console.WriteLine("\t10. Delete the demo job.");
        Console.WriteLine("\t11. Delete the database and tables created for the demo.");
        Console.WriteLine("\t12. Delete the crawler.");
    }

    /// <summary>
    /// Display a message and wait until the user presses enter.
    /// </summary>
    public void PressEnter()
    {
        Console.Write("\nPlease press <Enter> to continue. ");
        _ = Console.ReadLine();
    }

    /// <summary>
    /// Pad a string with spaces to center it on the console display.
    /// </summary>
    /// <param name="strToCenter">The string to center on the screen.</param>
    /// <returns>The string padded to make it center on the screen. A string
    /// at least as wide as the console is returned unpadded.</returns>
    public string CenterString(string strToCenter)
    {
        // Clamp at zero: a string wider than the console would otherwise give
        // a negative pad length, and new string(' ', negative) throws.
        var padAmount = Math.Max(0, (Console.WindowWidth - strToCenter.Length) / 2);
        var leftPad = new string(' ', padAmount);
        return $"{leftPad}{strToCenter}";
    }

    /// <summary>
    /// Display a line of hyphens, the centered text of the title and another
    /// line of hyphens.
    /// </summary>
    /// <param name="strTitle">The string to be displayed.</param>
    public void DisplayTitle(string strTitle)
    {
        Console.WriteLine(SepBar);
        Console.WriteLine(CenterString(strTitle));
        Console.WriteLine(SepBar);
    }
}
C++
SDK for C++
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

//! Scenario which demonstrates using AWS Glue to add a crawler and run a job. /*! \\sa runGettingStartedWithGlueScenario() \param bucketName: An S3 bucket created in the setup. \param roleName: An AWS Identity and Access Management (IAM) role created in the setup. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::runGettingStartedWithGlueScenario(const Aws::String &bucketName, const Aws::String &roleName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::Glue::GlueClient client(clientConfig); Aws::String roleArn; if (!getRoleArn(roleName, roleArn, clientConfig)) { std::cerr << "Error getting role ARN for role." << std::endl; return false; } // 1. Upload the job script to the S3 bucket. { std::cout << "Uploading the job script '" << AwsDoc::Glue::PYTHON_SCRIPT << "'." << std::endl; if (!AwsDoc::Glue::uploadFile(bucketName, AwsDoc::Glue::PYTHON_SCRIPT_PATH, AwsDoc::Glue::PYTHON_SCRIPT, clientConfig)) { std::cerr << "Error uploading the job file." << std::endl; return false; } } // 2. Create a crawler. { Aws::Glue::Model::S3Target s3Target; s3Target.SetPath("s3://crawler-public-us-east-1/flight/2016/csv"); Aws::Glue::Model::CrawlerTargets crawlerTargets; crawlerTargets.AddS3Targets(s3Target); Aws::Glue::Model::CreateCrawlerRequest request; request.SetTargets(crawlerTargets); request.SetName(CRAWLER_NAME); request.SetDatabaseName(CRAWLER_DATABASE_NAME); request.SetTablePrefix(CRAWLER_DATABASE_PREFIX); request.SetRole(roleArn); Aws::Glue::Model::CreateCrawlerOutcome outcome = client.CreateCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the crawler." << std::endl; } else { std::cerr << "Error creating a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets("", CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 3. Get a crawler. 
{ Aws::Glue::Model::GetCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome outcome = client.GetCrawler(request); if (outcome.IsSuccess()) { Aws::Glue::Model::CrawlerState crawlerState = outcome.GetResult().GetCrawler().GetState(); std::cout << "Retrieved crawler with state " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << "." << std::endl; } else { std::cerr << "Error retrieving a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 4. Start a crawler. { Aws::Glue::Model::StartCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::StartCrawlerOutcome outcome = client.StartCrawler(request); if (outcome.IsSuccess() || (Aws::Glue::GlueErrors::CRAWLER_RUNNING == outcome.GetError().GetErrorType())) { if (!outcome.IsSuccess()) { std::cout << "Crawler was already started." << std::endl; } else { std::cout << "Successfully started crawler." << std::endl; } std::cout << "This may take a while to run." << std::endl; Aws::Glue::Model::CrawlerState crawlerState = Aws::Glue::Model::CrawlerState::NOT_SET; int iterations = 0; while (Aws::Glue::Model::CrawlerState::READY != crawlerState) { std::this_thread::sleep_for(std::chrono::seconds(1)); ++iterations; if ((iterations % 10) == 0) { // Log status every 10 seconds. std::cout << "Crawler status " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << ". After " << iterations << " seconds elapsed." << std::endl; } Aws::Glue::Model::GetCrawlerRequest getCrawlerRequest; getCrawlerRequest.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome getCrawlerOutcome = client.GetCrawler( getCrawlerRequest); if (getCrawlerOutcome.IsSuccess()) { crawlerState = getCrawlerOutcome.GetResult().GetCrawler().GetState(); } else { std::cerr << "Error getting crawler. 
" << getCrawlerOutcome.GetError().GetMessage() << std::endl; break; } } if (Aws::Glue::Model::CrawlerState::READY == crawlerState) { std::cout << "Crawler finished running after " << iterations << " seconds." << std::endl; } } else { std::cerr << "Error starting a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 5. Get a database. { Aws::Glue::Model::GetDatabaseRequest request; request.SetName(CRAWLER_DATABASE_NAME); Aws::Glue::Model::GetDatabaseOutcome outcome = client.GetDatabase(request); if (outcome.IsSuccess()) { const Aws::Glue::Model::Database &database = outcome.GetResult().GetDatabase(); std::cout << "Successfully retrieve the database\n" << database.Jsonize().View().WriteReadable() << "'." << std::endl; } else { std::cerr << "Error getting the database. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 6. Get tables. Aws::String tableName; { Aws::Glue::Model::GetTablesRequest request; request.SetDatabaseName(CRAWLER_DATABASE_NAME); std::vector<Aws::Glue::Model::Table> all_tables; Aws::String nextToken; // Used for pagination. do { Aws::Glue::Model::GetTablesOutcome outcome = client.GetTables(request); if (outcome.IsSuccess()) { const std::vector<Aws::Glue::Model::Table> &tables = outcome.GetResult().GetTableList(); all_tables.insert(all_tables.end(), tables.begin(), tables.end()); nextToken = outcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting the tables. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } while (!nextToken.empty()); std::cout << "The database contains " << all_tables.size() << (all_tables.size() == 1 ? " table." 
: "tables.") << std::endl; std::cout << "Here is a list of the tables in the database."; for (size_t index = 0; index < all_tables.size(); ++index) { std::cout << " " << index + 1 << ": " << all_tables[index].GetName() << std::endl; } if (!all_tables.empty()) { int tableIndex = askQuestionForIntRange( "Enter an index to display the database detail ", 1, static_cast<int>(all_tables.size())); std::cout << all_tables[tableIndex - 1].Jsonize().View().WriteReadable() << std::endl; tableName = all_tables[tableIndex - 1].GetName(); } } // 7. Create a job. { Aws::Glue::Model::CreateJobRequest request; request.SetName(JOB_NAME); request.SetRole(roleArn); request.SetGlueVersion(GLUE_VERSION); Aws::Glue::Model::JobCommand command; command.SetName(JOB_COMMAND_NAME); command.SetPythonVersion(JOB_PYTHON_VERSION); command.SetScriptLocation( Aws::String("s3://") + bucketName + "/" + PYTHON_SCRIPT); request.SetCommand(command); Aws::Glue::Model::CreateJobOutcome outcome = client.CreateJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the job." << std::endl; } else { std::cerr << "Error creating the job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 8. Start a job run. { Aws::Glue::Model::StartJobRunRequest request; request.SetJobName(JOB_NAME); Aws::Map<Aws::String, Aws::String> arguments; arguments["--input_database"] = CRAWLER_DATABASE_NAME; arguments["--input_table"] = tableName; arguments["--output_bucket_url"] = Aws::String("s3://") + bucketName + "/"; request.SetArguments(arguments); Aws::Glue::Model::StartJobRunOutcome outcome = client.StartJobRun(request); if (outcome.IsSuccess()) { std::cout << "Successfully started the job." 
<< std::endl; Aws::String jobRunId = outcome.GetResult().GetJobRunId(); int iterator = 0; bool done = false; while (!done) { ++iterator; std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(JOB_NAME); jobRunRequest.SetRunId(jobRunId); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { const Aws::Glue::Model::JobRun &jobRun = jobRunOutcome.GetResult().GetJobRun(); Aws::Glue::Model::JobRunState jobRunState = jobRun.GetJobRunState(); if ((jobRunState == Aws::Glue::Model::JobRunState::STOPPED) || (jobRunState == Aws::Glue::Model::JobRunState::FAILED) || (jobRunState == Aws::Glue::Model::JobRunState::TIMEOUT)) { std::cerr << "Error running job. " << jobRun.GetErrorMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } else if (jobRunState == Aws::Glue::Model::JobRunState::SUCCEEDED) { std::cout << "Job run succeeded after " << iterator << " seconds elapsed." << std::endl; done = true; } else if ((iterator % 10) == 0) { // Log status every 10 seconds. std::cout << "Job run status " << Aws::Glue::Model::JobRunStateMapper::GetNameForJobRunState( jobRunState) << ". " << iterator << " seconds elapsed." << std::endl; } } else { std::cerr << "Error retrieving job run state. " << jobRunOutcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } } else { std::cerr << "Error starting a job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 9. List the output data stored in the S3 bucket. 
{ Aws::S3::S3Client s3Client; Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(bucketName); request.SetPrefix(OUTPUT_FILE_PREFIX); Aws::String continuationToken; // Used for pagination. std::vector<Aws::S3::Model::Object> allObjects; do { if (!continuationToken.empty()) { request.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome outcome = s3Client.ListObjectsV2( request); if (outcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = outcome.GetResult().GetContents(); allObjects.insert(allObjects.end(), objects.begin(), objects.end()); continuationToken = outcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << outcome.GetError().GetMessage() << std::endl; break; } } while (!continuationToken.empty()); std::cout << "Data from your job is in " << allObjects.size() << " files in the S3 bucket, " << bucketName << "." << std::endl; for (size_t i = 0; i < allObjects.size(); ++i) { std::cout << " " << i + 1 << ". " << allObjects[i].GetKey() << std::endl; } int objectIndex = askQuestionForIntRange( std::string( "Enter the number of a block to download it and see the first ") + std::to_string(LINES_OF_RUN_FILE_TO_DISPLAY) + " lines of JSON output in the block: ", 1, static_cast<int>(allObjects.size())); Aws::String objectKey = allObjects[objectIndex - 1].GetKey(); std::stringstream stringStream; if (getObjectFromBucket(bucketName, objectKey, stringStream, clientConfig)) { for (int i = 0; i < LINES_OF_RUN_FILE_TO_DISPLAY && stringStream; ++i) { std::string line; std::getline(stringStream, line); std::cout << " " << line << std::endl; } } else { deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 10. List all the jobs. 
Aws::String jobName; { Aws::Glue::Model::ListJobsRequest listJobsRequest; Aws::String nextToken; std::vector<Aws::String> allJobNames; do { if (!nextToken.empty()) { listJobsRequest.SetNextToken(nextToken); } Aws::Glue::Model::ListJobsOutcome listRunsOutcome = client.ListJobs( listJobsRequest); if (listRunsOutcome.IsSuccess()) { const std::vector<Aws::String> &jobNames = listRunsOutcome.GetResult().GetJobNames(); allJobNames.insert(allJobNames.end(), jobNames.begin(), jobNames.end()); nextToken = listRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error listing jobs. " << listRunsOutcome.GetError().GetMessage() << std::endl; } } while (!nextToken.empty()); std::cout << "Your account has " << allJobNames.size() << " jobs." << std::endl; for (size_t i = 0; i < allJobNames.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobNames[i] << std::endl; } int jobIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobNames.size()) + " to see the list of runs for a job: ", 1, static_cast<int>(allJobNames.size())); jobName = allJobNames[jobIndex - 1]; } // 11. Get the job runs for a job. Aws::String jobRunID; if (!jobName.empty()) { Aws::Glue::Model::GetJobRunsRequest getJobRunsRequest; getJobRunsRequest.SetJobName(jobName); Aws::String nextToken; // Used for pagination. std::vector<Aws::Glue::Model::JobRun> allJobRuns; do { if (!nextToken.empty()) { getJobRunsRequest.SetNextToken(nextToken); } Aws::Glue::Model::GetJobRunsOutcome jobRunsOutcome = client.GetJobRuns( getJobRunsRequest); if (jobRunsOutcome.IsSuccess()) { const std::vector<Aws::Glue::Model::JobRun> &jobRuns = jobRunsOutcome.GetResult().GetJobRuns(); allJobRuns.insert(allJobRuns.end(), jobRuns.begin(), jobRuns.end()); nextToken = jobRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting job runs. 
" << jobRunsOutcome.GetError().GetMessage() << std::endl; break; } } while (!nextToken.empty()); std::cout << "There are " << allJobRuns.size() << " runs in the job '" << jobName << "'." << std::endl; for (size_t i = 0; i < allJobRuns.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobRuns[i].GetJobName() << std::endl; } int runIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobRuns.size()) + " to see details for a run: ", 1, static_cast<int>(allJobRuns.size())); jobRunID = allJobRuns[runIndex - 1].GetId(); } // 12. Get a single job run. if (!jobRunID.empty()) { Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(jobName); jobRunRequest.SetRunId(jobRunID); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { std::cout << "Displaying the job run JSON description." << std::endl; std::cout << jobRunOutcome.GetResult().GetJobRun().Jsonize().View().WriteReadable() << std::endl; } else { std::cerr << "Error get a job run. " << jobRunOutcome.GetError().GetMessage() << std::endl; } } return deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); } //! Cleanup routine to delete created assets. /*! \\sa deleteAssets() \param crawler: Name of an AWS Glue crawler. \param database: The name of an AWS Glue database. \param job: The name of an AWS Glue job. \param bucketName: The name of an S3 bucket. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAssets(const Aws::String &crawler, const Aws::String &database, const Aws::String &job, const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { const Aws::Glue::GlueClient client(clientConfig); bool result = true; // 13. Delete a job. 
if (!job.empty()) { Aws::Glue::Model::DeleteJobRequest request; request.SetJobName(job); Aws::Glue::Model::DeleteJobOutcome outcome = client.DeleteJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the job." << std::endl; } else { std::cerr << "Error deleting the job. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 14. Delete a database. if (!database.empty()) { Aws::Glue::Model::DeleteDatabaseRequest request; request.SetName(database); Aws::Glue::Model::DeleteDatabaseOutcome outcome = client.DeleteDatabase( request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the database." << std::endl; } else { std::cerr << "Error deleting database. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 15. Delete a crawler. if (!crawler.empty()) { Aws::Glue::Model::DeleteCrawlerRequest request; request.SetName(crawler); Aws::Glue::Model::DeleteCrawlerOutcome outcome = client.DeleteCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the crawler." << std::endl; } else { std::cerr << "Error deleting the crawler. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 16. Delete the job script and run data from the S3 bucket. result &= AwsDoc::Glue::deleteAllObjectsInS3Bucket(bucketName, clientConfig); return result; } //! Routine which uploads a file to an S3 bucket. /*! \\sa uploadFile() \param bucketName: An S3 bucket created in the setup. \param filePath: The path of the file to upload. \param fileName The name for the uploaded file. \param clientConfig: AWS client configuration. \return bool: Successful completion. 
*/ bool AwsDoc::Glue::uploadFile(const Aws::String &bucketName, const Aws::String &filePath, const Aws::String &fileName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client s3_client(clientConfig); Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucketName); request.SetKey(fileName); std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", filePath.c_str(), std::ios_base::in | std::ios_base::binary); if (!*inputData) { std::cerr << "Error unable to read file " << filePath << std::endl; return false; } request.SetBody(inputData); Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request); if (!outcome.IsSuccess()) { std::cerr << "Error: PutObject: " << outcome.GetError().GetMessage() << std::endl; } else { std::cout << "Added object '" << filePath << "' to bucket '" << bucketName << "'." << std::endl; } return outcome.IsSuccess(); } //! Routine which deletes all objects in an S3 bucket. /*! \\sa deleteAllObjectsInS3Bucket() \param bucketName: The S3 bucket name. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAllObjectsInS3Bucket(const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::ListObjectsV2Request listObjectsRequest; listObjectsRequest.SetBucket(bucketName); Aws::String continuationToken; // Used for pagination. 
bool result = true; do { if (!continuationToken.empty()) { listObjectsRequest.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome listObjectsOutcome = client.ListObjectsV2( listObjectsRequest); if (listObjectsOutcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = listObjectsOutcome.GetResult().GetContents(); if (!objects.empty()) { Aws::S3::Model::DeleteObjectsRequest deleteObjectsRequest; deleteObjectsRequest.SetBucket(bucketName); std::vector<Aws::S3::Model::ObjectIdentifier> objectIdentifiers; for (const Aws::S3::Model::Object &object: objects) { objectIdentifiers.push_back( Aws::S3::Model::ObjectIdentifier().WithKey( object.GetKey())); } Aws::S3::Model::Delete objectsDelete; objectsDelete.SetObjects(objectIdentifiers); objectsDelete.SetQuiet(true); deleteObjectsRequest.SetDelete(objectsDelete); Aws::S3::Model::DeleteObjectsOutcome deleteObjectsOutcome = client.DeleteObjects(deleteObjectsRequest); if (!deleteObjectsOutcome.IsSuccess()) { std::cerr << "Error deleting objects. " << deleteObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } else { std::cout << "Successfully deleted the objects." << std::endl; } } else { std::cout << "No objects to delete in '" << bucketName << "'." << std::endl; } continuationToken = listObjectsOutcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << listObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } } while (!continuationToken.empty()); return result; } //! Routine which retrieves an object from an S3 bucket. /*! \\sa getObjectFromBucket() \param bucketName: The S3 bucket name. \param objectKey: The object's name. \param objectStream: A stream to receive the retrieved data. \param clientConfig: AWS client configuration. \return bool: Successful completion. 
*/
bool AwsDoc::Glue::getObjectFromBucket(const Aws::String &bucketName,
                                       const Aws::String &objectKey,
                                       std::ostream &objectStream,
                                       const Aws::Client::ClientConfiguration &clientConfig) {
    Aws::S3::S3Client s3Client(clientConfig);

    Aws::S3::Model::GetObjectRequest getRequest;
    getRequest.SetBucket(bucketName);
    getRequest.SetKey(objectKey);

    Aws::S3::Model::GetObjectOutcome getOutcome = s3Client.GetObject(getRequest);

    // Guard clause: report the failure and bail out early.
    if (!getOutcome.IsSuccess()) {
        std::cerr << "Error retrieving object. "
                  << getOutcome.GetError().GetMessage() << std::endl;
        return false;
    }

    std::cout << "Successfully retrieved '" << objectKey << "'." << std::endl;

    // Copy the whole response body into the caller-supplied stream.
    objectStream << getOutcome.GetResult().GetBody().rdbuf();
    return true;
}
Java
SDK for Java 2.x
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

/** * Before running this Java V2 code example, set up your development * environment, including your credentials. * <p> * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * To set up the resources, see this documentation topic: * * https://docs.aws.amazon.com/glue/latest/ug/tutorial-add-crawler.html * * This example performs the following tasks: * * 1. Create a database. * 2. Create a crawler. * 3. Get a crawler. * 4. Start a crawler. * 5. Get a database. * 6. Get tables. * 7. Create a job. * 8. Start a job run. * 9. List all jobs. * 10. Get job runs. * 11. Delete a job. * 12. Delete a database. * 13. Delete a crawler. */ public class GlueScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws InterruptedException { final String usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri> <bucketNameSc>\s Where: iam - The ARN of the IAM role that has AWS Glue and S3 permissions.\s s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, s3://<bucket name>/read). cron - A cron expression used to specify the schedule (i.e., cron(15 12 * * ? *). dbName - The database name.\s crawlerName - The name of the crawler.\s jobName - The name you assign to this job definition. scriptLocation - The Amazon S3 path to a script that runs a job. locationUri - The location of the database (you can find this file in resources folder). 
bucketNameSc - The Amazon S3 bucket name used when creating a job """; if (args.length != 9) { System.out.println(usage); return; } Scanner scanner = new Scanner(System.in); String iam = args[0]; String s3Path = args[1]; String cron = args[2]; String dbName = args[3]; String crawlerName = args[4]; String jobName = args[5]; String scriptLocation = args[6]; String locationUri = args[7]; String bucketNameSc = args[8]; Region region = Region.US_EAST_1; GlueClient glueClient = GlueClient.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the AWS Glue scenario."); System.out.println(""" AWS Glue is a fully managed extract, transform, and load (ETL) service provided by Amazon Web Services (AWS). It is designed to simplify the process of building, running, and maintaining ETL pipelines, which are essential for data integration and data warehousing tasks. One of the key features of AWS Glue is its ability to automatically discover and catalog data stored in various sources, such as Amazon S3, Amazon RDS, Amazon Redshift, and other databases. This cataloging process creates a central metadata repository, known as the AWS Glue Data Catalog, which provides a unified view of an organization's data assets. This metadata can then be used to create ETL jobs, which can be scheduled and run on-demand or on a regular basis. Lets get started. """); waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create a database."); try { createDatabase(glueClient, dbName, locationUri); } catch (GlueException e) { if (e.awsErrorDetails().errorMessage().equals("Database already exists.")) { System.out.println("Database " + dbName + " already exists. Skipping creation."); } else { System.err.println(e.awsErrorDetails().errorMessage()); return; } } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. 
Create a crawler."); try { createGlueCrawler(glueClient, iam, s3Path, cron, dbName, crawlerName); } catch (GlueException e) { if (e.awsErrorDetails().errorMessage().contains("already exists")) { System.out.println("Crawler " + crawlerName + " already exists. Skipping creation."); } else { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Get a crawler."); try { getSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Start a crawler."); try { startSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Get a database."); try { getSpecificDatabase(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("*** Wait 5 min for the tables to become available"); TimeUnit.MINUTES.sleep(5); System.out.println("6. Get tables."); String myTableName; try { myTableName = getGlueTables(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Create a job."); try { createJob(glueClient, jobName, iam, scriptLocation); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. 
Start a Job run."); try { startJob(glueClient, jobName, dbName, myTableName, bucketNameSc); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. List all jobs."); try { getAllJobs(glueClient); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Get job runs."); try { getJobRuns(glueClient, jobName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete a job."); try { deleteJob(glueClient, jobName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } System.out.println("*** Wait 5 MIN for the " + crawlerName + " to stop"); TimeUnit.MINUTES.sleep(5); waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("12. Delete a database."); try { deleteDatabase(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Delete a crawler."); try { deleteSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Successfully completed the AWS Glue Scenario"); System.out.println(DASHES); } /** * Creates a Glue database with the specified name and location URI. * * @param glueClient The Glue client to use for the database creation. * @param dbName The name of the database to create. 
* @param locationUri The location URI for the database. */ public static void createDatabase(GlueClient glueClient, String dbName, String locationUri) { try { DatabaseInput input = DatabaseInput.builder() .description("Built with the AWS SDK for Java V2") .name(dbName) .locationUri(locationUri) .build(); CreateDatabaseRequest request = CreateDatabaseRequest.builder() .databaseInput(input) .build(); glueClient.createDatabase(request); System.out.println(dbName + " was successfully created"); } catch (GlueException e) { throw e; } } /** * Creates a new AWS Glue crawler using the AWS Glue Java API. * * @param glueClient the AWS Glue client used to interact with the AWS Glue service * @param iam the IAM role that the crawler will use to access the data source * @param s3Path the S3 path that the crawler will scan for data * @param cron the cron expression that defines the crawler's schedule * @param dbName the name of the AWS Glue database where the crawler will store the metadata * @param crawlerName the name of the crawler to be created */ public static void createGlueCrawler(GlueClient glueClient, String iam, String s3Path, String cron, String dbName, String crawlerName) { try { S3Target s3Target = S3Target.builder() .path(s3Path) .build(); List<S3Target> targetList = new ArrayList<>(); targetList.add(s3Target); CrawlerTargets targets = CrawlerTargets.builder() .s3Targets(targetList) .build(); CreateCrawlerRequest crawlerRequest = CreateCrawlerRequest.builder() .databaseName(dbName) .name(crawlerName) .description("Created by the AWS Glue Java API") .targets(targets) .role(iam) .schedule(cron) .build(); glueClient.createCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully created"); } catch (GlueException e) { throw e; } } /** * Retrieves a specific crawler from the AWS Glue service and waits for it to be in the "READY" state. 
* * @param glueClient the AWS Glue client used to interact with the Glue service * @param crawlerName the name of the crawler to be retrieved */ public static void getSpecificCrawler(GlueClient glueClient, String crawlerName) throws InterruptedException { try { GetCrawlerRequest crawlerRequest = GetCrawlerRequest.builder() .name(crawlerName) .build(); boolean ready = false; while (!ready) { GetCrawlerResponse response = glueClient.getCrawler(crawlerRequest); String status = response.crawler().stateAsString(); if (status.compareTo("READY") == 0) { ready = true; } Thread.sleep(3000); } System.out.println("The crawler is now ready"); } catch (GlueException | InterruptedException e) { throw e; } } /** * Starts a specific AWS Glue crawler. * * @param glueClient the AWS Glue client to use for the crawler operation * @param crawlerName the name of the crawler to start * @throws GlueException if there is an error starting the crawler */ public static void startSpecificCrawler(GlueClient glueClient, String crawlerName) { try { StartCrawlerRequest crawlerRequest = StartCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.startCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully started!"); } catch (GlueException e) { throw e; } } /** * Retrieves the specific database from the AWS Glue service. * * @param glueClient an instance of the AWS Glue client used to interact with the service * @param databaseName the name of the database to retrieve * @throws GlueException if there is an error retrieving the database from the AWS Glue service */ public static void getSpecificDatabase(GlueClient glueClient, String databaseName) { try { GetDatabaseRequest databasesRequest = GetDatabaseRequest.builder() .name(databaseName) .build(); GetDatabaseResponse response = glueClient.getDatabase(databasesRequest); Instant createDate = response.database().createTime(); // Convert the Instant to readable date. 
DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT) .withLocale(Locale.US) .withZone(ZoneId.systemDefault()); formatter.format(createDate); System.out.println("The create date of the database is " + createDate); } catch (GlueException e) { throw e; } } /** * Retrieves the names of the tables in the specified Glue database. * * @param glueClient the Glue client to use for the operation * @param dbName the name of the Glue database to retrieve the table names from * @return the name of the first table retrieved, or an empty string if no tables were found */ public static String getGlueTables(GlueClient glueClient, String dbName) { String myTableName = ""; try { GetTablesRequest tableRequest = GetTablesRequest.builder() .databaseName(dbName) .build(); GetTablesResponse response = glueClient.getTables(tableRequest); List<Table> tables = response.tableList(); if (tables.isEmpty()) { System.out.println("No tables were returned"); } else { for (Table table : tables) { myTableName = table.name(); System.out.println("Table name is: " + myTableName); } } } catch (GlueException e) { throw e; } return myTableName; } /** * Starts a job run in AWS Glue. 
* * @param glueClient the AWS Glue client to use for the job run * @param jobName the name of the Glue job to run * @param inputDatabase the name of the input database * @param inputTable the name of the input table * @param outBucket the URL of the output S3 bucket * @throws GlueException if there is an error starting the job run */ public static void startJob(GlueClient glueClient, String jobName, String inputDatabase, String inputTable, String outBucket) { try { Map<String, String> myMap = new HashMap<>(); myMap.put("--input_database", inputDatabase); myMap.put("--input_table", inputTable); myMap.put("--output_bucket_url", outBucket); StartJobRunRequest runRequest = StartJobRunRequest.builder() .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .arguments(myMap) .jobName(jobName) .build(); StartJobRunResponse response = glueClient.startJobRun(runRequest); System.out.println("The request Id of the job is " + response.responseMetadata().requestId()); } catch (GlueException e) { throw e; } } /** * Creates a new AWS Glue job. 
* * @param glueClient the AWS Glue client to use for the operation * @param jobName the name of the job to create * @param iam the IAM role to associate with the job * @param scriptLocation the location of the script to be used by the job * @throws GlueException if there is an error creating the job */ public static void createJob(GlueClient glueClient, String jobName, String iam, String scriptLocation) { try { JobCommand command = JobCommand.builder() .pythonVersion("3") .name("glueetl") .scriptLocation(scriptLocation) .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .description("A Job created by using the AWS SDK for Java V2") .glueVersion("2.0") .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .name(jobName) .role(iam) .command(command) .build(); glueClient.createJob(jobRequest); System.out.println(jobName + " was successfully created."); } catch (GlueException e) { throw e; } } /** * Retrieves and prints information about all the jobs in the Glue data catalog. * * @param glueClient the Glue client used to interact with the AWS Glue service */ public static void getAllJobs(GlueClient glueClient) { try { GetJobsRequest jobsRequest = GetJobsRequest.builder() .maxResults(10) .build(); GetJobsResponse jobsResponse = glueClient.getJobs(jobsRequest); List<Job> jobs = jobsResponse.jobs(); for (Job job : jobs) { System.out.println("Job name is : " + job.name()); System.out.println("The job worker type is : " + job.workerType().name()); } } catch (GlueException e) { throw e; } } /** * Retrieves the job runs for a given Glue job and prints the status of the job runs. 
* * @param glueClient the Glue client used to make API calls * @param jobName the name of the Glue job to retrieve the job runs for */ public static void getJobRuns(GlueClient glueClient, String jobName) { try { GetJobRunsRequest runsRequest = GetJobRunsRequest.builder() .jobName(jobName) .maxResults(20) .build(); boolean jobDone = false; while (!jobDone) { GetJobRunsResponse response = glueClient.getJobRuns(runsRequest); List<JobRun> jobRuns = response.jobRuns(); for (JobRun jobRun : jobRuns) { String jobState = jobRun.jobRunState().name(); if (jobState.compareTo("SUCCEEDED") == 0) { System.out.println(jobName + " has succeeded"); jobDone = true; } else if (jobState.compareTo("STOPPED") == 0) { System.out.println("Job run has stopped"); jobDone = true; } else if (jobState.compareTo("FAILED") == 0) { System.out.println("Job run has failed"); jobDone = true; } else if (jobState.compareTo("TIMEOUT") == 0) { System.out.println("Job run has timed out"); jobDone = true; } else { System.out.println("*** Job run state is " + jobRun.jobRunState().name()); System.out.println("Job run Id is " + jobRun.id()); System.out.println("The Glue version is " + jobRun.glueVersion()); } TimeUnit.SECONDS.sleep(5); } } } catch (GlueException e) { throw e; } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * Deletes a Glue job. * * @param glueClient the Glue client to use for the operation * @param jobName the name of the job to be deleted * @throws GlueException if there is an error deleting the job */ public static void deleteJob(GlueClient glueClient, String jobName) { try { DeleteJobRequest jobRequest = DeleteJobRequest.builder() .jobName(jobName) .build(); glueClient.deleteJob(jobRequest); System.out.println(jobName + " was successfully deleted"); } catch (GlueException e) { throw e; } } /** * Deletes a AWS Glue Database. * * @param glueClient An instance of the AWS Glue client used to interact with the AWS Glue service. 
* @param databaseName The name of the database to be deleted. * @throws GlueException If an error occurs while deleting the database. */ public static void deleteDatabase(GlueClient glueClient, String databaseName) { try { DeleteDatabaseRequest request = DeleteDatabaseRequest.builder() .name(databaseName) .build(); glueClient.deleteDatabase(request); System.out.println(databaseName + " was successfully deleted"); } catch (GlueException e) { throw e; } } /** * Deletes a specific AWS Glue crawler. * * @param glueClient the AWS Glue client object * @param crawlerName the name of the crawler to be deleted * @throws GlueException if an error occurs during the deletion process */ public static void deleteSpecificCrawler(GlueClient glueClient, String crawlerName) { try { DeleteCrawlerRequest deleteCrawlerRequest = DeleteCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.deleteCrawler(deleteCrawlerRequest); System.out.println(crawlerName + " was deleted"); } catch (GlueException e) { throw e; } } private static void waitForInputToContinue(Scanner scanner) { while (true) { System.out.println(""); System.out.println("Enter 'c' followed by <ENTER> to continue:"); String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("c")) { System.out.println("Continuing with the program..."); System.out.println(""); break; } else { // Handle invalid input. System.out.println("Invalid input. Please try again."); } } } }
JavaScript
SDK for JavaScript (v3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

퍼블릭 Amazon Simple Storage Service(Amazon S3) 버킷을 크롤링하고, 검색한 CSV 형식 데이터를 설명하는 메타데이터 데이터베이스를 생성하는 크롤러를 만들고 실행합니다.

/**
 * Create an AWS Glue crawler that scans a single S3 path.
 * @param {string} name
 * @param {string} role - IAM role the crawler assumes.
 * @param {string} dbName - Database that receives the crawler's tables.
 * @param {string} tablePrefix
 * @param {string} s3TargetPath
 */
const createCrawler = (name, role, dbName, tablePrefix, s3TargetPath) => {
  const client = new GlueClient({});

  const command = new CreateCrawlerCommand({
    Name: name,
    Role: role,
    DatabaseName: dbName,
    TablePrefix: tablePrefix,
    Targets: {
      S3Targets: [{ Path: s3TargetPath }],
    },
  });

  return client.send(command);
};

/**
 * Fetch a crawler's definition and state.
 * @param {string} name
 */
const getCrawler = (name) => {
  const client = new GlueClient({});

  const command = new GetCrawlerCommand({
    Name: name,
  });

  return client.send(command);
};

/**
 * Start a crawler run.
 * @param {string} name
 */
const startCrawler = (name) => {
  const client = new GlueClient({});

  const command = new StartCrawlerCommand({
    Name: name,
  });

  return client.send(command);
};

/**
 * Report whether a crawler with the given name exists.
 * Only an `EntityNotFoundException` means "does not exist"; any other failure
 * (for example missing permissions or throttling) is rethrown instead of being
 * silently treated as an absent crawler.
 * @param {{ getCrawler: (name: string) => Promise<unknown> }} actions
 * @param {string} crawlerName
 * @returns {Promise<boolean>}
 */
const crawlerExists = async ({ getCrawler }, crawlerName) => {
  try {
    await getCrawler(crawlerName);
    return true;
  } catch (err) {
    if (err?.name === "EntityNotFoundException") {
      return false;
    }
    throw err;
  }
};

/**
 * @param {{
 *   createCrawler: import('../../../actions/create-crawler.js').createCrawler,
 *   getCrawler: (name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput>,
 * }} actions
 */
const makeCreateCrawlerStep = (actions) => async (context) => {
  if (await crawlerExists(actions, process.env.CRAWLER_NAME)) {
    log("Crawler already exists. Skipping creation.");
  } else {
    await actions.createCrawler(
      process.env.CRAWLER_NAME,
      process.env.ROLE_NAME,
      process.env.DATABASE_NAME,
      process.env.TABLE_PREFIX,
      process.env.S3_TARGET_PATH,
    );

    log("Crawler created successfully.", { type: "success" });
  }

  return { ...context };
};

/**
 * Poll a crawler every 30 seconds until its state is "READY".
 * @param {(name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput>} getCrawler
 * @param {string} crawlerName
 */
const waitForCrawler = async (getCrawler, crawlerName) => {
  const waitTimeInSeconds = 30;
  const { Crawler } = await getCrawler(crawlerName);

  if (!Crawler) {
    throw new Error(`Crawler with name ${crawlerName} not found.`);
  }

  if (Crawler.State === "READY") {
    return;
  }

  log(`Crawler is ${Crawler.State}. Waiting ${waitTimeInSeconds} seconds...`);
  await wait(waitTimeInSeconds);
  return waitForCrawler(getCrawler, crawlerName);
};

/**
 * @param {{ startCrawler: (name: string) => Promise<unknown>, getCrawler: (name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput> }} actions
 */
const makeStartCrawlerStep =
  ({ startCrawler, getCrawler }) =>
  async (context) => {
    log("Starting crawler.");
    await startCrawler(process.env.CRAWLER_NAME);
    log("Crawler started.", { type: "success" });

    log("Waiting for crawler to finish running. This can take a while.");
    await waitForCrawler(getCrawler, process.env.CRAWLER_NAME);
    log("Crawler ready.", { type: "success" });

    return { ...context };
  };

AWS Glue Data Catalog의 데이터베이스 및 테이블에 대한 정보를 나열합니다.

/**
 * Fetch a Data Catalog database by name.
 * @param {string} name
 */
const getDatabase = (name) => {
  const client = new GlueClient({});

  const command = new GetDatabaseCommand({
    Name: name,
  });

  return client.send(command);
};

/**
 * List the tables in a Data Catalog database (first page of results).
 * @param {string} databaseName
 */
const getTables = (databaseName) => {
  const client = new GlueClient({});

  const command = new GetTablesCommand({
    DatabaseName: databaseName,
  });

  return client.send(command);
};

/**
 * @param {{ getDatabase: (name: string) => Promise<import('@aws-sdk/client-glue').GetDatabaseCommandOutput> }} config
 */
const makeGetDatabaseStep =
  ({ getDatabase }) =>
  async (context) => {
    const {
      Database: { Name },
    } = await getDatabase(process.env.DATABASE_NAME);
    log(`Database: ${Name}`);
    return { ...context };
  };

/**
 * @param {{ getTables: (databaseName: string) => Promise<import('@aws-sdk/client-glue').GetTablesCommandOutput> }} config
 */
const makeGetTablesStep =
  ({ getTables }) =>
  async (context) => {
    // TableList can be absent in the response; default to an empty list.
    const { TableList = [] } = await getTables(process.env.DATABASE_NAME);
    log("Tables:");
    // Pass one string (one bulleted line per table) rather than an array.
    log(TableList.map((table) => ` • ${table.Name}\n`).join(""));
    return { ...context };
  };

소스 Amazon S3 버킷에서 CSV 데이터를 추출하고, 필드를 제거하고 이름을 변경하여 변환하고, JSON 형식의 출력을 다른 Amazon S3 버킷으로 로드하는 작업을 만들고 실행합니다.

/**
 * Create an AWS Glue ETL job that runs a Python script stored in Amazon S3.
 * @param {string} name
 * @param {string} role - IAM role assumed by the job.
 * @param {string} scriptBucketName
 * @param {string} scriptKey
 */
const createJob = (name, role, scriptBucketName, scriptKey) => {
  const client = new GlueClient({});

  const command = new CreateJobCommand({
    Name: name,
    Role: role,
    Command: {
      Name: "glueetl",
      PythonVersion: "3",
      ScriptLocation: `s3://${scriptBucketName}/${scriptKey}`,
    },
    GlueVersion: "3.0",
  });

  return client.send(command);
};

/**
 * Start a run of the job, passing the arguments the ETL script reads.
 * @param {string} jobName
 * @param {string} dbName
 * @param {string} tableName
 * @param {string} bucketName - Output bucket; sent as an s3:// URL.
 */
const startJobRun = (jobName, dbName, tableName, bucketName) => {
  const client = new GlueClient({});

  const command = new StartJobRunCommand({
    JobName: jobName,
    Arguments: {
      "--input_database": dbName,
      "--input_table": tableName,
      "--output_bucket_url": `s3://${bucketName}/`,
    },
  });

  return client.send(command);
};

const makeCreateJobStep =
  ({ createJob }) =>
  async (context) => {
    log("Creating Job.");
    await createJob(
      process.env.JOB_NAME,
      process.env.ROLE_NAME,
      process.env.BUCKET_NAME,
      process.env.PYTHON_SCRIPT_KEY,
    );
    log("Job created.", { type: "success" });

    return { ...context };
  };

/**
 * Poll a job run every 30 seconds until it succeeds (resolve) or reaches a
 * terminal non-success state (reject).
 * @param {(name: string, runId: string) => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput> } getJobRun
 * @param {string} jobName
 * @param {string} jobRunId
 */
const waitForJobRun = async (getJobRun, jobName, jobRunId) => {
  const waitTimeInSeconds = 30;
  const { JobRun } = await getJobRun(jobName, jobRunId);

  if (!JobRun) {
    throw new Error(`Job run with id ${jobRunId} not found.`);
  }

  switch (JobRun.JobRunState) {
    case "FAILED":
    case "TIMEOUT":
    case "STOPPED":
    case "ERROR":
    // EXPIRED is terminal too; without it this poll would never finish.
    case "EXPIRED":
      throw new Error(
        `Job ${JobRun.JobRunState}. Error: ${JobRun.ErrorMessage}`,
      );
    case "SUCCEEDED":
      return;
    default:
      break;
  }

  log(
    `Job ${JobRun.JobRunState}. Waiting ${waitTimeInSeconds} more seconds...`,
  );
  await wait(waitTimeInSeconds);
  return waitForJobRun(getJobRun, jobName, jobRunId);
};

/**
 * Ask whether to open the output bucket in the S3 console, and open it if so.
 * @param {{ prompter: { prompt: () => Promise<{ shouldOpen: boolean }>} }} context
 */
const promptToOpen = async (context) => {
  const { shouldOpen } = await context.prompter.prompt({
    name: "shouldOpen",
    type: "confirm",
    message: "Open the output bucket in your browser?",
  });

  if (shouldOpen) {
    // Pass only the console URL: any trailing prose would become part of the address.
    return open(
      `https://s3.console.aws.amazon.com/s3/buckets/${process.env.BUCKET_NAME}`,
    );
  }
};

const makeStartJobRunStep =
  ({ startJobRun, getJobRun }) =>
  async (context) => {
    log("Starting job.");
    const { JobRunId } = await startJobRun(
      process.env.JOB_NAME,
      process.env.DATABASE_NAME,
      process.env.TABLE_NAME,
      process.env.BUCKET_NAME,
    );
    log("Job started.", { type: "success" });

    log("Waiting for job to finish running. This can take a while.");
    await waitForJobRun(getJobRun, process.env.JOB_NAME, JobRunId);
    log("Job run succeeded.", { type: "success" });

    await promptToOpen(context);

    return { ...context };
  };

작업 실행에 대한 정보를 나열하고 변환된 데이터의 일부를 확인합니다.

/**
 * Lists the runs recorded for a job definition.
 *
 * @param {string} jobName
 * @returns {Promise<import('@aws-sdk/client-glue').GetJobRunsCommandOutput>}
 */
const getJobRuns = (jobName) => {
  const client = new GlueClient({});
  const command = new GetJobRunsCommand({
    JobName: jobName,
  });

  return client.send(command);
};

/**
 * Retrieves a single job run.
 *
 * @param {string} jobName
 * @param {string} jobRunId
 * @returns {Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput>}
 */
const getJobRun = (jobName, jobRunId) => {
  const client = new GlueClient({});
  const command = new GetJobRunCommand({
    JobName: jobName,
    RunId: jobRunId,
  });

  return client.send(command);
};

/**
 * @typedef {{
 *   selectedJobName?: string,
 *   prompter: { prompt: (question: object) => Promise<{ jobRunId: string }> }
 * }} Context
 */

/**
 * @typedef {(jobName: string, jobRunId: string) => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput>} getJobRun
 */

/**
 * @typedef {(jobName: string) => Promise<import('@aws-sdk/client-glue').GetJobRunsCommandOutput>} getJobRuns
 */

/**
 * Fetches one job run and logs it as an object.
 *
 * @param {getJobRun} getJobRun
 * @param {string} jobName
 * @param {string} jobRunId
 */
const logJobRunDetails = async (getJobRun, jobName, jobRunId) => {
  const { JobRun } = await getJobRun(jobName, jobRunId);
  log(JobRun, { type: "object" });
};

/**
 * Builds a scenario step that lets the user choose one run of the selected job
 * and logs that run's details. The step does nothing when no job is selected.
 *
 * @param {{getJobRuns: getJobRuns, getJobRun: getJobRun }} funcs
 */
const makePickJobRunStep =
  ({ getJobRuns, getJobRun }) =>
  async (/** @type { Context } */ context) => {
    if (context.selectedJobName) {
      const { JobRuns } = await getJobRuns(context.selectedJobName);
      const runs = JobRuns ?? [];

      if (runs.length === 0) {
        // Don't present an empty list to the prompter.
        log(`No runs found for job ${context.selectedJobName}.`);
      } else {
        const { jobRunId } = await context.prompter.prompt({
          name: "jobRunId",
          type: "list",
          message: "Select a job run to see details.",
          choices: runs.map((run) => run.Id),
        });

        // Await, so that:
        // - the details are logged before the next step runs;
        // - a failure propagates instead of becoming an unhandled rejection.
        await logJobRunDetails(getJobRun, context.selectedJobName, jobRunId);
      }
    }

    return { ...context };
  };

데모 중에 생성된 모든 리소스를 삭제합니다.

/**
 * Deletes an AWS Glue job definition.
 *
 * @param {string} jobName
 */
const deleteJob = (jobName) => {
  const client = new GlueClient({});
  const command = new DeleteJobCommand({
    JobName: jobName,
  });

  return client.send(command);
};

/**
 * Deletes a table from a Data Catalog database.
 *
 * @param {string} databaseName
 * @param {string} tableName
 */
const deleteTable = (databaseName, tableName) => {
  const client = new GlueClient({});
  const command = new DeleteTableCommand({
    DatabaseName: databaseName,
    Name: tableName,
  });

  return client.send(command);
};

/**
 * Deletes a Data Catalog database.
 *
 * @param {string} databaseName
 */
const deleteDatabase = (databaseName) => {
  const client = new GlueClient({});
  const command = new DeleteDatabaseCommand({
    Name: databaseName,
  });

  return client.send(command);
};

/**
 * Deletes a crawler.
 *
 * @param {string} crawlerName
 */
const deleteCrawler = (crawlerName) => {
  const client = new GlueClient({});
  const command = new DeleteCrawlerCommand({
    Name: crawlerName,
  });

  return client.send(command);
};

/**
 * Asks the user which jobs to delete, then deletes them concurrently. A failed
 * deletion is written to console.error and does not stop the others.
 *
 * @param {import('../../../actions/delete-job.js').deleteJob} deleteJobFn
 * @param {string[]} jobNames
 * @param {{ prompter: { prompt: (question: object) => Promise<any> } }} context
 */
const handleDeleteJobs = async (deleteJobFn, jobNames, context) => {
  /**
   * @type {{ selectedJobNames: string[] }}
   */
  const { selectedJobNames } = await context.prompter.prompt({
    name: "selectedJobNames",
    type: "checkbox",
    message: "Let's clean up jobs. Select jobs to delete.",
    choices: jobNames,
  });

  if (selectedJobNames.length === 0) {
    log("No jobs selected.");
  } else {
    log("Deleting jobs.");
    await Promise.all(
      selectedJobNames.map((n) => deleteJobFn(n).catch(console.error)),
    );
    log("Jobs deleted.", { type: "success" });
  }
};

/**
 * Builds a scenario step that offers to delete existing jobs.
 *
 * @param {{
 *   listJobs: import('../../../actions/list-jobs.js').listJobs,
 *   deleteJob: import('../../../actions/delete-job.js').deleteJob
 * }} config
 */
const makeCleanUpJobsStep =
  ({ listJobs, deleteJob }) =>
  async (context) => {
    const { JobNames } = await listJobs();
    const jobNames = JobNames ?? [];

    if (jobNames.length > 0) {
      await handleDeleteJobs(deleteJob, jobNames, context);
    }

    return { ...context };
  };

/**
 * Deletes tables concurrently. A failed deletion is written to console.error.
 *
 * @param {import('../../../actions/delete-table.js').deleteTable} deleteTable
 * @param {string} databaseName
 * @param {string[]} tableNames
 */
const deleteTables = (deleteTable, databaseName, tableNames) =>
  Promise.all(
    tableNames.map((tableName) =>
      deleteTable(databaseName, tableName).catch(console.error),
    ),
  );

/**
 * Builds a scenario step that offers to delete the tables in the database named
 * by DATABASE_NAME. If listing the tables fails, the step deletes nothing.
 *
 * @param {{
 *   getTables: import('../../../actions/get-tables.js').getTables,
 *   deleteTable: import('../../../actions/delete-table.js').deleteTable
 * }} config
 */
const makeCleanUpTablesStep =
  ({ getTables, deleteTable }) =>
  /**
   * @param {{ prompter: { prompt: (question: object) => Promise<any> } }} context
   */
  async (context) => {
    // Best effort: a missing database simply means there is nothing to clean up.
    const { TableList } = await getTables(process.env.DATABASE_NAME).catch(
      () => ({ TableList: null }),
    );

    if (TableList && TableList.length > 0) {
      /**
       * @type {{ tableNames: string[] }}
       */
      const { tableNames } = await context.prompter.prompt({
        name: "tableNames",
        type: "checkbox",
        message: "Let's clean up tables. Select tables to delete.",
        choices: TableList.map((t) => t.Name),
      });

      if (tableNames.length === 0) {
        log("No tables selected.");
      } else {
        log("Deleting tables.");
        await deleteTables(deleteTable, process.env.DATABASE_NAME, tableNames);
        log("Tables deleted.", { type: "success" });
      }
    }

    return { ...context };
  };

/**
 * Deletes databases concurrently. A failed deletion is written to console.error.
 *
 * @param {import('../../../actions/delete-database.js').deleteDatabase} deleteDatabase
 * @param {string[]} databaseNames
 */
const deleteDatabases = (deleteDatabase, databaseNames) =>
  Promise.all(
    databaseNames.map((dbName) => deleteDatabase(dbName).catch(console.error)),
  );

/**
 * Builds a scenario step that offers to delete Data Catalog databases.
 *
 * @param {{
 *   getDatabases: import('../../../actions/get-databases.js').getDatabases,
 *   deleteDatabase: import('../../../actions/delete-database.js').deleteDatabase
 * }} config
 */
const makeCleanUpDatabasesStep =
  ({ getDatabases, deleteDatabase }) =>
  /**
   * @param {{ prompter: { prompt: (question: object) => Promise<any> } }} context
   */
  async (context) => {
    const { DatabaseList } = await getDatabases();
    const databases = DatabaseList ?? [];

    if (databases.length > 0) {
      /** @type {{ dbNames: string[] }} */
      const { dbNames } = await context.prompter.prompt({
        name: "dbNames",
        type: "checkbox",
        message: "Let's clean up databases. Select databases to delete.",
        choices: databases.map((db) => db.Name),
      });

      if (dbNames.length === 0) {
        log("No databases selected.");
      } else {
        log("Deleting databases.");
        await deleteDatabases(deleteDatabase, dbNames);
        log("Databases deleted.", { type: "success" });
      }
    }

    return { ...context };
  };

/**
 * Scenario step that deletes the crawler named by CRAWLER_NAME.
 * A crawler that no longer exists is not treated as an error.
 */
const cleanUpCrawlerStep = async (context) => {
  log("Deleting crawler.");

  try {
    await deleteCrawler(process.env.CRAWLER_NAME);
    log("Crawler deleted.", { type: "success" });
  } catch (err) {
    if (err.name === "EntityNotFoundException") {
      log("Crawler is already deleted.");
    } else {
      throw err;
    }
  }

  return { ...context };
};
Kotlin
SDK for Kotlin
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

/**
 * Runs the AWS Glue scenario. In order, it:
 * - creates a database and a crawler, then starts the crawler;
 * - lists the database and its tables;
 * - creates and starts a job, then lists jobs and job runs;
 * - deletes the job, the database and the crawler.
 *
 * Expects exactly eight command-line arguments (see the usage text). Every Glue
 * client below is created for the us-east-1 Region.
 */
suspend fun main(args: Array<String>) {
    val usage = """
    Usage:
        <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri>

    Where:
        iam - The Amazon Resource Name (ARN) of the AWS Identity and Access Management (IAM) role that has AWS Glue and Amazon Simple Storage Service (Amazon S3) permissions.
        s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data).
        cron - A cron expression used to specify the schedule (for example, cron(15 12 * * ? *)).
        dbName - The database name.
        crawlerName - The name of the crawler.
        jobName - The name you assign to this job definition.
        scriptLocation - Specifies the Amazon S3 path to a script that runs a job.
        locationUri - Specifies the location of the database
    """

    if (args.size != 8) {
        println(usage)
        exitProcess(1)
    }

    val iam = args[0]
    val s3Path = args[1]
    val cron = args[2]
    val dbName = args[3]
    val crawlerName = args[4]
    val jobName = args[5]
    val scriptLocation = args[6]
    val locationUri = args[7]

    println("About to start the AWS Glue Scenario")
    createDatabase(dbName, locationUri)
    createCrawler(iam, s3Path, cron, dbName, crawlerName)
    getCrawler(crawlerName)
    startCrawler(crawlerName)
    // NOTE(review): nothing here waits for the crawler run to finish, so the table
    // listing below may not yet include the tables the crawler creates.
    getDatabase(dbName)
    getGlueTables(dbName)
    createJob(jobName, iam, scriptLocation)
    startJob(jobName)
    getJobs()
    getJobRuns(jobName)
    deleteJob(jobName)

    println("*** Wait for 5 MIN so the $crawlerName is ready to be deleted")
    TimeUnit.MINUTES.sleep(5)
    deleteMyDatabase(dbName)
    deleteCrawler(crawlerName)
}

/** Creates a Data Catalog database named [dbName] at [locationUriVal]. */
suspend fun createDatabase(
    dbName: String?,
    locationUriVal: String?,
) {
    val input =
        DatabaseInput {
            description = "Built with the AWS SDK for Kotlin"
            name = dbName
            locationUri = locationUriVal
        }

    val request =
        CreateDatabaseRequest {
            databaseInput = input
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.createDatabase(request)
        println("The database was successfully created")
    }
}

/**
 * Creates a crawler with these settings:
 * - crawls the S3 path [s3Path];
 * - writes metadata into database [dbName];
 * - assumes IAM role [iam];
 * - runs on the [cron] schedule.
 */
suspend fun createCrawler(
    iam: String?,
    s3Path: String?,
    cron: String?,
    dbName: String?,
    crawlerName: String,
) {
    val s3Target =
        S3Target {
            path = s3Path
        }

    val targetList = ArrayList<S3Target>()
    targetList.add(s3Target)

    val targetOb =
        CrawlerTargets {
            s3Targets = targetList
        }

    val crawlerRequest =
        CreateCrawlerRequest {
            databaseName = dbName
            name = crawlerName
            description = "Created by the AWS SDK for Kotlin"
            targets = targetOb
            role = iam
            schedule = cron
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.createCrawler(crawlerRequest)
        println("$crawlerName was successfully created")
    }
}

/** Prints the IAM role associated with the crawler [crawlerName]. */
suspend fun getCrawler(crawlerName: String?) {
    val request =
        GetCrawlerRequest {
            name = crawlerName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.getCrawler(request)
        val role = response.crawler?.role
        println("The role associated with this crawler is $role")
    }
}

/** Starts the crawler [crawlerName]. */
suspend fun startCrawler(crawlerName: String) {
    val crawlerRequest =
        StartCrawlerRequest {
            name = crawlerName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.startCrawler(crawlerRequest)
        println("$crawlerName was successfully started.")
    }
}

/** Prints the description of the database [databaseName]. */
suspend fun getDatabase(databaseName: String?) {
    val request =
        GetDatabaseRequest {
            name = databaseName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.getDatabase(request)
        val dbDesc = response.database?.description
        println("The database description is $dbDesc")
    }
}

/** Prints the name of each table in the database [dbName]. */
suspend fun getGlueTables(dbName: String?) {
    val tableRequest =
        GetTablesRequest {
            databaseName = dbName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.getTables(tableRequest)
        response.tableList?.forEach { tableName ->
            println("Table name is ${tableName.name}")
        }
    }
}

/** Starts a run of [jobNameVal] on ten G.1X workers and prints the run ID. */
suspend fun startJob(jobNameVal: String?) {
    val runRequest =
        StartJobRunRequest {
            workerType = WorkerType.G1X
            numberOfWorkers = 10
            jobName = jobNameVal
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.startJobRun(runRequest)
        println("The job run Id is ${response.jobRunId}")
    }
}

/**
 * Creates the Spark ETL job [jobName]. The job runs the Python script at
 * [scriptLocationVal] under IAM role [iam].
 */
suspend fun createJob(
    jobName: String,
    iam: String?,
    scriptLocationVal: String?,
) {
    val commandOb =
        JobCommand {
            pythonVersion = "3"
            // For an Apache Spark ETL job the command name must be "glueetl".
            // An arbitrary label such as "MyJob1" is not a valid job command.
            name = "glueetl"
            scriptLocation = scriptLocationVal
        }

    val jobRequest =
        CreateJobRequest {
            description = "A Job created by using the AWS SDK for Kotlin"
            glueVersion = "2.0"
            workerType = WorkerType.G1X
            numberOfWorkers = 10
            name = jobName
            role = iam
            command = commandOb
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.createJob(jobRequest)
        println("$jobName was successfully created.")
    }
}

/** Prints the names of up to ten job definitions. */
suspend fun getJobs() {
    val request =
        GetJobsRequest {
            maxResults = 10
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.getJobs(request)
        response.jobs?.forEach { job ->
            println("Job name is ${job.name}")
        }
    }
}

/** Prints the job name of each run of [jobNameVal]. */
suspend fun getJobRuns(jobNameVal: String?) {
    val request =
        GetJobRunsRequest {
            jobName = jobNameVal
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        val response = glueClient.getJobRuns(request)
        response.jobRuns?.forEach { job ->
            println("Job name is ${job.jobName}")
        }
    }
}

/** Deletes the job definition [jobNameVal]. */
suspend fun deleteJob(jobNameVal: String) {
    val jobRequest =
        DeleteJobRequest {
            jobName = jobNameVal
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.deleteJob(jobRequest)
        println("$jobNameVal was successfully deleted")
    }
}

/** Deletes the Data Catalog database [databaseName]. */
suspend fun deleteMyDatabase(databaseName: String) {
    val request =
        DeleteDatabaseRequest {
            name = databaseName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.deleteDatabase(request)
        println("$databaseName was successfully deleted")
    }
}

/** Deletes the crawler [crawlerName]. */
suspend fun deleteCrawler(crawlerName: String) {
    val request =
        DeleteCrawlerRequest {
            name = crawlerName
        }

    GlueClient.fromEnvironment { region = "us-east-1" }.use { glueClient ->
        glueClient.deleteCrawler(request)
        println("$crawlerName was deleted")
    }
}
PHP
SDK for PHP
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

namespace Glue;

use Aws\Glue\GlueClient;
use Aws\S3\S3Client;
use AwsUtilities\AWSServiceClass;
use GuzzleHttp\Psr7\Stream;
use Iam\IAMService;

/**
 * Non-interactive AWS Glue walkthrough. It:
 * - crawls a public CSV data set into a new Data Catalog database;
 * - runs an ETL job on the first crawled table;
 * - prints part of the job output;
 * - deletes the resources it created.
 */
class GettingStartedWithGlue
{
    /**
     * Runs the scenario end to end and prints progress to standard output.
     */
    public function run()
    {
        echo("\n");
        echo("--------------------------------------\n");
        print("Welcome to the AWS Glue getting started demo using PHP!\n");
        echo("--------------------------------------\n");

        $clientArgs = [
            'region' => 'us-west-2',
            'version' => 'latest',
            'profile' => 'default',
        ];
        $uniqid = uniqid();

        $glueClient = new GlueClient($clientArgs);
        $glueService = new GlueService($glueClient);
        $iamService = new IAMService();
        $crawlerName = "example-crawler-test-" . $uniqid;

        AWSServiceClass::$waitTime = 5;
        AWSServiceClass::$maxWaitAttempts = 20;

        $role = $iamService->getRole("AWSGlueServiceRole-DocExample");

        $databaseName = "doc-example-database-$uniqid";

        $path = 's3://crawler-public-us-east-1/flight/2016/csv';
        $glueService->createCrawler($crawlerName, $role['Role']['Arn'], $databaseName, $path);
        $glueService->startCrawler($crawlerName);

        echo "Waiting for crawler";
        do {
            $crawler = $glueService->getCrawler($crawlerName);
            echo ".";
            sleep(10);
        } while ($crawler['Crawler']['State'] != "READY");
        echo "\n";

        $database = $glueService->getDatabase($databaseName);
        echo "Found a database named " . $database['Database']['Name'] . "\n";

        //Upload job script
        $s3client = new S3Client($clientArgs);
        $bucketName = "test-glue-bucket-" . $uniqid;
        $s3client->createBucket([
            'Bucket' => $bucketName,
            'CreateBucketConfiguration' => ['LocationConstraint' => 'us-west-2'],
        ]);

        // Keys of the files uploaded here. Every other object in the bucket is treated as job output.
        $scriptKey = 'run_job.py';
        $setupKey = 'setup_scenario_getting_started.yaml';

        $s3client->putObject([
            'Bucket' => $bucketName,
            'Key' => $scriptKey,
            'SourceFile' => __DIR__ . '/flight_etl_job_script.py'
        ]);
        $s3client->putObject([
            'Bucket' => $bucketName,
            'Key' => $setupKey,
            'SourceFile' => __DIR__ . '/setup_scenario_getting_started.yaml'
        ]);

        $tables = $glueService->getTables($databaseName);
        // The job reads the first crawled table, so at least one table is required.
        if (empty($tables['TableList'])) {
            throw new \RuntimeException("The crawler did not create any tables in $databaseName.");
        }

        $jobName = 'test-job-' . $uniqid;
        $scriptLocation = "s3://$bucketName/$scriptKey";
        $job = $glueService->createJob($jobName, $role['Role']['Arn'], $scriptLocation);

        $outputBucketUrl = "s3://$bucketName";
        $runId = $glueService->startJobRun($jobName, $databaseName, $tables, $outputBucketUrl)['JobRunId'];

        echo "waiting for job";
        do {
            $jobRun = $glueService->getJobRun($jobName, $runId);
            echo ".";
            sleep(10);
        } while (!array_intersect([$jobRun['JobRun']['JobRunState']], ['SUCCEEDED', 'STOPPED', 'FAILED', 'TIMEOUT']));
        echo "\n";
        echo "Job run finished with state " . $jobRun['JobRun']['JobRunState'] . ".\n";

        $jobRuns = $glueService->getJobRuns($jobName);
        echo "Found " . count($jobRuns['JobRuns'] ?? []) . " run(s) of $jobName.\n";

        $objects = $s3client->listObjects([
            'Bucket' => $bucketName,
        ])['Contents'] ?? [];
        foreach ($objects as $object) {
            echo $object['Key'] . "\n";
        }

        // Choose a job output object rather than relying on listing order,
        // which could place an uploaded file at any index.
        $outputObjects = array_values(array_filter(
            $objects,
            fn ($object) => !in_array($object['Key'], [$scriptKey, $setupKey], true)
        ));

        if (empty($outputObjects)) {
            echo "The job did not write any output objects.\n";
        } else {
            $outputKey = $outputObjects[0]['Key'];
            echo "Downloading " . $outputKey . "\n";
            /** @var Stream $downloadObject */
            $downloadObject = $s3client->getObject([
                'Bucket' => $bucketName,
                'Key' => $outputKey,
            ])['Body']->getContents();
            echo "Here is the first 1000 characters in the object.\n";
            echo substr($downloadObject, 0, 1000);
            echo "\n";
        }

        $jobs = $glueService->listJobs();
        echo "Current jobs:\n";
        foreach ($jobs['JobNames'] as $jobsName) {
            echo "{$jobsName}\n";
        }

        echo "Delete the job.\n";
        $glueService->deleteJob($job['Name']);

        echo "Delete the tables.\n";
        foreach ($tables['TableList'] as $table) {
            $glueService->deleteTable($table['Name'], $databaseName);
        }

        echo "Delete the databases.\n";
        $glueService->deleteDatabase($databaseName);

        echo "Delete the crawler.\n";
        $glueService->deleteCrawler($crawlerName);

        $remainingObjects = $s3client->listObjectsV2([
            'Bucket' => $bucketName,
        ])['Contents'] ?? [];
        echo "Delete all objects in the bucket.\n";
        if (!empty($remainingObjects)) {
            // DeleteObjects identifies objects by key only. Pass just the keys,
            // not the full listing entries.
            $s3client->deleteObjects([
                'Bucket' => $bucketName,
                'Delete' => [
                    'Objects' => array_map(fn ($object) => ['Key' => $object['Key']], $remainingObjects),
                ],
            ]);
        }

        echo "Delete the bucket.\n";
        $s3client->deleteBucket(['Bucket' => $bucketName]);

        echo "This job was brought to you by the number $uniqid\n";
    }
}

namespace Glue;

use Aws\Glue\GlueClient;
use Aws\Result;

use function PHPUnit\Framework\isEmpty;
/**
 * Thin wrapper around the AWS Glue client used by the getting-started scenario.
 * Some calls are routed through the inherited customWaiter helper.
 */
class GlueService extends \AwsUtilities\AWSServiceClass
{
    protected GlueClient $glueClient;

    public function __construct($glueClient)
    {
        $this->glueClient = $glueClient;
    }

    /**
     * Gets information about a crawler, through customWaiter.
     */
    public function getCrawler($crawlerName)
    {
        return $this->customWaiter(function () use ($crawlerName) {
            return $this->glueClient->getCrawler([
                'Name' => $crawlerName,
            ]);
        });
    }

    /**
     * Creates a crawler that crawls one S3 path into the given database.
     */
    public function createCrawler($crawlerName, $role, $databaseName, $path): Result
    {
        return $this->customWaiter(function () use ($crawlerName, $role, $databaseName, $path) {
            return $this->glueClient->createCrawler([
                'Name' => $crawlerName,
                'Role' => $role,
                'DatabaseName' => $databaseName,
                'Targets' => [
                    'S3Targets' =>
                        [[
                            'Path' => $path,
                        ]]
                ],
            ]);
        });
    }

    public function startCrawler($crawlerName): Result
    {
        return $this->glueClient->startCrawler([
            'Name' => $crawlerName,
        ]);
    }

    /**
     * Gets a Data Catalog database, through customWaiter.
     */
    public function getDatabase(string $databaseName): Result
    {
        return $this->customWaiter(function () use ($databaseName) {
            return $this->glueClient->getDatabase([
                'Name' => $databaseName,
            ]);
        });
    }

    public function getTables($databaseName): Result
    {
        return $this->glueClient->getTables([
            'DatabaseName' => $databaseName,
        ]);
    }

    /**
     * Creates a Spark ETL ("glueetl") job that runs the script at $scriptLocation.
     */
    public function createJob($jobName, $role, $scriptLocation, $pythonVersion = '3', $glueVersion = '3.0'): Result
    {
        return $this->glueClient->createJob([
            'Name' => $jobName,
            'Role' => $role,
            'Command' => [
                'Name' => 'glueetl',
                'ScriptLocation' => $scriptLocation,
                'PythonVersion' => $pythonVersion,
            ],
            'GlueVersion' => $glueVersion,
        ]);
    }

    /**
     * Starts a job run against the first table in $tables (a GetTables result).
     *
     * @throws \InvalidArgumentException When $tables contains no tables.
     */
    public function startJobRun($jobName, $databaseName, $tables, $outputBucketUrl): Result
    {
        $tableName = $tables['TableList'][0]['Name'] ?? null;
        if ($tableName === null) {
            // Without this check, the code would read an undefined index and send a null table name.
            throw new \InvalidArgumentException("No tables were found in database $databaseName.");
        }

        return $this->glueClient->startJobRun([
            'JobName' => $jobName,
            'Arguments' => [
                // Both plain and "--"-prefixed keys are sent, as before.
                // NOTE(review): only the "--" forms are usually read by Glue scripts.
                'input_database' => $databaseName,
                'input_table' => $tableName,
                'output_bucket_url' => $outputBucketUrl,
                '--input_database' => $databaseName,
                '--input_table' => $tableName,
                '--output_bucket_url' => $outputBucketUrl,
            ],
        ]);
    }

    /**
     * Lists job names. Each option is sent only when it is set.
     */
    public function listJobs($maxResults = null, $nextToken = null, $tags = []): Result
    {
        $arguments = [];
        if ($maxResults) {
            $arguments['MaxResults'] = $maxResults;
        }
        if ($nextToken) {
            $arguments['NextToken'] = $nextToken;
        }
        if (!empty($tags)) {
            $arguments['Tags'] = $tags;
        }
        return $this->glueClient->listJobs($arguments);
    }

    public function getJobRuns($jobName, $maxResults = 0, $nextToken = ''): Result
    {
        $arguments = ['JobName' => $jobName];
        if ($maxResults) {
            $arguments['MaxResults'] = $maxResults;
        }
        if ($nextToken) {
            $arguments['NextToken'] = $nextToken;
        }
        return $this->glueClient->getJobRuns($arguments);
    }

    public function getJobRun($jobName, $runId, $predecessorsIncluded = false): Result
    {
        return $this->glueClient->getJobRun([
            'JobName' => $jobName,
            'RunId' => $runId,
            'PredecessorsIncluded' => $predecessorsIncluded,
        ]);
    }

    public function deleteJob($jobName)
    {
        return $this->glueClient->deleteJob([
            'JobName' => $jobName,
        ]);
    }

    public function deleteTable($tableName, $databaseName)
    {
        return $this->glueClient->deleteTable([
            'DatabaseName' => $databaseName,
            'Name' => $tableName,
        ]);
    }

    public function deleteDatabase($databaseName)
    {
        return $this->glueClient->deleteDatabase([
            'Name' => $databaseName,
        ]);
    }

    public function deleteCrawler($crawlerName)
    {
        return $this->glueClient->deleteCrawler([
            'Name' => $crawlerName,
        ]);
    }
}
Python
SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

시나리오에 사용된 AWS Glue 함수를 래핑하는 클래스를 만듭니다.

class GlueWrapper:
    """Encapsulates AWS Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Glue client.
        """
        self.glue_client = glue_client

    @staticmethod
    def _get_all_pages(operation, result_key, **kwargs):
        """
        Calls a paginated AWS Glue operation repeatedly, following NextToken, and
        gathers the items from every page.

        :param operation: The bound client method to call, such as
                          glue_client.get_tables.
        :param result_key: The response key that holds the items on each page.
        :param kwargs: Request parameters sent with every call.
        :return: The combined list of items from all pages.
        """
        items = []
        request = dict(kwargs)
        while True:
            response = operation(**request)
            items.extend(response.get(result_key, []))
            next_token = response.get("NextToken")
            if not next_token:
                return items
            request["NextToken"] = next_token

    def get_crawler(self, name):
        """
        Gets information about a crawler.

        :param name: The name of the crawler to look up.
        :return: Data about the crawler, or None when the crawler does not exist.
        """
        crawler = None
        try:
            response = self.glue_client.get_crawler(Name=name)
            crawler = response["Crawler"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "EntityNotFoundException":
                logger.info("Crawler %s doesn't exist.", name)
            else:
                logger.error(
                    "Couldn't get crawler %s. Here's why: %s: %s",
                    name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return crawler

    def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target):
        """
        Creates a crawler that crawls the specified target. The crawler populates a
        database in your AWS Glue Data Catalog with metadata that describes the
        data in the target.

        :param name: The name of the crawler.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access
                         Management (IAM) role that grants permission to let AWS Glue
                         access the resources it needs.
        :param db_name: The name to give the database that is created by the crawler.
        :param db_prefix: The prefix to give any database tables that are created by
                          the crawler.
        :param s3_target: The URL to an S3 bucket that contains data that is
                          the target of the crawler.
        """
        try:
            self.glue_client.create_crawler(
                Name=name,
                Role=role_arn,
                DatabaseName=db_name,
                TablePrefix=db_prefix,
                Targets={"S3Targets": [{"Path": s3_target}]},
            )
        except ClientError as err:
            logger.error(
                "Couldn't create crawler. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def start_crawler(self, name):
        """
        Starts a crawler. The crawler crawls its configured target and creates
        metadata that describes the data it finds in the target data source.

        :param name: The name of the crawler to start.
        """
        try:
            self.glue_client.start_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't start crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def get_database(self, name):
        """
        Gets information about a database in your Data Catalog.

        :param name: The name of the database to look up.
        :return: Information about the database.
        """
        try:
            response = self.glue_client.get_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't get database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["Database"]

    def get_tables(self, db_name):
        """
        Gets a list of tables in a Data Catalog database. Every result page is
        fetched, so the list is complete even when the database has many tables.

        :param db_name: The name of the database to query.
        :return: The list of tables in the database.
        """
        try:
            tables = self._get_all_pages(
                self.glue_client.get_tables, "TableList", DatabaseName=db_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't get tables %s. Here's why: %s: %s",
                db_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return tables

    def create_job(self, name, description, role_arn, script_location):
        """
        Creates a job definition for an extract, transform, and load (ETL) job that
        can be run by AWS Glue.

        :param name: The name of the job definition.
        :param description: The description of the job definition.
        :param role_arn: The ARN of an IAM role that grants AWS Glue the permissions
                         it requires to run the job.
        :param script_location: The Amazon S3 URL of a Python ETL script that is run as
                                part of the job. The script defines how the data is
                                transformed.
        """
        try:
            self.glue_client.create_job(
                Name=name,
                Description=description,
                Role=role_arn,
                Command={
                    "Name": "glueetl",
                    "ScriptLocation": script_location,
                    "PythonVersion": "3",
                },
                GlueVersion="3.0",
            )
        except ClientError as err:
            logger.error(
                "Couldn't create job %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def start_job_run(self, name, input_database, input_table, output_bucket_name):
        """
        Starts a job run. A job run extracts data from the source, transforms it,
        and loads it to the output bucket.

        :param name: The name of the job definition.
        :param input_database: The name of the metadata database that contains tables
                               that describe the source data. This is typically created
                               by a crawler.
        :param input_table: The name of the table in the metadata database that
                            describes the source data.
        :param output_bucket_name: The S3 bucket where the output is written.
        :return: The ID of the job run.
        """
        try:
            # The custom Arguments that are passed to this function are used by the
            # Python ETL script to determine the location of input and output data.
            response = self.glue_client.start_job_run(
                JobName=name,
                Arguments={
                    "--input_database": input_database,
                    "--input_table": input_table,
                    "--output_bucket_url": f"s3://{output_bucket_name}/",
                },
            )
        except ClientError as err:
            logger.error(
                "Couldn't start job run %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRunId"]

    def list_jobs(self):
        """
        Lists the names of job definitions in your account. Every result page is
        fetched.

        :return: The list of job definition names.
        """
        try:
            job_names = self._get_all_pages(self.glue_client.list_jobs, "JobNames")
        except ClientError as err:
            logger.error(
                "Couldn't list jobs. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return job_names

    def get_job_runs(self, job_name):
        """
        Gets information about the runs of a specific job definition. Every result
        page is fetched.

        :param job_name: The name of the job definition to look up.
        :return: The list of job runs.
        """
        try:
            job_runs = self._get_all_pages(
                self.glue_client.get_job_runs, "JobRuns", JobName=job_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't get job runs for %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return job_runs

    def get_job_run(self, name, run_id):
        """
        Gets information about a single job run.

        :param name: The name of the job definition for the run.
        :param run_id: The ID of the run.
        :return: Information about the run.
        """
        try:
            response = self.glue_client.get_job_run(JobName=name, RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get job run %s/%s. Here's why: %s: %s",
                name,
                run_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response["JobRun"]

    def delete_job(self, job_name):
        """
        Deletes a job definition. This also deletes data about all runs that are
        associated with this job definition.

        :param job_name: The name of the job definition to delete.
        """
        try:
            self.glue_client.delete_job(JobName=job_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete job %s. Here's why: %s: %s",
                job_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_table(self, db_name, table_name):
        """
        Deletes a table from a metadata database.

        :param db_name: The name of the database that contains the table.
        :param table_name: The name of the table to delete.
        """
        try:
            self.glue_client.delete_table(DatabaseName=db_name, Name=table_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete table %s. Here's why: %s: %s",
                table_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_database(self, name):
        """
        Deletes a metadata database from your Data Catalog.

        :param name: The name of the database to delete.
        """
        try:
            self.glue_client.delete_database(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete database %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_crawler(self, name):
        """
        Deletes a crawler.

        :param name: The name of the crawler to delete.
        """
        try:
            self.glue_client.delete_crawler(Name=name)
        except ClientError as err:
            logger.error(
                "Couldn't delete crawler %s. Here's why: %s: %s",
                name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

시나리오를 실행하는 클래스를 생성합니다.

class GlueCrawlerJobScenario:
    """
    Encapsulates a scenario that shows how to create an AWS Glue crawler and job and use
    them to transform data from CSV to JSON format.
    """

    # Job run states after which the scenario stops polling a run.
    _JOB_RUN_TERMINAL_STATES = (
        "SUCCEEDED",
        "STOPPED",
        "FAILED",
        "TIMEOUT",
        "ERROR",
        "EXPIRED",
    )

    def __init__(self, glue_client, glue_service_role, glue_bucket):
        """
        :param glue_client: A Boto3 AWS Glue client.
        :param glue_service_role: An AWS Identity and Access Management (IAM) role
                                  that AWS Glue can assume to gain access to the
                                  resources it requires.
        :param glue_bucket: An S3 bucket that can hold a job script and output data
                            from AWS Glue job runs.
        """
        self.glue_client = glue_client
        self.glue_service_role = glue_service_role
        self.glue_bucket = glue_bucket

    @staticmethod
    def wait(seconds, tick=12):
        """
        Waits for a specified number of seconds, while also displaying an animated
        spinner.

        :param seconds: The number of seconds to wait.
        :param tick: The number of frames per second used to animate the spinner.
        """
        progress = "|/-\\"
        waited = 0
        while waited < seconds:
            # Each pass draws `tick` frames, sleeping 1/tick seconds per frame,
            # so one pass takes roughly one second.
            for frame in range(tick):
                sys.stdout.write(f"\r{progress[frame % len(progress)]}")
                sys.stdout.flush()
                time.sleep(1 / tick)
            waited += 1

    def upload_job_script(self, job_script):
        """
        Uploads a Python ETL script to an S3 bucket. The script is used by the AWS Glue
        job to transform data.

        :param job_script: The relative path to the job script. It is also used as
                           the object key in the bucket.
        """
        try:
            self.glue_bucket.upload_file(Filename=job_script, Key=job_script)
            print(f"Uploaded job script '{job_script}' to the example bucket.")
        except S3UploadFailedError as err:
            logger.error("Couldn't upload job script. Here's why: %s", err)
            raise

    def run(self, crawler_name, db_name, db_prefix, data_source, job_script, job_name):
        """
        Runs the scenario. This is an interactive experience that runs at a command
        prompt and asks you for input throughout.

        :param crawler_name: The name of the crawler used in the scenario. If the
                             crawler does not exist, it is created.
        :param db_name: The name to give the metadata database created by the crawler.
        :param db_prefix: The prefix to give tables added to the database by the
                          crawler.
        :param data_source: The location of the data source that is targeted by the
                            crawler and extracted during job runs.
        :param job_script: The job script that is used to transform data during job
                           runs.
        :param job_name: The name to give the job definition that is created during the
                         scenario.
        """
        wrapper = GlueWrapper(self.glue_client)

        print(f"Checking for crawler {crawler_name}.")
        crawler = wrapper.get_crawler(crawler_name)
        if crawler is None:
            print(f"Creating crawler {crawler_name}.")
            wrapper.create_crawler(
                crawler_name,
                self.glue_service_role.arn,
                db_name,
                db_prefix,
                data_source,
            )
            print(f"Created crawler {crawler_name}.")
            crawler = wrapper.get_crawler(crawler_name)
        pprint(crawler)
        print("-" * 88)

        print(
            f"When you run the crawler, it crawls data stored in {data_source} and "
            f"creates a metadata database in the AWS Glue Data Catalog that describes "
            f"the data in the data source."
        )
        print("In this example, the source data is in CSV format.")
        ready = False
        while not ready:
            ready = Question.ask_question(
                "Ready to start the crawler? (y/n) ", Question.is_yesno
            )
        wrapper.start_crawler(crawler_name)
        print("Let's wait for the crawler to run. This typically takes a few minutes.")
        crawler_state = None
        while crawler_state != "READY":
            self.wait(10)
            crawler = wrapper.get_crawler(crawler_name)
            crawler_state = crawler["State"]
            print(f"Crawler is {crawler['State']}.")
        print("-" * 88)

        database = wrapper.get_database(db_name)
        print(f"The crawler created database {db_name}:")
        pprint(database)
        print("The database contains these tables:")
        tables = wrapper.get_tables(db_name)
        for index, table in enumerate(tables):
            print(f"\t{index + 1}. {table['Name']}")
        table_index = Question.ask_question(
            "Enter the number of a table to see more detail: ",
            Question.is_int,
            Question.in_range(1, len(tables)),
        )
        pprint(tables[table_index - 1])
        print("-" * 88)

        print(f"Creating job definition {job_name}.")
        wrapper.create_job(
            job_name,
            "Getting started example job.",
            self.glue_service_role.arn,
            f"s3://{self.glue_bucket.name}/{job_script}",
        )
        print("Created job definition.")
        print(
            f"When you run the job, it extracts data from {data_source}, transforms it "
            f"by using the {job_script} script, and loads the output into "
            f"S3 bucket {self.glue_bucket.name}."
        )
        print(
            "In this example, the data is transformed from CSV to JSON, and only a few "
            "fields are included in the output."
        )
        job_run_status = None
        if Question.ask_question("Ready to run? (y/n) ", Question.is_yesno):
            job_run_id = wrapper.start_job_run(
                job_name, db_name, tables[0]["Name"], self.glue_bucket.name
            )
            print(f"Job {job_name} started. Let's wait for it to run.")
            while job_run_status not in self._JOB_RUN_TERMINAL_STATES:
                self.wait(10)
                job_run = wrapper.get_job_run(job_name, job_run_id)
                job_run_status = job_run["JobRunState"]
                print(f"Job {job_name}/{job_run_id} is {job_run_status}.")
        print("-" * 88)

        if job_run_status == "SUCCEEDED":
            print(
                f"Data from your job run is stored in your S3 bucket '{self.glue_bucket.name}':"
            )
            try:
                keys = [
                    obj.key for obj in self.glue_bucket.objects.filter(Prefix="run-")
                ]
                if keys:
                    for index, key in enumerate(keys):
                        print(f"\t{index + 1}: {key}")
                    lines = 4
                    key_index = Question.ask_question(
                        f"Enter the number of a block to download it and see the first {lines} "
                        f"lines of JSON output in the block: ",
                        Question.is_int,
                        Question.in_range(1, len(keys)),
                    )
                    job_data = io.BytesIO()
                    self.glue_bucket.download_fileobj(keys[key_index - 1], job_data)
                    job_data.seek(0)
                    for _ in range(lines):
                        print(job_data.readline().decode("utf-8"))
                else:
                    # Without this guard the prompt below would ask for a number in
                    # the empty range 1..0 and could never be answered.
                    print("No job output was found in the bucket.")
            except ClientError as err:
                logger.error(
                    "Couldn't get job run data. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
            print("-" * 88)

        job_names = wrapper.list_jobs()
        if job_names:
            print(f"Your account has {len(job_names)} jobs defined:")
            # Use a distinct loop variable: reusing `job_name` would overwrite the
            # parameter, and cleanup below would then delete the wrong job.
            for index, listed_job_name in enumerate(job_names):
                print(f"\t{index + 1}. {listed_job_name}")
            job_index = Question.ask_question(
                f"Enter a number between 1 and {len(job_names)} to see the list of runs for "
                f"a job: ",
                Question.is_int,
                Question.in_range(1, len(job_names)),
            )
            selected_job_name = job_names[job_index - 1]
            job_runs = wrapper.get_job_runs(selected_job_name)
            if job_runs:
                print(f"Found {len(job_runs)} runs for job {selected_job_name}:")
                for index, job_run in enumerate(job_runs):
                    # Runs that have not finished yet have no CompletedOn field.
                    completed_on = job_run.get("CompletedOn")
                    if completed_on is not None:
                        print(
                            f"\t{index + 1}. {job_run['JobRunState']} on "
                            f"{completed_on:%Y-%m-%d %H:%M:%S}"
                        )
                    else:
                        print(f"\t{index + 1}. {job_run['JobRunState']}")
                run_index = Question.ask_question(
                    f"Enter a number between 1 and {len(job_runs)} to see details for a run: ",
                    Question.is_int,
                    Question.in_range(1, len(job_runs)),
                )
                pprint(job_runs[run_index - 1])
            else:
                print(f"No runs found for job {selected_job_name}")
        else:
            print("Your account doesn't have any jobs defined.")
        print("-" * 88)

        print(
            f"Let's clean up. During this example we created job definition '{job_name}'."
        )
        if Question.ask_question(
            "Do you want to delete the definition and all runs? (y/n) ",
            Question.is_yesno,
        ):
            wrapper.delete_job(job_name)
            print(f"Job definition '{job_name}' deleted.")
        tables = wrapper.get_tables(db_name)
        print(f"We also created database '{db_name}' that contains these tables:")
        for table in tables:
            print(f"\t{table['Name']}")
        if Question.ask_question(
            "Do you want to delete the tables and the database? (y/n) ",
            Question.is_yesno,
        ):
            for table in tables:
                wrapper.delete_table(db_name, table["Name"])
                print(f"Deleted table {table['Name']}.")
            wrapper.delete_database(db_name)
            print(f"Deleted database {db_name}.")
        print(f"We also created crawler '{crawler_name}'.")
        if Question.ask_question(
            "Do you want to delete the crawler? (y/n) ", Question.is_yesno
        ):
            wrapper.delete_crawler(crawler_name)
            print(f"Deleted crawler {crawler_name}.")
        print("-" * 88)


def parse_args(args):
    """
    Parse command line arguments.

    :param args: The command line arguments (without the program name).
    :return: The parsed arguments: `role_name`, `bucket_name`, and `job_script`.
    """
    parser = argparse.ArgumentParser(
        description="Runs the AWS Glue getting started with crawlers and jobs scenario. "
        "Before you run this scenario, set up scaffold resources by running "
        "'python scaffold.py deploy'."
    )
    parser.add_argument(
        "role_name",
        help="The name of an IAM role that AWS Glue can assume. This role must grant access "
        "to Amazon S3 and to the permissions granted by the AWSGlueServiceRole "
        "managed policy.",
    )
    parser.add_argument(
        "bucket_name",
        help="The name of an S3 bucket that AWS Glue can access to get the job script and "
        "put job results.",
    )
    parser.add_argument(
        "--job_script",
        default="flight_etl_job_script.py",
        help="The name of the job script file that is used in the scenario.",
    )
    return parser.parse_args(args)


def main():
    """Parses the command line, uploads the job script, and runs the scenario."""
    args = parse_args(sys.argv[1:])
    try:
        print("-" * 88)
        print(
            "Welcome to the AWS Glue getting started with crawlers and jobs scenario."
        )
        print("-" * 88)
        scenario = GlueCrawlerJobScenario(
            boto3.client("glue"),
            boto3.resource("iam").Role(args.role_name),
            boto3.resource("s3").Bucket(args.bucket_name),
        )
        scenario.upload_job_script(args.job_script)
        scenario.run(
            "doc-example-crawler",
            "doc-example-database",
            "doc-example-",
            "s3://crawler-public-us-east-1/flight/2016/csv",
            args.job_script,
            "doc-example-job",
        )
        print("-" * 88)
        print(
            "To destroy scaffold resources, including the IAM role and S3 bucket "
            "used in this scenario, run 'python scaffold.py destroy'."
        )
        print("\nThanks for watching!")
        print("-" * 88)
    except Exception:
        logging.exception("Something went wrong with the example.")

작업 실행 중에 AWS Glue가 데이터를 추출, 변환, 적재(ETL)하는 데 사용하는 ETL 스크립트를 생성합니다.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# AWS Glue ETL script: read flight data described by a Data Catalog table, keep and
# rename a subset of its fields, and write the result as JSON to Amazon S3.
#
# These custom arguments must be passed as Arguments to the StartJobRun request:
#   --input_database     The name of a metadata database that is contained in your
#                        AWS Glue Data Catalog and that contains tables that describe
#                        the data to be processed.
#   --input_table        The name of a table in the database that describes the data
#                        to be processed.
#   --output_bucket_url  An S3 bucket that receives the transformed output data.

# JOB_NAME is supplied by AWS Glue; the rest are the custom arguments listed above.
RESOLVED_ARG_NAMES = ["JOB_NAME", "input_database", "input_table", "output_bucket_url"]

# (source field, source type, output field, output type). Source fields that are not
# listed are dropped from the output; some are renamed, e.g. `fl_date` -> `flight_date`.
FLIGHT_FIELD_MAPPINGS = [
    ("year", "long", "year", "long"),
    ("month", "long", "month", "tinyint"),
    ("day_of_month", "long", "day", "tinyint"),
    ("fl_date", "string", "flight_date", "string"),
    ("carrier", "string", "carrier", "string"),
    ("fl_num", "long", "flight_num", "long"),
    ("origin_city_name", "string", "origin_city_name", "string"),
    ("origin_state_abr", "string", "origin_state_abr", "string"),
    ("dest_city_name", "string", "dest_city_name", "string"),
    ("dest_state_abr", "string", "dest_state_abr", "string"),
    ("dep_time", "long", "departure_time", "long"),
    ("wheels_off", "long", "wheels_off", "long"),
    ("wheels_on", "long", "wheels_on", "long"),
    ("arr_time", "long", "arrival_time", "long"),
    ("mon", "string", "mon", "string"),
]

job_args = getResolvedOptions(sys.argv, RESOLVED_ARG_NAMES)

spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark_session = glue_context.spark_session
job = Job(glue_context)
job.init(job_args["JOB_NAME"], job_args)

# Extract: load the crawled source data through its catalog table definition.
flight_frame = glue_context.create_dynamic_frame.from_catalog(
    database=job_args["input_database"],
    table_name=job_args["input_table"],
    transformation_ctx="S3FlightData_node1",
)

# Transform: project and rename fields according to FLIGHT_FIELD_MAPPINGS.
revised_frame = ApplyMapping.apply(
    frame=flight_frame,
    mappings=FLIGHT_FIELD_MAPPINGS,
    transformation_ctx="ApplyMapping_node2",
)

# Load: write the revised records as JSON to the output S3 location.
glue_context.write_dynamic_frame.from_options(
    frame=revised_frame,
    connection_type="s3",
    format="json",
    connection_options={"path": job_args["output_bucket_url"], "partitionKeys": []},
    transformation_ctx="RevisedFlightData_node3",
)

job.commit()
Ruby
SDK for Ruby
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

시나리오에 사용된 AWS Glue 함수를 래핑하는 클래스를 만듭니다.

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations.
# It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources.
# The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages.
#
# Rescue clauses use `Aws::Glue::Errors::ServiceError` / `Aws::S3::Errors::ServiceError`, the base
# classes of errors raised by the service clients. Names such as `GlueException` or `S3UploadFailedError`
# are not errors defined by these APIs, so rescuing them did not catch real service failures.
class GlueWrapper
  # @param glue_client [Aws::Glue::Client] Client used for every AWS Glue call.
  # @param logger [Logger] Receives informational and error messages.
  def initialize(glue_client, logger)
    @glue_client = glue_client
    @logger = logger
  end

  # Retrieves information about a specific crawler.
  #
  # @param name [String] The name of the crawler to retrieve information about.
  # @return [Aws::Glue::Types::GetCrawlerResponse, nil] The GetCrawler response (the crawler itself is
  #   available as `#crawler`), or nil if no crawler with that name exists.
  # @raise [Aws::Glue::Errors::ServiceError] For any other service error (after logging it).
  def get_crawler(name)
    @glue_client.get_crawler(name: name)
  rescue Aws::Glue::Errors::EntityNotFoundException
    @logger.info("Crawler #{name} doesn't exist.")
    nil
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not get crawler #{name}: \n#{e.message}")
    raise
  end

  # Creates a new crawler with the specified configuration.
  #
  # @param name [String] The name of the crawler.
  # @param role_arn [String] The ARN of the IAM role to be used by the crawler.
  # @param db_name [String] The name of the database where the crawler stores its metadata.
  # @param db_prefix [String, nil] The prefix to be added to the names of tables that the crawler creates.
  # @param s3_target [String] The S3 path that the crawler will crawl.
  # @return [void]
  def create_crawler(name, role_arn, db_name, db_prefix, s3_target)
    params = {
      name: name,
      role: role_arn,
      database_name: db_name,
      targets: { s3_targets: [{ path: s3_target }] }
    }
    # Only send a table prefix when the caller supplied a non-empty one.
    params[:table_prefix] = db_prefix unless db_prefix.nil? || db_prefix.to_s.empty?
    @glue_client.create_crawler(**params)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not create crawler: \n#{e.message}")
    raise
  end

  # Starts a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to start.
  # @return [void]
  def start_crawler(name)
    @glue_client.start_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not start crawler #{name}: \n#{e.message}")
    raise
  end

  # Deletes a crawler with the specified name.
  #
  # @param name [String] The name of the crawler to delete.
  # @return [void]
  def delete_crawler(name)
    @glue_client.delete_crawler(name: name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete crawler #{name}: \n#{e.message}")
    raise
  end

  # Retrieves information about a specific database.
  #
  # @param name [String] The name of the database to retrieve information about.
  # @return [Aws::Glue::Types::Database] The database object.
  def get_database(name)
    response = @glue_client.get_database(name: name)
    response.database
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not get database #{name}: \n#{e.message}")
    raise
  end

  # Retrieves a list of tables in the specified database.
  #
  # @param db_name [String] The name of the database to retrieve tables from.
  # @return [Array<Aws::Glue::Types::Table>]
  def get_tables(db_name)
    response = @glue_client.get_tables(database_name: db_name)
    response.table_list
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not get tables #{db_name}: \n#{e.message}")
    raise
  end

  # Creates a new job with the specified configuration.
  #
  # @param name [String] The name of the job.
  # @param description [String] The description of the job.
  # @param role_arn [String] The ARN of the IAM role to be used by the job.
  # @param script_location [String] The location of the ETL script for the job.
  # @return [void]
  def create_job(name, description, role_arn, script_location)
    @glue_client.create_job(
      name: name,
      description: description,
      role: role_arn,
      command: {
        name: 'glueetl',
        script_location: script_location,
        python_version: '3'
      },
      glue_version: '3.0'
    )
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not create job #{name}: \n#{e.message}")
    raise
  end

  # Starts a job run for the specified job.
  #
  # @param name [String] The name of the job to start the run for.
  # @param input_database [String] The name of the input database for the job.
  # @param input_table [String] The name of the input table for the job.
  # @param output_bucket_name [String] The name of the output S3 bucket for the job; it is passed to
  #   the script as the URL `s3://<name>/`.
  # @return [String] The ID of the started job run.
  def start_job_run(name, input_database, input_table, output_bucket_name)
    response = @glue_client.start_job_run(
      job_name: name,
      arguments: {
        '--input_database': input_database,
        '--input_table': input_table,
        '--output_bucket_url': "s3://#{output_bucket_name}/"
      }
    )
    response.job_run_id
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not start job run #{name}: \n#{e.message}")
    raise
  end

  # Retrieves a list of jobs in AWS Glue.
  #
  # @return [Aws::Glue::Types::ListJobsResponse]
  def list_jobs
    @glue_client.list_jobs
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not list jobs: \n#{e.message}")
    raise
  end

  # Retrieves a list of job runs for the specified job.
  #
  # @param job_name [String] The name of the job to retrieve job runs for.
  # @return [Array<Aws::Glue::Types::JobRun>]
  # @raise [Aws::Glue::Errors::ServiceError] Re-raised after logging, so callers never receive the
  #   logger's return value in place of job runs.
  def get_job_runs(job_name)
    response = @glue_client.get_job_runs(job_name: job_name)
    response.job_runs
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not get job runs: \n#{e.message}")
    raise
  end

  # Retrieves data for a specific job run.
  #
  # @param job_name [String] The name of the job that owns the run.
  # @param run_id [String] The ID of the run to retrieve.
  # @return [Aws::Glue::Types::GetJobRunResponse]
  def get_job_run(job_name, run_id)
    @glue_client.get_job_run(job_name: job_name, run_id: run_id)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not get job run #{run_id}: \n#{e.message}")
    raise
  end

  # Deletes a job with the specified name. Errors are logged and not re-raised.
  #
  # @param job_name [String] The name of the job to delete.
  # @return [void]
  def delete_job(job_name)
    @glue_client.delete_job(job_name: job_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete job: \n#{e.message}")
  end

  # Deletes a table with the specified name. Errors are logged and not re-raised.
  #
  # @param database_name [String] The name of the catalog database in which the table resides.
  # @param table_name [String] The name of the table to be deleted.
  # @return [void]
  def delete_table(database_name, table_name)
    @glue_client.delete_table(database_name: database_name, name: table_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete table: \n#{e.message}")
  end

  # Removes a specified database from a Data Catalog. Errors are logged and not re-raised.
  #
  # @param database_name [String] The name of the database to delete.
  # @return [void]
  def delete_database(database_name)
    @glue_client.delete_database(name: database_name)
  rescue Aws::Glue::Errors::ServiceError => e
    @logger.error("Glue could not delete database: \n#{e.message}")
  end

  # Uploads a job script file to an S3 bucket, using the local path as the object key.
  #
  # @param file_path [String] The local path of the job script file.
  # @param bucket_resource [Aws::S3::Bucket] The S3 bucket resource to upload the file to.
  # @return [void]
  def upload_job_script(file_path, bucket_resource)
    # Binary mode uploads the script bytes unchanged regardless of platform.
    File.open(file_path, 'rb') do |file|
      bucket_resource.client.put_object({ body: file, bucket: bucket_resource.name, key: file_path })
    end
  rescue Aws::S3::Errors::ServiceError => e
    @logger.error("S3 could not upload job script: \n#{e.message}")
    raise
  end
end

시나리오를 실행하는 클래스를 생성합니다.

# Runs the AWS Glue getting-started scenario: create and run a crawler, inspect the
# database it produces, then create and run an ETL job over the crawled table.
class GlueCrawlerJobScenario
  # Job run states after which the scenario stops polling a run. Includes TIMEOUT
  # and ERROR so a run that ends that way does not make the loop spin forever.
  JOB_RUN_TERMINAL_STATES = %w[SUCCEEDED FAILED STOPPED TIMEOUT ERROR EXPIRED].freeze

  # @param glue_client [Aws::Glue::Client] Client for AWS Glue calls.
  # @param glue_service_role [Aws::IAM::Role] Role whose `arn` is given to the crawler and job.
  # @param glue_bucket [Aws::S3::Bucket] Bucket that holds the job script and receives job output.
  # @param logger [Logger] Passed to GlueWrapper for its log messages.
  def initialize(glue_client, glue_service_role, glue_bucket, logger)
    @glue_client = glue_client
    @glue_service_role = glue_service_role
    @glue_bucket = glue_bucket
    @logger = logger
  end

  # Runs every step of the scenario in order.
  def run(crawler_name, db_name, db_prefix, data_source, job_script, job_name)
    wrapper = GlueWrapper.new(@glue_client, @logger)
    setup_crawler(wrapper, crawler_name, db_name, db_prefix, data_source)
    query_database(wrapper, crawler_name, db_name)
    create_and_run_job(wrapper, job_script, job_name, db_name)
  end

  private

  # Creates the crawler if it does not already exist, starts it, and waits until it is READY.
  def setup_crawler(wrapper, crawler_name, db_name, db_prefix, data_source)
    new_step(1, 'Create a crawler')
    crawler = wrapper.get_crawler(crawler_name)
    unless crawler
      puts "Creating crawler #{crawler_name}."
      wrapper.create_crawler(crawler_name, @glue_service_role.arn, db_name, db_prefix, data_source)
      puts "Successfully created #{crawler_name}."
    end
    wrapper.start_crawler(crawler_name)
    monitor_crawler(wrapper, crawler_name)
  end

  # Polls the crawler until its state is READY.
  def monitor_crawler(wrapper, crawler_name)
    new_step(2, 'Monitor Crawler')
    crawler_state = nil
    until crawler_state == 'READY'
      custom_wait(15)
      # get_crawler returns the GetCrawler response; the crawler is under `#crawler`.
      crawler_state = wrapper.get_crawler(crawler_name).crawler.state
      puts "Crawler status: #{crawler_state}".yellow
    end
  end

  # Prints the names of the tables in the database that the crawler created.
  def query_database(wrapper, _crawler_name, db_name)
    new_step(3, 'Query the database.')
    wrapper.get_database(db_name)
    puts "The crawler created database #{db_name}:"
    puts "Database contains tables: #{wrapper.get_tables(db_name).map { |t| t['name'] }}"
  end

  # Uploads the ETL script, creates a job that uses it, and runs the job.
  def create_and_run_job(wrapper, job_script, job_name, db_name)
    new_step(4, 'Create and run job.')
    wrapper.upload_job_script(job_script, @glue_bucket)
    wrapper.create_job(job_name, 'ETL Job', @glue_service_role.arn, "s3://#{@glue_bucket.name}/#{job_script}")
    run_job(wrapper, job_name, db_name)
  end

  # Starts a run over the first crawled table and polls that specific run until it finishes.
  def run_job(wrapper, job_name, db_name)
    new_step(5, 'Run the job.')
    table_name = wrapper.get_tables(db_name)[0]['name']
    # Track the run we started by ID. The first entry of get_job_runs is not
    # guaranteed to be this run.
    run_id = wrapper.start_job_run(job_name, db_name, table_name, @glue_bucket.name)
    job_run_status = nil
    until JOB_RUN_TERMINAL_STATES.include?(job_run_status)
      custom_wait(10)
      job_run_status = wrapper.get_job_run(job_name, run_id).job_run.job_run_state
      puts "Job #{job_name} status: #{job_run_status}".yellow
    end
  end
end

def main
  banner('../../helpers/banner.txt')
  puts 'Starting AWS Glue demo...'

  # In a top-level method `@logger` is nil unless the script assigned it at top
  # level, and GlueWrapper calls @logger.info/@logger.error, so fall back to stdout.
  require 'logger'
  logger = @logger || Logger.new($stdout)

  # Load resource names from YAML.
  resource_names = YAML.load_file('resource_names.yaml')

  # Setup services and resources.
  iam_role = Aws::IAM::Resource.new(region: 'us-east-1').role(resource_names['glue_service_role'])
  s3_bucket = Aws::S3::Resource.new(region: 'us-east-1').bucket(resource_names['glue_bucket'])

  # Instantiate scenario and run.
  scenario = GlueCrawlerJobScenario.new(Aws::Glue::Client.new(region: 'us-east-1'), iam_role, s3_bucket, logger)
  random_suffix = rand(10**4)
  # Crawl the public flight data set. ('s3://data_source' is not a valid bucket name.)
  scenario.run("crawler-#{random_suffix}", "db-#{random_suffix}", "prefix-#{random_suffix}-",
               's3://crawler-public-us-east-1/flight/2016/csv', 'job_script.py', "job-#{random_suffix}")
  puts 'Demo complete.'
end

작업 실행 중에 AWS Glue가 데이터를 추출, 변환, 적재(ETL)하는 데 사용하는 ETL 스크립트를 생성합니다.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# AWS Glue ETL script: read flight data described by a Data Catalog table, keep and
# rename a subset of its fields, and write the result as JSON to Amazon S3.
#
# These custom arguments must be passed as Arguments to the StartJobRun request:
#   --input_database     The name of a metadata database that is contained in your
#                        AWS Glue Data Catalog and that contains tables that describe
#                        the data to be processed.
#   --input_table        The name of a table in the database that describes the data
#                        to be processed.
#   --output_bucket_url  An S3 bucket that receives the transformed output data.

# JOB_NAME is supplied by AWS Glue; the rest are the custom arguments listed above.
RESOLVED_ARG_NAMES = ["JOB_NAME", "input_database", "input_table", "output_bucket_url"]

# (source field, source type, output field, output type). Source fields that are not
# listed are dropped from the output; some are renamed, e.g. `fl_date` -> `flight_date`.
FLIGHT_FIELD_MAPPINGS = [
    ("year", "long", "year", "long"),
    ("month", "long", "month", "tinyint"),
    ("day_of_month", "long", "day", "tinyint"),
    ("fl_date", "string", "flight_date", "string"),
    ("carrier", "string", "carrier", "string"),
    ("fl_num", "long", "flight_num", "long"),
    ("origin_city_name", "string", "origin_city_name", "string"),
    ("origin_state_abr", "string", "origin_state_abr", "string"),
    ("dest_city_name", "string", "dest_city_name", "string"),
    ("dest_state_abr", "string", "dest_state_abr", "string"),
    ("dep_time", "long", "departure_time", "long"),
    ("wheels_off", "long", "wheels_off", "long"),
    ("wheels_on", "long", "wheels_on", "long"),
    ("arr_time", "long", "arrival_time", "long"),
    ("mon", "string", "mon", "string"),
]

job_args = getResolvedOptions(sys.argv, RESOLVED_ARG_NAMES)

spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark_session = glue_context.spark_session
job = Job(glue_context)
job.init(job_args["JOB_NAME"], job_args)

# Extract: load the crawled source data through its catalog table definition.
flight_frame = glue_context.create_dynamic_frame.from_catalog(
    database=job_args["input_database"],
    table_name=job_args["input_table"],
    transformation_ctx="S3FlightData_node1",
)

# Transform: project and rename fields according to FLIGHT_FIELD_MAPPINGS.
revised_frame = ApplyMapping.apply(
    frame=flight_frame,
    mappings=FLIGHT_FIELD_MAPPINGS,
    transformation_ctx="ApplyMapping_node2",
)

# Load: write the revised records as JSON to the output S3 location.
glue_context.write_dynamic_frame.from_options(
    frame=revised_frame,
    connection_type="s3",
    format="json",
    connection_options={"path": job_args["output_bucket_url"], "partitionKeys": []},
    transformation_ctx="RevisedFlightData_node3",
)

job.commit()
Rust
SDK for Rust
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

퍼블릭 Amazon Simple Storage Service(Amazon S3) 버킷을 크롤링하고, 크롤링한 CSV 형식의 데이터를 설명하는 메타데이터 데이터베이스를 생성하는 크롤러를 만들고 실행합니다.

let create_crawler = glue .create_crawler() .name(self.crawler()) .database_name(self.database()) .role(self.iam_role.expose_secret()) .targets( CrawlerTargets::builder() .s3_targets(S3Target::builder().path(CRAWLER_TARGET).build()) .build(), ) .send() .await; match create_crawler { Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::AlreadyExistsException(_) => { info!("Using existing crawler"); Ok(()) } _ => Err(GlueMvpError::GlueSdk(glue_err)), } } Ok(_) => Ok(()), }?; let start_crawler = glue.start_crawler().name(self.crawler()).send().await; match start_crawler { Ok(_) => Ok(()), Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::CrawlerRunningException(_) => Ok(()), _ => Err(GlueMvpError::GlueSdk(glue_err)), } } }?;

AWS Glue Data Catalog의 데이터베이스 및 테이블에 대한 정보를 나열합니다.

// Fetch the database the crawler created. `send()` already yields an owned
// GetDatabaseOutput, so no extra `.to_owned()` clone is needed.
let database = glue
    .get_database()
    .name(self.database())
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;
// Borrow the database definition from the output; absence is reported as an error.
let database = database
    .database()
    .ok_or_else(|| GlueMvpError::Unknown("Could not find database".into()))?;

// List the tables the crawler added to that database.
let tables = glue
    .get_tables()
    .database_name(self.database())
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;
let tables = tables.table_list();

소스 Amazon S3 버킷에서 CSV 데이터를 추출하고, 필드를 제거하고 이름을 변경하여 변환하고, JSON 형식의 출력을 다른 Amazon S3 버킷으로 로드하는 작업을 만들고 실행합니다.

// Create a Spark ETL job ("glueetl") that runs the Python script uploaded to the bucket.
let create_job = glue
    .create_job()
    .name(self.job())
    .role(self.iam_role.expose_secret())
    .command(
        JobCommand::builder()
            .name("glueetl")
            .python_version("3")
            .script_location(format!("s3://{}/job.py", self.bucket()))
            .build(),
    )
    .glue_version("3.0")
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;

let job_name = create_job.name().ok_or_else(|| {
    GlueMvpError::Unknown("Did not get job name after creating job".into())
})?;

// The ETL script uses `--output_bucket_url` as the S3 output path, so pass a full
// `s3://` URL rather than the bare bucket name (the script location above is built
// the same way).
let output_bucket_url = format!("s3://{}/", self.bucket());

// Start a run, passing the custom arguments the ETL script reads.
let job_run_output = glue
    .start_job_run()
    .job_name(self.job())
    .arguments("--input_database", self.database())
    .arguments(
        "--input_table",
        self.tables
            .first()
            .ok_or_else(|| GlueMvpError::Unknown("Missing crawler table".into()))?
            .name(),
    )
    .arguments("--output_bucket_url", output_bucket_url)
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;

let job = job_run_output
    .job_run_id()
    .ok_or_else(|| GlueMvpError::Unknown("Missing run id from just started job".into()))?
    .to_string();

데모 중에 생성된 모든 리소스를 삭제합니다.

// Remove the job definition created by the demo.
glue.delete_job()
    .job_name(self.job())
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;

// Remove each table the crawler created, then the database that held them.
for table in self.tables.iter() {
    glue.delete_table()
        .database_name(self.database())
        .name(table.name())
        .send()
        .await
        .map_err(GlueMvpError::from_glue_sdk)?;
}
glue.delete_database()
    .name(self.database())
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;

// Finally remove the crawler itself.
glue.delete_crawler()
    .name(self.crawler())
    .send()
    .await
    .map_err(GlueMvpError::from_glue_sdk)?;
Swift
SDK for Swift
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예제 리포지토리에서 전체 예제를 찾고 설정 및 실행하는 방법을 배워보세요.

Package.swift 파일.

// swift-tools-version: 5.9
//
// The swift-tools-version declares the minimum version of Swift required to
// build this package.

import PackageDescription

let package = Package(
    name: "glue-scenario",
    // Let Xcode know the minimum Apple platforms supported.
    platforms: [
        .macOS(.v13),
        .iOS(.v15)
    ],
    dependencies: [
        // Dependencies declare other packages that this package depends on.
        .package(
            url: "https://github.com/awslabs/aws-sdk-swift",
            from: "1.0.0"),
        // Track a semantic-version range instead of the moving `main` branch so
        // builds are reproducible and only pick up compatible 1.x releases.
        .package(
            url: "https://github.com/apple/swift-argument-parser.git",
            from: "1.3.0"
        )
    ],
    targets: [
        // Targets are the basic building blocks of a package, defining a module or a test suite.
        // Targets can depend on other targets in this package and products
        // from dependencies.
        .executableTarget(
            name: "glue-scenario",
            dependencies: [
                .product(name: "AWSGlue", package: "aws-sdk-swift"),
                .product(name: "AWSS3", package: "aws-sdk-swift"),
                .product(name: "ArgumentParser", package: "swift-argument-parser")
            ],
            path: "Sources")
    ]
)

Swift 코드 파일 entry.swift.

// An example that shows how to use the AWS SDK for Swift to create and use
// AWS Glue crawlers and jobs.
//
// 0. Upload the Python job script to Amazon S3 so it can be used when
//    calling `startJobRun()` later.
// 1. Create a crawler, passing it the IAM role and the URL of the public Amazon
//    S3 bucket that contains the source data:
//    s3://crawler-public-us-east-1/flight/2016/csv.
// 2. Start the crawler. This takes time, so after starting it, use a loop
//    that calls `getCrawler()` until the state is "READY".
// 3. Get the database created by the crawler and the tables in the
//    database. Display them to the user.
// 4. Create a job. Pass it the IAM role and the URL of a Python ETL script
//    previously uploaded to the user's S3 bucket.
// 5. Start a job run, passing the following custom arguments. The ETL script
//    expects these, so their names must match exactly.
//    * `--input_database: <name of the database created by the crawler>`
//    * `--input_table: <name of the table created by the crawler>`
//    * `--output_bucket_url: <URL to the scaffold bucket created for the
//      user>`
// 6. Loop, getting the job run until it returns one of the following states:
//    "SUCCEEDED", "STOPPED", "FAILED", or "TIMEOUT".
// 7. Output data is stored in a group of files in the user's S3 bucket.
//    Either direct the user to their location or download a file and display
//    the results inline.
// 8. List the jobs for the user's account.
// 9. Get job run details for a job run.
// 10. Delete the demo job.
// 11. Delete the database and tables created by the example.
// 12. Delete the crawler created by the example.
import ArgumentParser
import AWSS3
import Foundation
import Smithy
import AWSClientRuntime
import AWSGlue

struct ExampleCommand: ParsableCommand {
    @Option(help: "The AWS IAM role to use for AWS Glue calls.")
    var role: String
    @Option(help: "The Amazon S3 bucket to use for this example.")
    var bucket: String
    @Option(help: "The Amazon S3 URL of the data to crawl.")
    var s3url: String = "s3://crawler-public-us-east-1/flight/2016/csv"
    @Option(help: "The Python script to run as a job with AWS Glue.")
    var script: String = "./flight_etl_job_script.py"
    @Option(help: "The AWS Region to run AWS API calls in.")
    var awsRegion = "us-east-1"
    @Option(help: "A prefix string to use when naming tables.")
    var tablePrefix = "swift-glue-basics-table"
    @Option(
        help: ArgumentHelp("The level of logging for the Swift SDK to perform."),
        completion: .list([
            "critical",
            "debug",
            "error",
            "info",
            "notice",
            "trace",
            "warning"
        ])
    )
    var logLevel: String = "error"

    // A `let` satisfies ParsableCommand's get-only `configuration`
    // requirement and avoids mutable static state.
    static let configuration = CommandConfiguration(
        commandName: "glue-scenario",
        abstract: """
        Demonstrates various features of AWS Glue.
        """,
        discussion: """
        An example showing how to use AWS Glue to create, run, and monitor crawlers and jobs.
        """
    )

    /// Generate and return a unique file name that begins with the specified
    /// string.
    ///
    /// - Parameters:
    ///   - prefix: Text to use at the beginning of the returned name.
    ///
    /// - Returns: A string containing a unique filename that begins with the
    ///   specified `prefix`.
    ///
    /// The returned name uses a random number between 1 million and 1 billion to
    /// provide reasonable certainty of uniqueness for the purposes of this
    /// example.
    func tempName(prefix: String) -> String {
        return "\(prefix)-\(Int.random(in: 1000000..<1000000000))"
    }

    /// Upload a file to an Amazon S3 bucket.
    ///
    /// - Parameters:
    ///   - s3Client: The S3 client to use when uploading the file.
    ///   - path: The local path of the source file to upload.
    ///   - toBucket: The name of the S3 bucket into which to upload the file.
    ///   - key: The key (name) to give the file in the S3 bucket.
    ///
    /// - Returns: `true` if the file is uploaded successfully, otherwise `false`.
    func uploadFile(s3Client: S3Client, path: String, toBucket: String, key: String) async -> Bool {
        do {
            let fileData: Data = try Data(contentsOf: URL(fileURLWithPath: path))
            let dataStream = ByteStream.data(fileData)

            _ = try await s3Client.putObject(
                input: PutObjectInput(
                    body: dataStream,
                    bucket: toBucket,
                    key: key
                )
            )
        } catch {
            // Report the bucket that was actually passed in, plus the cause.
            print("*** An unexpected error occurred uploading \"\(path)\" to the Amazon S3 bucket \"\(toBucket)\": \(error.localizedDescription)")
            return false
        }

        return true
    }

    /// Create a new AWS Glue crawler.
    ///
    /// - Parameters:
    ///   - glueClient: An AWS Glue client to use for the crawler.
    ///   - crawlerName: A name for the new crawler.
    ///   - iamRole: The name of an Amazon IAM role for the crawler to use.
    ///   - s3Path: The path of an Amazon S3 folder to use as a target location.
    ///   - cronSchedule: A `cron` schedule indicating when to run the crawler.
    ///   - databaseName: The name of an AWS Glue database to operate on.
    ///
    /// - Returns: `true` if the crawler is created successfully, otherwise `false`.
    func createCrawler(glueClient: GlueClient, crawlerName: String, iamRole: String,
                       s3Path: String, cronSchedule: String, databaseName: String) async -> Bool {
        // Crawl the path the caller asked for (previously this ignored
        // `s3Path` and always used the `s3url` option).
        let s3Target = GlueClientTypes.S3Target(path: s3Path)
        let targetList = GlueClientTypes.CrawlerTargets(s3Targets: [s3Target])

        do {
            _ = try await glueClient.createCrawler(
                input: CreateCrawlerInput(
                    databaseName: databaseName,
                    description: "Created by the AWS SDK for Swift Scenario Example for AWS Glue.",
                    name: crawlerName,
                    role: iamRole,
                    schedule: cronSchedule,
                    tablePrefix: tablePrefix,
                    targets: targetList
                )
            )
        } catch _ as AlreadyExistsException {
            print("*** A crawler named \"\(crawlerName)\" already exists.")
            return false
        } catch _ as OperationTimeoutException {
            print("*** The attempt to create the AWS Glue crawler timed out.")
            return false
        } catch {
            print("*** An unexpected error occurred creating the AWS Glue crawler: \(error.localizedDescription)")
            return false
        }

        return true
    }

    /// Delete an AWS Glue crawler.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - name: The name of the crawler to delete.
    ///
    /// - Returns: `true` if successful, otherwise `false`.
    func deleteCrawler(glueClient: GlueClient, name: String) async -> Bool {
        do {
            _ = try await glueClient.deleteCrawler(
                input: DeleteCrawlerInput(name: name)
            )
        } catch {
            print("*** Unable to delete the crawler \"\(name)\": \(error.localizedDescription)")
            return false
        }
        return true
    }

    /// Start running an AWS Glue crawler.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use when starting the crawler.
    ///   - name: The name of the crawler to start running.
    ///
    /// - Returns: `true` if the crawler is started successfully, otherwise `false`.
    func startCrawler(glueClient: GlueClient, name: String) async -> Bool {
        do {
            _ = try await glueClient.startCrawler(
                input: StartCrawlerInput(name: name)
            )
        } catch {
            print("*** An unexpected error occurred starting the crawler: \(error.localizedDescription)")
            return false
        }

        return true
    }

    /// Fetch the current state of the specified AWS Glue crawler.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - name: The name of the crawler whose state should be returned.
    ///
    /// - Returns: The crawler's state, or `nil` if the crawler couldn't be
    ///   retrieved or reported no state.
    func fetchCrawlerState(glueClient: GlueClient, name: String) async -> GlueClientTypes.CrawlerState? {
        do {
            let output = try await glueClient.getCrawler(
                input: GetCrawlerInput(name: name)
            )
            return output.crawler?.state
        } catch {
            print("*** Unable to get the state of the crawler \"\(name)\": \(error.localizedDescription)")
            return nil
        }
    }

    /// Get the state of the specified AWS Glue crawler.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - name: The name of the crawler whose state should be returned.
    ///
    /// - Returns: A `GlueClientTypes.CrawlerState` value describing the
    ///   state of the crawler. If the crawler or its state can't be
    ///   retrieved, `.stopping` is returned.
    func getCrawlerState(glueClient: GlueClient, name: String) async -> GlueClientTypes.CrawlerState {
        return await fetchCrawlerState(glueClient: glueClient, name: name) ?? .stopping
    }

    /// Wait until the specified crawler is ready to run.
    ///
    /// `.running` and `.stopping` are both treated as "still working". A
    /// crawler passes through `STOPPING` on its way back to `READY` after
    /// each crawl, so that state must not be treated as a failure.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - name: The name of the crawler to wait for.
    ///   - maxAttempts: The maximum number of polls, four seconds apart,
    ///     before giving up (default: 150, about ten minutes).
    ///
    /// - Returns: `true` if the crawler is ready, `false` if its state can't
    ///   be read, the wait is interrupted, or `maxAttempts` polls elapse.
    func waitUntilCrawlerReady(glueClient: GlueClient, name: String, maxAttempts: Int = 150) async -> Bool {
        for _ in 0..<max(0, maxAttempts) {
            guard let state = await fetchCrawlerState(glueClient: glueClient, name: name) else {
                return false
            }
            if state == .ready {
                return true
            }

            // Wait four seconds before trying again. `Task.sleep` throws if
            // the task is cancelled; retrying would then spin forever, so
            // give up instead.
            do {
                try await Task.sleep(for: .seconds(4))
            } catch {
                print("*** Error pausing the task.")
                return false
            }
        }

        print("*** Timed out waiting for the crawler \"\(name)\" to become ready.")
        return false
    }

    /// Create a new AWS Glue job.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name to give the new job.
    ///   - role: The IAM role for the job to use when accessing AWS services.
    ///   - scriptLocation: The AWS S3 URI of the script to be run by the job.
    ///
    /// - Returns: `true` if the job is created successfully, otherwise `false`.
    func createJob(glueClient: GlueClient, name jobName: String, role: String,
                   scriptLocation: String) async -> Bool {
        let command = GlueClientTypes.JobCommand(
            name: "glueetl",
            pythonVersion: "3",
            scriptLocation: scriptLocation
        )

        do {
            _ = try await glueClient.createJob(
                input: CreateJobInput(
                    command: command,
                    description: "Created by the AWS SDK for Swift Glue basic scenario example.",
                    glueVersion: "3.0",
                    name: jobName,
                    numberOfWorkers: 10,
                    role: role,
                    workerType: .g1x
                )
            )
        } catch {
            print("*** Unable to create the job \"\(jobName)\": \(error.localizedDescription)")
            return false
        }
        return true
    }

    /// Return a list of the AWS Glue jobs listed on the user's account.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - maxJobs: The maximum number of jobs to request per page
    ///     (default: 100). Every page is fetched, so this doesn't limit the
    ///     total number of names returned.
    ///
    /// - Returns: An array of strings listing the names of all available AWS
    ///   Glue jobs. If an error occurs, the names collected so far are
    ///   returned.
    func listJobs(glueClient: GlueClient, maxJobs: Int = 100) async -> [String] {
        var jobList: [String] = []
        var nextToken: String?

        repeat {
            do {
                let output = try await glueClient.listJobs(
                    input: ListJobsInput(
                        maxResults: maxJobs,
                        nextToken: nextToken
                    )
                )

                guard let jobs = output.jobNames else {
                    return jobList
                }

                jobList += jobs
                nextToken = output.nextToken
            } catch {
                return jobList
            }
        } while nextToken != nil

        return jobList
    }

    /// Delete an AWS Glue job.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name of the job to delete.
    ///
    /// - Returns: `true` if the job is successfully deleted, otherwise `false`.
    func deleteJob(glueClient: GlueClient, name jobName: String) async -> Bool {
        do {
            _ = try await glueClient.deleteJob(
                input: DeleteJobInput(jobName: jobName)
            )
        } catch {
            print("*** Unable to delete the job \"\(jobName)\": \(error.localizedDescription)")
            return false
        }
        return true
    }

    /// Create an AWS Glue database.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - databaseName: The name to give the new database.
    ///   - location: The URL of the source data to use with AWS Glue.
    ///
    /// - Returns: `true` if the database is created successfully, otherwise `false`.
    func createDatabase(glueClient: GlueClient, name databaseName: String,
                        location: String) async -> Bool {
        let databaseInput = GlueClientTypes.DatabaseInput(
            description: "Created by the AWS SDK for Swift Glue basic scenario example.",
            locationUri: location,
            name: databaseName
        )

        do {
            _ = try await glueClient.createDatabase(
                input: CreateDatabaseInput(
                    databaseInput: databaseInput
                )
            )
        } catch {
            print("*** Error creating the database \"\(databaseName)\": \(error.localizedDescription)")
            return false
        }

        return true
    }

    /// Get the AWS Glue database with the specified name.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - name: The name of the database to return.
    ///
    /// - Returns: The `GlueClientTypes.Database` object describing the
    ///   specified database, or `nil` if an error occurs or the database
    ///   isn't found.
    func getDatabase(glueClient: GlueClient, name: String) async -> GlueClientTypes.Database? {
        do {
            let output = try await glueClient.getDatabase(
                input: GetDatabaseInput(name: name)
            )

            return output.database
        } catch {
            return nil
        }
    }

    /// Returns a list of the tables in the specified database.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - databaseName: The name of the database whose tables are to be
    ///     returned.
    ///
    /// - Returns: An array of `GlueClientTypes.Table` objects, each
    ///   describing one table in the named database. An empty array indicates
    ///   that there are either no tables in the database, or an error
    ///   occurred before any tables could be found.
    func getTablesInDatabase(glueClient: GlueClient, databaseName: String) async -> [GlueClientTypes.Table] {
        var tables: [GlueClientTypes.Table] = []
        var nextToken: String?

        repeat {
            do {
                let output = try await glueClient.getTables(
                    input: GetTablesInput(
                        databaseName: databaseName,
                        nextToken: nextToken
                    )
                )

                guard let tableList = output.tableList else {
                    return tables
                }

                tables += tableList
                nextToken = output.nextToken
            } catch {
                return tables
            }
        } while nextToken != nil

        return tables
    }

    /// Delete the specified database.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - databaseName: The name of the database to delete.
    ///   - deleteTables: A Bool indicating whether or not to delete the
    ///     tables in the database before attempting to delete the database.
    ///
    /// - Returns: `true` if the database itself is deleted, otherwise
    ///   `false`. Failures to delete individual tables are reported but don't
    ///   affect the result.
    func deleteDatabase(glueClient: GlueClient, name databaseName: String,
                        withTables deleteTables: Bool = false) async -> Bool {
        if deleteTables {
            // Get the names of all of the (named) tables in the database.
            let tableNames = await self.getTablesInDatabase(glueClient: glueClient, databaseName: databaseName)
                .compactMap { $0.name }

            // Delete the tables. If there's only one table, use
            // `deleteTable()`, otherwise, use `batchDeleteTable()`. You can
            // use `batchDeleteTable()` for a single table, but this
            // demonstrates the use of `deleteTable()`. When there are no
            // tables, there's nothing to delete.
            if tableNames.count == 1 {
                do {
                    print("    Deleting table...")
                    _ = try await glueClient.deleteTable(
                        input: DeleteTableInput(
                            databaseName: databaseName,
                            name: tableNames[0]
                        )
                    )
                } catch {
                    print("*** Unable to delete the table.")
                }
            } else if !tableNames.isEmpty {
                print("    Deleting tables...")

                // `BatchDeleteTable` accepts at most 100 table names per call.
                let batchSize = 100
                for start in stride(from: 0, to: tableNames.count, by: batchSize) {
                    let batch = Array(tableNames[start..<min(start + batchSize, tableNames.count)])
                    do {
                        let output = try await glueClient.batchDeleteTable(
                            input: BatchDeleteTableInput(
                                databaseName: databaseName,
                                tablesToDelete: batch
                            )
                        )

                        // The call can succeed while individual tables fail.
                        for failure in output.errors ?? [] {
                            print("*** Unable to delete table \(failure.tableName ?? "<unnamed>"): \(failure.errorDetail?.errorMessage ?? "unknown error")")
                        }
                    } catch {
                        print("*** Unable to delete the tables.")
                    }
                }
            }
        }

        // Delete the database itself.
        do {
            print("    Deleting the database itself...")
            _ = try await glueClient.deleteDatabase(
                input: DeleteDatabaseInput(name: databaseName)
            )
        } catch {
            print("*** Unable to delete the database.")
            return false
        }
        return true
    }

    /// Start an AWS Glue job run.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name of the job to run.
    ///   - databaseName: The name of the AWS Glue database to run the job against.
    ///   - tableName: The name of the table in the database to run the job against.
    ///   - outputURL: The AWS S3 URI of the bucket location into which to
    ///     write the resulting output.
    ///
    /// - Returns: The ID of the new job run, or `nil` if the run couldn't be
    ///   started.
    func startJobRun(glueClient: GlueClient, name jobName: String, databaseName: String,
                     tableName: String, outputURL: String) async -> String? {
        do {
            let output = try await glueClient.startJobRun(
                input: StartJobRunInput(
                    arguments: [
                        "--input_database": databaseName,
                        "--input_table": tableName,
                        "--output_bucket_url": outputURL
                    ],
                    jobName: jobName,
                    numberOfWorkers: 10,
                    workerType: .g1x
                )
            )

            return output.jobRunId
        } catch {
            print("*** Unable to start the job run: \(error.localizedDescription)")
            return nil
        }
    }

    /// Return a list of the job runs for the specified job.
    ///
    /// Only the first page of results is returned; `nextToken` isn't followed.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name of the job for which to return its job runs.
    ///   - maxResults: The maximum number of job runs to return, or `nil`
    ///     (the default) to use the service's default page size.
    ///
    /// - Returns: An array of `GlueClientTypes.JobRun` objects describing
    ///   each job run. An empty array is returned if there are no runs or an
    ///   error occurs.
    func getJobRuns(glueClient: GlueClient, name jobName: String, maxResults: Int? = nil) async -> [GlueClientTypes.JobRun] {
        do {
            let output = try await glueClient.getJobRuns(
                input: GetJobRunsInput(
                    jobName: jobName,
                    maxResults: maxResults
                )
            )

            guard let jobRuns = output.jobRuns else {
                print("*** No job runs found.")
                return []
            }

            return jobRuns
        } catch is EntityNotFoundException {
            print("*** The specified job name, \(jobName), doesn't exist.")
            return []
        } catch {
            print("*** Unexpected error getting job runs:")
            dump(error)
            return []
        }
    }

    /// Get information about a specific AWS Glue job run.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name of the job to return job run data for.
    ///   - id: The run ID of the specific job run to return.
    ///
    /// - Returns: A `GlueClientTypes.JobRun` object describing the state of
    ///   the job run, or `nil` if an error occurs.
    func getJobRun(glueClient: GlueClient, name jobName: String, id: String) async -> GlueClientTypes.JobRun? {
        do {
            let output = try await glueClient.getJobRun(
                input: GetJobRunInput(
                    jobName: jobName,
                    runId: id
                )
            )

            return output.jobRun
        } catch {
            return nil
        }
    }

    /// Best-effort removal of the AWS Glue resources created by the scenario,
    /// used when the scenario has to stop early. Pass `nil` (the default) for
    /// any resource that hasn't been created yet. Resources are deleted in
    /// the order job, database (with its tables), crawler.
    ///
    /// - Parameters:
    ///   - glueClient: The AWS Glue client to use.
    ///   - jobName: The name of the job to delete, if any.
    ///   - databaseName: The name of the database to delete, if any.
    ///   - crawlerName: The name of the crawler to delete, if any.
    func cleanUp(glueClient: GlueClient, jobName: String? = nil,
                 databaseName: String? = nil, crawlerName: String? = nil) async {
        if let jobName {
            _ = await deleteJob(glueClient: glueClient, name: jobName)
        }
        if let databaseName {
            _ = await deleteDatabase(glueClient: glueClient, name: databaseName, withTables: true)
        }
        if let crawlerName {
            _ = await deleteCrawler(glueClient: glueClient, name: crawlerName)
        }
    }

    /// Called by ``main()`` to run the bulk of the example.
    func runAsync() async throws {
        // A name to give the Python script upon upload to the Amazon S3
        // bucket.
        let scriptName = "jobscript.py"

        // Schedule string in `cron` format, as described here:
        // https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html
        let cron = "cron(15 12 * * ? *)"

        let glueConfig = try await GlueClient.GlueClientConfiguration(region: awsRegion)
        let glueClient = GlueClient(config: glueConfig)

        let s3Config = try await S3Client.S3ClientConfiguration(region: awsRegion)
        let s3Client = S3Client(config: s3Config)

        // Create random names for things that need them.
        let crawlerName = tempName(prefix: "swift-glue-basics-crawler")
        let databaseName = tempName(prefix: "swift-glue-basics-db")

        // Create a name for the AWS Glue job.
        let jobName = tempName(prefix: "scenario-job")

        // The URL of the Python script on S3.
        let scriptURL = "s3://\(bucket)/\(scriptName)"

        print("Welcome to the AWS SDK for Swift basic scenario for AWS Glue!")

        //=====================================================================
        // 0. Upload the Python script to the target bucket so it's available
        //    for use by the Amazon Glue service.
        //=====================================================================

        print("Uploading the Python script: \(script) as key \(scriptName)")
        print("Destination bucket: \(bucket)")
        if !(await uploadFile(s3Client: s3Client, path: script, toBucket: bucket, key: scriptName)) {
            return
        }

        //=====================================================================
        // 1. Create the database and crawler using the randomized names
        //    generated previously.
        //=====================================================================

        print("Creating database \"\(databaseName)\"...")
        if !(await createDatabase(glueClient: glueClient, name: databaseName, location: s3url)) {
            print("*** Unable to create the database.")
            return
        }

        print("Creating crawler \"\(crawlerName)\"...")
        if !(await createCrawler(glueClient: glueClient, crawlerName: crawlerName,
                                 iamRole: role, s3Path: s3url, cronSchedule: cron,
                                 databaseName: databaseName)) {
            // The database already exists, so don't leave it behind.
            await cleanUp(glueClient: glueClient, databaseName: databaseName)
            return
        }

        //=====================================================================
        // 2. Start the crawler, then wait for it to be ready.
        //=====================================================================

        print("Starting the crawler and waiting until it's ready...")
        if !(await startCrawler(glueClient: glueClient, name: crawlerName)) {
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        if !(await waitUntilCrawlerReady(glueClient: glueClient, name: crawlerName)) {
            // Stop here: without a finished crawl there's no table for the
            // job to use, and continuing would delete the crawler twice.
            print("*** The crawler didn't become ready.")
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        //=====================================================================
        // 3. Get the database and table created by the crawler.
        //=====================================================================

        print("Getting the crawler's database...")
        guard let database = await getDatabase(glueClient: glueClient, name: databaseName) else {
            print("*** Unable to get the database.")
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }
        print("Database URI: \(database.locationUri ?? "<unknown>")")

        let tableList = await getTablesInDatabase(glueClient: glueClient, databaseName: databaseName)

        print("Found \(tableList.count) table(s):")
        for table in tableList {
            print("   \(table.name ?? "<unnamed>")")
        }

        if tableList.count != 1 {
            print("*** Incorrect number of tables found. There should only be one.")
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        guard let tableName = tableList[0].name else {
            print("*** Table is unnamed.")
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        //=====================================================================
        // 4. Create a job.
        //=====================================================================

        print("Creating a job...")
        if !(await createJob(glueClient: glueClient, name: jobName, role: role,
                             scriptLocation: scriptURL)) {
            await cleanUp(glueClient: glueClient, databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        //=====================================================================
        // 5. Start a job run.
        //=====================================================================

        print("Starting the job...")

        // Construct the Amazon S3 URL for the job run's output. This is in
        // the bucket specified on the command line, with a folder name that's
        // unique for this job run.
        let timeStamp = Date().timeIntervalSince1970
        let jobPath = "\(jobName)-\(Int(timeStamp))"
        let outputURL = "s3://\(bucket)/\(jobPath)"

        // Start the job run.
        guard let jobRunID = await startJobRun(glueClient: glueClient, name: jobName,
                                               databaseName: databaseName,
                                               tableName: tableName,
                                               outputURL: outputURL) else {
            print("*** Job run ID is invalid.")
            await cleanUp(glueClient: glueClient, jobName: jobName,
                          databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        //=====================================================================
        // 6. Wait for the job run to indicate that the run is complete.
        //=====================================================================

        print("Waiting for job run to end...")

        var jobRunFinished = false
        repeat {
            guard let jobRun = await getJobRun(glueClient: glueClient, name: jobName, id: jobRunID) else {
                print("*** Unable to get the job run.")
                await cleanUp(glueClient: glueClient, jobName: jobName,
                              databaseName: databaseName, crawlerName: crawlerName)
                return
            }

            let jobRunState = jobRun.jobRunState ?? .failed

            //=====================================================================
            // 7. Output where to find the data if the job run was successful.
            //    If the job run failed for any reason, output an appropriate
            //    error message.
            //=====================================================================

            switch jobRunState {
            case .succeeded:
                print("Job run succeeded. JSON files are in the Amazon S3 path:")
                print("     \(outputURL)")
                jobRunFinished = true
            case .stopped:
                print("*** Warning: Job run was stopped.")
                jobRunFinished = true
            case .error:
                print("*** Error: Job run ended in an error. \(jobRun.errorMessage ?? "")")
                jobRunFinished = true
            case .failed:
                print("*** Error: Job run failed. \(jobRun.errorMessage ?? "")")
                jobRunFinished = true
            case .timeout:
                print("*** Warning: Job run timed out.")
                jobRunFinished = true
            case .expired:
                // EXPIRED is also a final state; without this case the loop
                // would poll forever.
                print("*** Warning: Job run expired.")
                jobRunFinished = true
            default:
                // Still in progress. `Task.sleep` only throws when the task
                // is cancelled; retrying would then spin, so stop instead.
                do {
                    try await Task.sleep(for: .milliseconds(250))
                } catch {
                    print("*** Error pausing the task.")
                    await cleanUp(glueClient: glueClient, jobName: jobName,
                                  databaseName: databaseName, crawlerName: crawlerName)
                    return
                }
            }
        } while !jobRunFinished

        //=====================================================================
        // 7.5. List the job runs for this job, showing each job run's ID and
        //      its execution time.
        //=====================================================================

        print("Getting all job runs for the job \(jobName):")
        let jobRuns = await getJobRuns(glueClient: glueClient, name: jobName)

        if jobRuns.isEmpty {
            print("    <no job runs found>")
        } else {
            print("Found \(jobRuns.count) job runs... listing execution times:")
            for jobRun in jobRuns {
                print("    \(jobRun.id ?? "<unnamed>"): \(jobRun.executionTime) seconds")
            }
        }

        //=====================================================================
        // 8. List the jobs for the user's account.
        //=====================================================================

        print("\nThe account has the following jobs:")
        let jobs = await listJobs(glueClient: glueClient)

        if jobs.isEmpty {
            print("    <no jobs found>")
        } else {
            for job in jobs {
                print("    \(job)")
            }
        }

        //=====================================================================
        // 9. Get the job run details for a job run.
        //=====================================================================

        print("Information about the job run:")
        guard let jobRun = await getJobRun(glueClient: glueClient, name: jobName, id: jobRunID) else {
            print("*** Unable to retrieve the job run.")
            await cleanUp(glueClient: glueClient, jobName: jobName,
                          databaseName: databaseName, crawlerName: crawlerName)
            return
        }

        let startDate = jobRun.startedOn ?? Date(timeIntervalSince1970: 0)
        let endDate = jobRun.completedOn ?? Date(timeIntervalSince1970: 0)
        let dateFormatter: DateFormatter = DateFormatter()
        dateFormatter.dateStyle = .long
        dateFormatter.timeStyle = .long

        print("    Started at: \(dateFormatter.string(from: startDate))")
        print("    Completed at: \(dateFormatter.string(from: endDate))")

        //=====================================================================
        // 10. Delete the job.
        //=====================================================================

        print("\nDeleting the job...")
        _ = await deleteJob(glueClient: glueClient, name: jobName)

        //=====================================================================
        // 11. Delete the database and tables created by this example.
        //=====================================================================

        print("Deleting the database...")
        _ = await deleteDatabase(glueClient: glueClient, name: databaseName, withTables: true)

        //=====================================================================
        // 12. Delete the crawler. `deleteCrawler` reports any failure itself.
        //=====================================================================

        print("Deleting the crawler...")
        _ = await deleteCrawler(glueClient: glueClient, name: crawlerName)
    }
}

/// The program's asynchronous entry point.
@main
struct Main {
    static func main() async {
        let args = Array(CommandLine.arguments.dropFirst())

        do {
            let command = try ExampleCommand.parse(args)
            try await command.runAsync()
        } catch {
            ExampleCommand.exit(withError: error)
        }
    }
}

AWS SDK 개발자 가이드 및 코드 예시의 전체 목록은 'AWS SDK와 함께 이 서비스 사용' 섹션을 참조하세요. 이 주제에는 시작하기에 대한 정보와 이전 SDK 버전에 대한 세부 정보도 포함되어 있습니다.