Tiga jenis pekerjaan AWS Glue ETL untuk mengonversi data ke Apache Parquet - AWS Prescriptive Guidance

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Tiga jenis pekerjaan AWS Glue ETL untuk mengonversi data ke Apache Parquet

Adnan Alvee, Nith Govindasivan, dan Karthikeyan Ramachandran, Amazon Web Services

Ringkasan

Di Amazon Web Services (AWS) Cloud, AWS Glue adalah layanan ekstrak, transformasi, dan pemuatan (ETL) yang dikelola sepenuhnya. AWS Glue membuatnya hemat biaya untuk mengkategorikan data Anda, membersihkannya, memperkayanya, dan memindahkannya dengan andal di antara berbagai penyimpanan data dan aliran data.

Pola ini menyediakan jenis pekerjaan yang berbeda di AWS Glue dan menggunakan tiga skrip berbeda untuk mendemonstrasikan pembuatan pekerjaan ETL.

Anda dapat menggunakan AWS Glue untuk menulis pekerjaan ETL di lingkungan shell Python. Anda juga dapat membuat pekerjaan ETL batch dan streaming dengan menggunakan Python PySpark () atau Scala di lingkungan Apache Spark yang dikelola. Untuk membantu Anda memulai penulisan pekerjaan ETL, pola ini berfokus pada pekerjaan ETL batch menggunakan Python shell,, dan Scala. PySpark Pekerjaan shell Python dimaksudkan untuk beban kerja yang membutuhkan daya komputasi yang lebih rendah. Lingkungan Apache Spark yang dikelola dimaksudkan untuk beban kerja yang membutuhkan daya komputasi tinggi.

Apache Parquet dibangun untuk mendukung skema kompresi dan pengkodean yang efisien. Ini dapat mempercepat beban kerja analitik Anda karena menyimpan data secara kolumnar. Mengonversi data ke Parket dapat menghemat ruang penyimpanan, biaya, dan waktu Anda dalam jangka panjang. Untuk mempelajari lebih lanjut tentang Parket, lihat posting blog Apache Parquet: Cara menjadi pahlawan dengan format data kolumnar sumber terbuka.

Prasyarat dan batasan

Prasyarat

  • Peran AWS Identity and Access Management (IAM) (Jika Anda tidak memiliki peran, lihat bagian Informasi tambahan.)

Arsitektur

Tumpukan teknologi target

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

Otomatisasi dan skala

  • Alur kerja AWS Glue mendukung otomatisasi penuh dari pipeline ETL.

  • Anda dapat mengubah jumlah unit pemrosesan data (DPUs), atau tipe pekerja, untuk menskalakan secara horizontal dan vertikal.

Alat

Layanan AWS

  • Amazon Simple Storage Service (Amazon S3) adalah layanan penyimpanan objek berbasis cloud yang membantu Anda menyimpan, melindungi, dan mengambil sejumlah data.

  • AWS Glue adalah layanan ETL yang dikelola sepenuhnya untuk mengkategorikan, membersihkan, memperkaya, dan memindahkan data Anda antara berbagai penyimpanan data dan aliran data.

Alat lainnya

  • Apache Parquet adalah format file data berorientasi kolom sumber terbuka yang dirancang untuk penyimpanan dan pengambilan.

Konfigurasi

Gunakan pengaturan berikut untuk mengonfigurasi daya komputasi AWS Glue ETL. Untuk mengurangi biaya, gunakan pengaturan minimal saat Anda menjalankan beban kerja yang disediakan dalam pola ini. 

  • Python shell — Anda dapat menggunakan 1 DPU untuk memanfaatkan 16 GB memori atau 0.0625 DPU untuk memanfaatkan 1 GB memori. Pola ini menggunakan 0,0625 DPU, yang merupakan default di konsol AWS Glue.

  • Python atau Scala untuk Spark - Jika Anda memilih jenis pekerjaan terkait Spark di konsol, AWS Glue secara default menggunakan 10 pekerja dan tipe pekerja G.1X. Pola ini menggunakan dua pekerja, yang merupakan jumlah minimum yang diperbolehkan, dengan tipe pekerja standar, yang cukup dan hemat biaya.

Tabel berikut menampilkan berbagai jenis pekerja AWS Glue untuk lingkungan Apache Spark. Karena pekerjaan shell Python tidak menggunakan lingkungan Apache Spark untuk menjalankan Python, itu tidak termasuk dalam tabel.

Standar

G.1X

G.2X

vCPU

4

4

8

Memori

16 GB

16 GB

32 GB

Ruang disk

50 GB

64 GB

128 GB

Pelaksana per pekerja

2

1

Kode

Untuk kode yang digunakan dalam pola ini, termasuk peran IAM dan konfigurasi parameter, lihat bagian Informasi tambahan.

Epik

TugasDeskripsiKeterampilan yang dibutuhkan

Unggah data ke bucket S3 baru atau yang sudah ada.

Buat atau gunakan bucket S3 yang ada di akun Anda. Unggah file sample_data.csv dari bagian Lampiran, dan perhatikan bucket S3 dan lokasi awalan.

AWS Umum
TugasDeskripsiKeterampilan yang dibutuhkan

Buat pekerjaan AWS Glue.

Di bawah bagian ETL konsol AWS Glue, tambahkan pekerjaan AWS Glue. Pilih jenis pekerjaan yang sesuai, versi AWS Glue, dan DPU/Worker jenis serta jumlah pekerja yang sesuai. Untuk detailnya, lihat bagian Konfigurasi.

Pengembang, cloud atau data

Ubah lokasi input dan output.

Salin kode yang sesuai dengan tugas AWS Glue Anda, dan ubah lokasi input dan output yang Anda catat dalam epik Unggah data.

Pengembang, cloud atau data

Konfigurasikan parameter.

Anda dapat menggunakan cuplikan yang disediakan di bagian Informasi tambahan untuk mengatur parameter untuk pekerjaan ETL Anda. AWS Glue menggunakan empat nama argumen secara internal:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

--JOB_NAMEParameter harus dimasukkan secara eksplisit pada konsol AWS Glue. Pilih Pekerjaan, Edit Job, Konfigurasi keamanan, pustaka skrip, dan parameter pekerjaan (opsional). Masukkan --JOB_NAME sebagai kunci dan berikan nilai. Anda juga dapat menggunakan AWS Command Line Interface (AWS CLI) atau AWS Glue API untuk mengatur parameter ini. --JOB_NAMEParameter ini digunakan oleh Spark dan tidak diperlukan dalam pekerjaan lingkungan shell Python.

Anda harus menambahkan -- sebelum setiap nama parameter; jika tidak, kode tidak akan berfungsi. Misalnya, untuk cuplikan kode, parameter lokasi harus dipanggil oleh dan. --input_loc --output_loc

Pengembang, cloud atau data

Jalankan pekerjaan ETL.

Jalankan pekerjaan Anda dan periksa hasilnya. Perhatikan berapa banyak ruang yang dikurangi dari file asli.

Pengembang, cloud atau data

Sumber daya terkait

Referensi

Tutorial dan video

Informasi tambahan

Peran IAM

Saat membuat pekerjaan AWS Glue, Anda dapat menggunakan peran IAM yang ada yang memiliki izin yang ditampilkan dalam cuplikan kode berikut atau peran baru.

Untuk membuat peran baru, gunakan kode YAMB berikut.

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS Glue Python Shell

Kode Python menggunakan Pandas dan PyArrow pustaka untuk mengkonversi data ke Parquet. Perpustakaan Pandas sudah tersedia. PyArrow Pustaka diunduh saat Anda menjalankan pola, karena ini adalah proses satu kali. Anda dapat menggunakan file roda PyArrow untuk mengonversi ke perpustakaan dan menyediakan file sebagai paket perpustakaan. Untuk informasi selengkapnya tentang mengemas file roda, lihat Menyediakan pustaka Python Anda sendiri.

Parameter cangkang AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

Kode shell AWS Glue Python

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "s3://bucket-name/prefix/sample_data.csv" output_loc = "s3://bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1] s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False) s3_resource.Object(output_loc_bucket, output_loc_prefix + 'data' + '.parquet').put(Body=parquet_buffer.getvalue())

Pekerjaan AWS Glue Spark dengan Python

Untuk menggunakan jenis pekerjaan AWS Glue Spark dengan Python, pilih Spark sebagai jenis pekerjaan. Pilih Spark 3.1, Python 3 dengan waktu startup pekerjaan yang ditingkatkan (Glue Version 3.0) sebagai versi AWS Glue.

Parameter AWS Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

Pekerjaan AWS Glue Spark dengan kode Python

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "s3://bucket-name/prefix/sample_data.csv" output_loc = "s3://bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }) outputDF = glueContext.write_dynamic_frame.from_options(\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet")

Untuk sejumlah besar file besar terkompresi (misalnya, 1.000 file yang masing-masing sekitar 3 MB), gunakan compressionType parameter dengan recurse parameter untuk membaca semua file yang tersedia dalam awalan, seperti yang ditunjukkan pada kode berikut.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Untuk sejumlah besar file kecil terkompresi (misalnya, 1.000 file yang masing-masing sekitar 133 KB), gunakan groupFiles parameter, bersama dengan parameter compressionType dan parameter. recurse groupFilesParameter mengelompokkan file kecil menjadi beberapa file besar, dan groupSize parameter mengontrol pengelompokan ke ukuran yang ditentukan dalam byte (misalnya, 1 MB). Cuplikan kode berikut memberikan contoh penggunaan parameter ini dalam kode.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Tanpa perubahan apa pun pada node pekerja, pengaturan ini memungkinkan pekerjaan AWS Glue untuk membaca banyak file (besar atau kecil, dengan atau tanpa kompresi) dan menuliskannya ke target dalam format Parket.

Pekerjaan AWS Glue Spark dengan Scala

Untuk menggunakan jenis pekerjaan AWS Glue Spark dengan Scala, pilih Spark sebagai tipe pekerjaan dan Bahasa sebagai Scala. Pilih Spark 3.1, Scala 2 dengan waktu startup pekerjaan yang ditingkatkan (Glue Version 3.0) sebagai versi AWS Glue. Untuk menghemat ruang penyimpanan, sampel AWS Glue with Scala berikut juga menggunakan applyMapping fitur untuk mengonversi tipe data.

Parameter AWS Glue Scala

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

Pekerjaan AWS Glue Spark dengan kode Scala

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp { def main(sysArgs: Array[String]) { @transient val spark: SparkContext = SparkContext.getOrCreate() val glueContext: GlueContext = new GlueContext(spark) val inputLoc = "s3://bucket-name/prefix/sample_data.csv" val outputLoc = "s3://bucket-name/prefix/" val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame() val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"), ("_c2", "string", "profit", "double")), caseSensitive = false) val formatPartition = applyMapping.toDF().coalesce(1) val dynamicFrame = DynamicFrame(formatPartition, glueContext) val dataSink = glueContext.getSinkWithFormat( connectionType = "s3", options = JsonOptions(Map("path" -> outputLoc )), transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame) } }

Lampiran

Untuk mengakses konten tambahan yang terkait dengan dokumen ini, unzip file berikut: attachment.zip