

# 샘플 블루프린트 프로젝트
<a name="developing-blueprints-sample"></a>

데이터 포맷 변환은 빈번한 추출, 변환, 로드 사용 사례입니다. 일반적인 분석 워크로드에서는 Parquet 또는 ORC와 같은 열 기반 파일 포맷이 CSV 또는 JSON과 같은 텍스트 포맷보다 선호됩니다. 이 샘플 블루프린트를 사용하면 CSV/JSON 등의 데이터를 Amazon S3의 파일용 Parquet로 변환할 수 있습니다.

이 블루프린트는 블루프린트 파라미터로 정의된 S3 경로 목록을 가져와 데이터를 Parquet 포맷으로 변환한 다음 다른 블루프린트 파라미터로 지정된 S3 위치에 씁니다. 레이아웃 스크립트는 각 경로에 대한 크롤러 및 작업을 생성합니다. 또한 레이아웃 스크립트는 `Conversion.py`의 ETL 스크립트를 다른 블루프린트 파라미터로 지정된 S3 버킷에 업로드합니다. 그런 다음 레이아웃 스크립트는 업로드된 스크립트를 각 작업에 대한 ETL 스크립트로 지정합니다. 프로젝트의 ZIP 아카이브에는 레이아웃 스크립트, ETL 스크립트 및 Blueprint 구성 파일이 포함되어 있습니다.

추가 샘플 블루프린트 프로젝트에 대한 자세한 내용은 [블루프린트 샘플](developing-blueprints-samples.md) 섹션을 참조하세요.

다음은 `Layout.py` 파일에 있는 레이아웃 스크립트입니다.

```
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
import boto3

s3_client = boto3.client('s3')

# Ingesting all the S3 paths as Glue table in parquet format
def generate_layout(user_params, system_params):
    #Always give the full path for the file
    with open("ConversionBlueprint/Conversion.py", "rb") as f:
        s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py")
    etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket'])    
    crawlers = []
    jobs = []
    workflowName = user_params['WorkflowName']
    for path in user_params['S3Paths']:
      tablePrefix = "source_" 
      crawler = Crawler(Name="{}_crawler".format(workflowName),
                        Role=user_params['PassRole'],
                        DatabaseName=user_params['TargetDatabase'],
                        TablePrefix=tablePrefix,
                        Targets= {"S3Targets": [{"Path": path}]})
      crawlers.append(crawler)
      transform_job = Job(Name="{}_transform_job".format(workflowName),
                         Command={"Name": "glueetl",
                                  "ScriptLocation": etlScriptLocation,
                                  "PythonVersion": "3"},
                         Role=user_params['PassRole'],
                         DefaultArguments={"--database_name": user_params['TargetDatabase'],
                                           "--table_prefix": tablePrefix,
                                           "--region_name": system_params['region'],
                                           "--output_path": user_params['TargetS3Location']},
                         DependsOn={crawler: "SUCCEEDED"},
                         WaitForDependencies="AND")
      jobs.append(transform_job)
    conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers))
    return conversion_workflow
```

다음은 해당 블루프린트 구성 파일인 `blueprint.cfg`입니다.

```
{
    "layoutGenerator": "ConversionBlueprint.Layout.generate_layout",
    "parameterSpec" : {
        "WorkflowName" : {
            "type": "String",
            "collection": false,
            "description": "Name for the workflow."
        },
        "S3Paths" : {
            "type": "S3Uri",
            "collection": true,
            "description": "List of Amazon S3 paths for data ingestion."
        },
        "PassRole" : {
            "type": "IAMRoleName",
            "collection": false,
            "description": "Choose an IAM role to be used in running the job/crawler"
        },
        "TargetDatabase": {
            "type": "String",
            "collection" : false,
            "description": "Choose a database in the Data Catalog."
        },
        "TargetS3Location": {
            "type": "S3Uri",
            "collection" : false,
            "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/."
        },
        "ScriptsBucket": {
            "type": "S3Bucket",
            "collection": false,
            "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts."
        }
    }
}
```

`Conversion.py` 파일의 다음 스크립트는 업로드된 ETL 스크립트입니다. 변환하는 동안 파티션 구성표를 유지합니다.

```
import sys
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'region_name',
    'database_name',
    'table_prefix',
    'output_path'])
databaseName = args['database_name']
tablePrefix = args['table_prefix']
outputPath = args['output_path']

glue = boto3.client('glue', region_name=args['region_name'])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

def get_tables(database_name, table_prefix):
    tables = []
    paginator = glue.get_paginator('get_tables')
    for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"):
        tables.extend(page['TableList'])
    return tables

for table in get_tables(databaseName, tablePrefix):
    tableName = table['Name']
    partitionList = table['PartitionKeys']
    partitionKeys = []
    for partition in partitionList:
        partitionKeys.append(partition['Name'])

    # Create DynamicFrame from Catalog
    dyf = glue_context.create_dynamic_frame.from_catalog(
        name_space=databaseName,
        table_name=tableName,
        additional_options={
            'useS3ListImplementation': True
        },
        transformation_ctx='dyf'
    )

    # Resolve choice type with make_struct
    dyf = ResolveChoice.apply(
        frame=dyf,
        choice='make_struct',
        transformation_ctx='resolvechoice_' + tableName
    )

    # Drop null fields
    dyf = DropNullFields.apply(
        frame=dyf,
        transformation_ctx="dropnullfields_" + tableName
    )

    # Write DynamicFrame to S3 in glueparquet
    sink = glue_context.getSink(
        connection_type="s3",
        path=outputPath,
        enableUpdateCatalog=True,
        partitionKeys=partitionKeys
    )
    sink.setFormat("glueparquet")

    sink.setCatalogInfo(
        catalogDatabase=databaseName,
        catalogTableName=tableName[len(tablePrefix):]
    )
    sink.writeFrame(dyf)

job.commit()
```

**참고**  
2개의 Amazon S3 경로만 샘플 블루프린트에 대한 입력으로 제공될 수 있습니다. 이는 AWS Glue 트리거가 2개의 크롤러 작업만 호출하도록 제한되기 때문입니다.