

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Contoh proyek cetak biru
<a name="developing-blueprints-sample"></a>

Konversi format data adalah kasus penggunaan extract, transform, and load (ETL) yang sering terjadi. Pada beban kerja analitik yang khas, format file berbasis kolom seperti Parquet atau ORC lebih disukai daripada format teks seperti CSV atau JSON. Cetak biru sampel ini memungkinkan Anda mengonversi data dariCSV/JSON/etc. menjadi Parket untuk file di Amazon S3. 

Cetak biru ini mengambil daftar path S3 yang didefinisikan oleh parameter cetak biru, mengkonversi data ke format Parquet, dan menulis ke lokasi S3 yang ditentukan oleh parameter cetak biru lain. Skrip tata letak menciptakan sebuah crawler dan tugas untuk setiap path. Skrip tata letak juga mengunggah skrip ETL di `Conversion.py` ke bucket S3 yang ditentukan oleh parameter cetak biru lain. Skrip tata letak kemudian menentukan skrip yang sudah diunggah sebagai skrip ETL untuk setiap tugas. Arsip ZIP untuk proyek berisi skrip tata letak, skrip ETL, dan file konfigurasi cetak biru.

Untuk informasi tentang contoh proyek cetak biru lainnya, lihat [Sampel cetak biru](developing-blueprints-samples.md).

Berikut ini adalah skrip tata letak, dalam file `Layout.py`.

```
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
import boto3

s3_client = boto3.client('s3')

# Ingesting all the S3 paths as Glue table in parquet format
def generate_layout(user_params, system_params):
    #Always give the full path for the file
    with open("ConversionBlueprint/Conversion.py", "rb") as f:
        s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py")
    etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket'])    
    crawlers = []
    jobs = []
    workflowName = user_params['WorkflowName']
    for path in user_params['S3Paths']:
      tablePrefix = "source_" 
      crawler = Crawler(Name="{}_crawler".format(workflowName),
                        Role=user_params['PassRole'],
                        DatabaseName=user_params['TargetDatabase'],
                        TablePrefix=tablePrefix,
                        Targets= {"S3Targets": [{"Path": path}]})
      crawlers.append(crawler)
      transform_job = Job(Name="{}_transform_job".format(workflowName),
                         Command={"Name": "glueetl",
                                  "ScriptLocation": etlScriptLocation,
                                  "PythonVersion": "3"},
                         Role=user_params['PassRole'],
                         DefaultArguments={"--database_name": user_params['TargetDatabase'],
                                           "--table_prefix": tablePrefix,
                                           "--region_name": system_params['region'],
                                           "--output_path": user_params['TargetS3Location']},
                         DependsOn={crawler: "SUCCEEDED"},
                         WaitForDependencies="AND")
      jobs.append(transform_job)
    conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers))
    return conversion_workflow
```

Berikut ini adalah file `blueprint.cfg` konfigurasi cetak biru yang sesuai.

```
{
    "layoutGenerator": "ConversionBlueprint.Layout.generate_layout",
    "parameterSpec" : {
        "WorkflowName" : {
            "type": "String",
            "collection": false,
            "description": "Name for the workflow."
        },
        "S3Paths" : {
            "type": "S3Uri",
            "collection": true,
            "description": "List of Amazon S3 paths for data ingestion."
        },
        "PassRole" : {
            "type": "IAMRoleName",
            "collection": false,
            "description": "Choose an IAM role to be used in running the job/crawler"
        },
        "TargetDatabase": {
            "type": "String",
            "collection" : false,
            "description": "Choose a database in the Data Catalog."
        },
        "TargetS3Location": {
            "type": "S3Uri",
            "collection" : false,
            "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/."
        },
        "ScriptsBucket": {
            "type": "S3Bucket",
            "collection": false,
            "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts."
        }
    }
}
```

Skrip yang ada dalam file `Conversion.py` berikut adalah skrip ETL yang telah diunggah. Perhatikan bahwa ia mempertahankan skema partisi selama konversi. 

```
import sys
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'region_name',
    'database_name',
    'table_prefix',
    'output_path'])
databaseName = args['database_name']
tablePrefix = args['table_prefix']
outputPath = args['output_path']

glue = boto3.client('glue', region_name=args['region_name'])

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

def get_tables(database_name, table_prefix):
    tables = []
    paginator = glue.get_paginator('get_tables')
    for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"):
        tables.extend(page['TableList'])
    return tables

for table in get_tables(databaseName, tablePrefix):
    tableName = table['Name']
    partitionList = table['PartitionKeys']
    partitionKeys = []
    for partition in partitionList:
        partitionKeys.append(partition['Name'])

    # Create DynamicFrame from Catalog
    dyf = glue_context.create_dynamic_frame.from_catalog(
        name_space=databaseName,
        table_name=tableName,
        additional_options={
            'useS3ListImplementation': True
        },
        transformation_ctx='dyf'
    )

    # Resolve choice type with make_struct
    dyf = ResolveChoice.apply(
        frame=dyf,
        choice='make_struct',
        transformation_ctx='resolvechoice_' + tableName
    )

    # Drop null fields
    dyf = DropNullFields.apply(
        frame=dyf,
        transformation_ctx="dropnullfields_" + tableName
    )

    # Write DynamicFrame to S3 in glueparquet
    sink = glue_context.getSink(
        connection_type="s3",
        path=outputPath,
        enableUpdateCatalog=True,
        partitionKeys=partitionKeys
    )
    sink.setFormat("glueparquet")

    sink.setCatalogInfo(
        catalogDatabase=databaseName,
        catalogTableName=tableName[len(tablePrefix):]
    )
    sink.writeFrame(dyf)

job.commit()
```

**catatan**  
Hanya dua path Amazon S3 yang dapat diberikan sebagai masukan untuk cetak biru sampel. Ini karena AWS Glue pemicu terbatas untuk memanggil hanya dua tindakan crawler.