翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
データスキャンの量を減らす
まず、必要なデータのみをロードすることを検討してください。各データソースの Spark クラスターにロードされるデータの量を減らすだけで、パフォーマンスを向上させることができます。このアプローチが適切かどうかを評価するには、次のメトリクスを使用します。
Spark UI セクションで説明されているように、CloudWatch メトリクスの Amazon S3 からの読み取りバイト数と Spark UI の詳細を確認できます。
CloudWatch メトリクス
Amazon S3 からのおおよその読み取りサイズは、ETL データ移動 (バイト) で確認できます。このメトリクスは、前のレポート以降のすべてのエグゼキュターによって Amazon S3 から読み取られたバイト数を示します。これを使用して Amazon S3 からの ETL データ移動をモニタリングし、読み取りと外部データソースからの取り込みレートを比較できます。
S3 バイト読み取りデータポイントが予想よりも大きい場合は、次の解決策を検討してください。
Spark UI
for Spark UI AWS Glue のステージタブで、入力サイズと出力サイズを確認できます。次の例では、ステージ 2 は 47.4 GiB の入力と 47.7 GiB の出力を読み取り、ステージ 5 は 61.2 MiB の入力と 56.6 MiB の出力を読み取ります。
AWS Glue ジョブで Spark SQL または DataFrame アプローチを使用する場合、SQL /D ataFrame タブには、これらのステージに関するより多くの統計が表示されます。この場合、ステージ 2 には、読み取りファイル数: 430、読み取りファイルサイズ: 47.4 GiB、出力行数: 160,796,570 が表示されます。
で読み取っているデータと使用しているデータのサイズに大きな違いがあることがわかった場合は、次のソリューションを試してください。
Amazon S3
Amazon S3 から読み取るときにジョブにロードされるデータの量を減らすには、データセット のファイルサイズ、圧縮、ファイル形式、ファイルレイアウト (パーティション) を考慮してください。Spark ジョブ AWS Glue の場合、生データの ETL によく使用されますが、効率的な分散処理のためには、データソース形式の機能を検査する必要があります。
-
ファイルサイズ – 入力と出力のファイルサイズを中程度の範囲 (128 MB など) に維持することをお勧めします。ファイルが小さすぎたり、大きすぎたりすると、問題が発生する可能性があります。
小さなファイルが多数あると、次の問題が発生します。
-
多くのオブジェクト (同じ量のデータを保存するいくつかのオブジェクトと比較して
Head) に対してリクエストを行うために必要なオーバーヘッド (List、Get、 など) により、Amazon S3 へのネットワーク I/O 負荷が高い。 -
Spark ドライバーの負荷の高い I/O と処理負荷。これにより、多くのパーティションとタスクが生成され、過剰な並列処理が発生します。
一方、ファイルタイプが分割可能でなく (gzip など)、ファイルが大きすぎる場合、Spark アプリケーションは 1 つのタスクでファイル全体の読み取りが完了するまで待つ必要があります。
小さなファイルごとに Apache Spark タスクを作成するときに発生する過剰な並列処理を減らすには、DynamicFrames のファイルグループを使用します。このアプローチにより、Spark ドライバーから OOM 例外が発生する可能性が低くなります。ファイルグループ化を設定するには、 パラメータ
groupFilesとgroupSizeパラメータを設定します。次のコード例では、これらのパラメータを含む ETL スクリプトで AWS Glue DynamicFrame API を使用しています。dyf = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://input-s3-path/"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json") -
-
圧縮 — S3 オブジェクトが数百メガバイト内にある場合は、圧縮することを検討してください。さまざまな圧縮形式があり、2 つのタイプに大別できます。
-
gzip などの分割不可能な 圧縮形式では、ファイル全体を 1 人のワーカーが解凍する必要があります。
-
bzip2 や LZO (インデックス付き) などの分割可能な圧縮形式を使用すると、ファイルの部分的な解凍が可能になり、並列化できます。
Spark (およびその他の一般的な分散処理エンジン) では、ソースデータファイルをエンジンが並行して処理できるチャンクに分割します。これらの単位は分割と呼ばれることがよくあります。データが分割可能な形式になると、最適化された AWS Glue リーダーは、特定のブロックのみを取得する
RangeオプションをGetObjectAPI に提供することで、S3 オブジェクトから分割を取得できます。これが実際にどのように機能するかを確認するには、次の図を検討してください。
圧縮されたデータは、ファイルが最適なサイズであるか、ファイルが分割可能である限り、アプリケーションを大幅に高速化できます。データサイズが小さいほど、Amazon S3 からスキャンされるデータとAmazon S3 から Spark クラスターへのネットワークトラフィックが減少します。一方、データを圧縮および解凍するには、より多くの CPU が必要です。必要なコンピューティングの量は、圧縮アルゴリズムの圧縮率でスケールされます。分割可能な圧縮形式を選択するときは、このトレードオフを考慮してください。
注記
gzip ファイルは一般に分割できませんが、gzip を使用して個々の Parquet ブロックを圧縮し、それらのブロックを並列化できます。
-
-
ファイル形式 – 列形式を使用します。Apache Parquet
と Apache ORC は一般的な列データ形式です。Parquet と ORC は、列ベースの圧縮、データ型に基づく各列のエンコードと圧縮を採用することで、データを効率的に保存します。Parquet エンコーディングの詳細については、「Parquet エンコーディング定義 」を参照してください。Parquet ファイルも分割可能です。 Columnar は、グループ値を列ごとにフォーマットし、ブロックにまとめて保存します。列形式を使用する場合、使用しない列に対応するデータのブロックをスキップできます。Spark アプリケーションは、必要な列のみを取得できます。一般的に、圧縮率やデータブロックをスキップすると、Amazon S3 から読み取るバイト数が少なくなり、パフォーマンスが向上します。どちらの形式も、I/O を減らすための以下のプッシュダウンアプローチをサポートしています。
-
射影プッシュダウン – 射影プッシュダウンは、アプリケーションで指定された列のみを取得する手法です。次の例に示すように、Spark アプリケーションの列を指定します。
-
DataFrame の例:
df.select("star_rating") -
Spark SQL の例:
spark.sql("select start_rating from <table>")
-
-
述語プッシュダウン – 述語プッシュダウンは、
WHEREおよびGROUP BY句を効率的に処理するための手法です。どちらの形式にも、列値を表すデータのブロックがあります。各ブロックは、最大値や最小値など、ブロックの統計を保持します。Spark は、これらの統計を使用して、アプリケーションで使用されるフィルター値に応じてブロックを読み取るかスキップするかを決定できます。この機能を使用するには、次の例に示すように、条件にフィルターを追加します。-
DataFrame の例:
df.select("star_rating").filter("star_rating < 2") -
Spark SQL の例:
spark.sql("select * from <table> where star_rating < 2")
-
-
-
ファイルレイアウト – データの使用方法に基づいて S3 データを異なるパスのオブジェクトに保存することで、関連するデータを効率的に取得できます。詳細については、「Amazon S3 ドキュメント」の「プレフィックスを使用してオブジェクトを整理する」を参照してください。 AWS Glue は、キーと値を 形式で Amazon S3 プレフィックスに保存し
key=value、データを Amazon S3 パスでパーティション化することをサポートしています。 Amazon S3 データをパーティション化することで、各ダウンストリーム分析アプリケーションでスキャンされるデータ量を制限し、パフォーマンスを向上させ、コストを削減できます。詳細については、「 での ETL 出力のパーティションの管理 AWS Glue」を参照してください。パーティション分割はテーブルをさまざまな部分に分割し、次の例に示すように、年、月、日などの列値に基づいて関連するデータをグループ化されたファイルに保持します。
# Partitioning by /YYYY/MM/DD s3://<YourBucket>/year=2023/month=03/day=31/0000.gz s3://<YourBucket>/year=2023/month=03/day=01/0000.gz s3://<YourBucket>/year=2023/month=03/day=02/0000.gz s3://<YourBucket>/year=2023/month=03/day=03/0000.gz ...データセットのパーティションは、 のテーブルでモデリングすることで定義できます AWS Glue Data Catalog。その後、次のようにパーティションプルーニングを使用してデータスキャンの量を制限できます。
-
For AWS Glue DynamicFrame の場合は、
push_down_predicate(または ) を設定しますcatalogPartitionPredicate。dyf = Glue_context.create_dynamic_frame.from_catalog( database=src_database_name, table_name=src_table_name, push_down_predicate = "year='2023' and month ='03'", ) -
Spark DataFrame の場合、パーティションをプルーニングする固定パスを設定します。
df = spark.read.format("json").load("s3://<YourBucket>/year=2023/month=03/*/*.gz") -
Spark SQL では、データカタログからパーティションをプルーニングする where 句を設定できます。
df = spark.sql("SELECT * FROM <Table> WHERE year= '2023' and month = '03'") -
を使用してデータを書き込むときに日付でパーティション分割するには AWS Glue、次のようにDataFrame DynamicFrame の DynamicFrame または partitionBy()
に partitionKeys を列の日付情報とともに設定します。 -
DynamicFrame
glue_context.write_dynamic_frame_from_options( frame= dyf, connection_type='s3',format='parquet' connection_options= { 'partitionKeys': ["year", "month", "day"], 'path': 's3://<YourBucket>/<Prefix>/' } ) -
DataFrame
df.write.mode('append')\ .partitionBy('year','month','day')\ .parquet('s3://<YourBucket>/<Prefix>/')
これにより、出力データのコンシューマーのパフォーマンスを向上させることができます。
入力データセットを作成するパイプラインを変更するアクセス権限がない場合、パーティショニングはオプションではありません。代わりに、glob パターンを使用して不要な S3 パスを除外できます。DynamicFrame で読み取るときに除外を設定します。たとえば、次のコードでは、2023 年の 01~09 か月の日数を除外しています。
dyf = glueContext.create_dynamic_frame.from_catalog( database=db, table_name=table, additional_options = { "exclusions":"[\"**year=2023/month=0[1-9]/**\"]" }, transformation_ctx='dyf' )データカタログのテーブルプロパティで除外を設定することもできます。
-
キー:
exclusions -
値:
["**year=2023/month=0[1-9]/**"]
-
-
-
Amazon S3 パーティションが多すぎる – 数千の値を持つ ID 列など、幅広い値を含む列で Amazon S3 データをパーティション化することは避けてください。これにより、バケット内のパーティションの数が大幅に増加する可能性があります。可能なパーティションの数は、パーティション分割したすべてのフィールドの積であるためです。パーティションが多すぎると、以下が発生する可能性があります。
-
データカタログからパーティションメタデータを取得するためのレイテンシーの増加
-
Amazon S3 API リクエスト (、
List、GetおよびHead) を増やす必要がある小さなファイルの数の増加
たとえば、
partitionByまたは で日付タイプを設定する場合partitionKeys、 などの日付レベルのパーティショニングyyyy/mm/ddは多くのユースケースに適しています。ただし、yyyy/mm/dd/<ID>は多数のパーティションを生成するため、パフォーマンス全体に悪影響を与える可能性があります。一方、リアルタイム処理アプリケーションなどの一部のユースケースでは、 などの多くのパーティションが必要です
yyyy/mm/dd/hh。ユースケースで大量のパーティションが必要な場合は、AWS Glue パーティションインデックスを使用して、データカタログからパーティションメタデータを取得するためのレイテンシーを短縮することを検討してください。 -
データベースと JDBC
データベースから情報を取得するときにデータスキャンを減らすには、SQL クエリでwhere述語 (または 句) を指定できます。SQL インターフェイスを提供しないデータベースは、クエリまたはフィルタリングのための独自のメカニズムを提供します。
Java Database Connectivity (JDBC) 接続を使用する場合は、次のパラメータの where句を含む Select クエリを指定します。
-
DynamicFrame の場合は、sampleQuery オプションを使用します。を使用する場合は
create_dynamic_frame.from_catalog、次のように引additional_options数を設定します。query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_catalog( database = db, table_name = table, additional_options={ "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True }, transformation_ctx = "datasource0" )の場合
using create_dynamic_frame.from_options、引connection_options数を次のように設定します。query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_options( connection_type = connection, connection_options={ "url": url, "user": user, "password": password, "dbtable": table, "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True } ) -
DataFrame の場合は、 クエリ
オプションを使用します。 query = "SELECT * FROM <TableName> where id = 'XX'" jdbcDF = spark.read \ .format('jdbc') \ .option('url', url) \ .option('user', user) \ .option('password', pwd) \ .option('query', query) \ .load() -
Amazon Redshift の場合は、 AWS Glue 4.0 以降を使用して Amazon Redshift Spark コネクタのプッシュダウンサポートを活用します。
dyf = glueContext.create_dynamic_frame.from_catalog( database = "redshift-dc-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["temp-s3-dir"], additional_options = {"aws_iam_role": "arn:aws:iam::role-account-id:role/rs-role-name"} ) -
他のデータベースについては、そのデータベースのドキュメントを参照してください。
AWS Glue オプション
-
すべての連続ジョブ実行の完全なスキャンを回避し、前回のジョブ実行時に存在しなかったデータのみを処理するには、ジョブのブックマークを有効にします。
-
処理する入力データの量を制限するには、ジョブブックマークを使用して制限付き実行を有効にします。これにより、ジョブ実行ごとにスキャンされるデータの量を減らすことができます。