

# 在 AWS Glue ETL 中通过下推优化读取
<a name="aws-glue-programming-pushdown"></a>

 下推是一种优化技术，它可以将检索数据的逻辑推向离数据源更近的地方。源可以是数据库或文件系统，例如 Amazon S3。直接在源端执行某些操作时，无需将所有数据通过网络传送到由 AWS Glue 管理的 Spark 引擎，从而节省时间和处理能力。

换言之，下推可以减少数据扫描量。要详细了解确定何时适合使用这种技术的过程，请参阅《AWS 规范性指南》中“优化 AWS Glue for Apache Spark 作业性能的最佳实践”**指南中的 [减少数据扫描量](https://docs.aws.amazon.com/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/reduce-data-scan.html)。

## 对存储在 Amazon S3 上的文件的谓词下推
<a name="aws-glue-programming-pushdown-s3"></a>

 在 Amazon S3 上处理按前缀组织的文件时，可以通过定义下推谓词来筛选目标 Amazon S3 路径。可以直接将筛选器应用于 AWS Glue Data Catalog 中存储的分区元数据，而不必读取完整的数据集并在 `DynamicFrame` 中应用筛选器。这种方法允许您有选择地列出和只读必要的数据。有关此过程的更多信息，包括按分区写入存储桶，请参阅 [管理 AWS Glue 中用于 ETL 输出的分区](aws-glue-programming-etl-partitions.md)。

通过使用 `push_down_predicate` 参数，可以在 Amazon S3 中实现谓词下推。假设按年、月和日分区的 Amazon S3 中的一个存储桶。如果您想检索 2022 年 6 月的客户数据，可以指示 AWS Glue 仅读取相关的 Amazon S3 路径。在本例中，`push_down_predicate` 为 `year='2022' and month='06'`。综上所述，可以实现读取操作，如下所示：

------
#### [ Python ]

```
customer_records = glueContext.create_dynamic_frame.from_catalog( 
    database = "customer_db", 
    table_name = "customer_tbl",
    push_down_predicate = "year='2022' and month='06'"
)
```

------
#### [ Scala ]

```
val customer_records = glueContext.getCatalogSource(
database="customer_db", 
tableName="customer_tbl", 
pushDownPredicate="year='2022' and month='06'"
).getDynamicFrame()
```

------

在前面的场景中，`push_down_predicate` 从 AWS Glue Data Catalog 中检索所有分区的列表，并在读取底层 Amazon S3 文件之前对其进行筛选。尽管这在大多数情况下都有帮助，但在处理具有数百万个分区的数据集时，列出分区的过程可能很耗时。为了解决这个问题，可以使用服务器端的分区修剪来提高性能。这通过在 AWS Glue Data Catalog 中为数据建立**分区索引**来完成。有关分区索引的更多信息，请参阅 [创建分区索引](partition-indexes.md)。然后，您可以使用 `catalogPartitionPredicate` 选项来引用索引。有关使用 `catalogPartitionPredicate` 检索分区的示例，请参阅 [使用目录分区谓词进行服务器端筛选](aws-glue-programming-etl-partitions.md#aws-glue-programming-etl-partitions-cat-predicates)。

## 使用 JDBC 源时下推
<a name="aws-glue-programming-pushdown-jdbc"></a>

`GlueContext` 中使用的 AWS Glue JDBC 读取器通过提供可以直接在源上运行的自定义 SQL 查询，支持对支持的数据库进行下推。这可以通过设置 `sampleQuery` 参数来实现。您的示例查询可以指定要选择的列，还可以提供下推谓词来限制传输到 Spark 引擎的数据。

默认情况下，示例查询在单个节点上运行，这可能会在处理大量数据时导致作业失败。要使用此功能大规模查询数据，您应该通过设置 `enablePartitioningForSampleQuery` 为 true 来配置查询分区，这将通过您选择的键将查询分发到多个节点。查询分区还需要一些其他必要的配置参数。有关查询分区的更多信息，请参阅 [从 JDBC 表并行读取](run-jdbc-parallel-read-job.md)。

设置 `enablePartitioningForSampleQuery` 时，AWS Glue 会在查询数据库时将您的下推谓词与分区谓词组合在一起。`sampleQuery` 必须以 `AND` for AWS Glue 结尾才能附加分区条件。（如果您未提供下推谓词，则 `sampleQuery` 必须以 `WHERE` 结尾）。请参阅下面的示例，其中我们下推一个谓词以仅检索 `id` 大于 1000 的行。此 `sampleQuery` 将仅返回 `id` 大于指定值的行的名称和位置列：

------
#### [ Python ]

```
sample_query = "select name, location from customer_tbl WHERE id>=1000 AND"
customer_records = glueContext.create_dynamic_frame.from_catalog(
    database="customer_db",
    table_name="customer_tbl",
    sample_query = "select name, location from customer_tbl WHERE id>=1000 AND",

    additional_options = { 
                           "hashpartitions": 36 , 
                           "hashfield":"id",
                           "enablePartitioningForSampleQuery":True, 
                           "sampleQuery":sample_query
                          }
)
```

------
#### [ Scala ]

```
val additionalOptions = Map( 
        "hashpartitions" -> "36", 
        "hashfield" -> "id", 
        "enablePartitioningForSampleQuery" -> "true", 
        "sampleQuery" -> "select name, location from customer_tbl WHERE id >= 1000 AND"
        )
 
    val customer_records = glueContext.getCatalogSource(
        database="customer_db", 
        tableName="customer_tbl").getDynamicFrame()
```

------

**注意**  
如果 `customer_tbl` 在数据目录和底层数据存储中的名称不同，则必须在 sample\$1query 中提供底层表的名称，因为查询将传递到底层数据存储。

您也可以在不与 AWS Glue Data Catalog 集成的情况下对 JDBC 表进行查询。您可以通过提供 `useConnectionProperties` 和 `connectionName` 来重用来自先前存在连接的凭证，而不必提供用户名和密码作为该方法的参数。在本例中，我们从名为 `my_postgre_connection` 的连接检索凭证。

------
#### [ Python ]

```
connection_options_dict = {
    "useConnectionProperties": True,
    "connectionName": "my_postgre_connection",
    "dbtable":"customer_tbl",
    "sampleQuery":"select name, location from customer_tbl WHERE id>=1000 AND",
    "enablePartitioningForSampleQuery":True,
    "hashfield":"id",
    "hashpartitions":36
    }

customer_records = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options=connection_options_dict
    )
```

------
#### [ Scala ]

```
val connectionOptionsJson = """
      {
        "useConnectionProperties": true,
        "connectionName": "my_postgre_connection",
        "dbtable": "customer_tbl",
        "sampleQuery": "select name, location from customer_tbl WHERE id>=1000 AND",
        "enablePartitioningForSampleQuery" : true,
        "hashfield" : "id",
        "hashpartitions" : 36
      }
    """
    
    val connectionOptions = new JsonOptions(connectionOptionsJson)
    
    val dyf = glueContext.getSource("postgresql", connectionOptions).getDynamicFrame()
```

------

## AWS Glue 中下推的注意事项和限制
<a name="aws-glue-programming-pushdown-other"></a>

作为一个概念，“下推”适用于从非串流源读取数据。AWSGlue 支持多种信号源，下推的能力取决于源和连接器。
+ 连接到 Snowflake 时，您可以使用 `query` 选项。AWS Glue 4.0 及更高版本的 Redshift 连接器中也有类似的功能。有关使用 `query` 从 Snowflake 读取内容的更多信息，请参阅 [从 Snowflake 表中读取](aws-glue-programming-etl-connect-snowflake-home.md#aws-glue-programming-etl-connect-snowflake-read)。
+ DynamoDB ETL 读取器不支持筛选条件或下推谓词。MongoDB 和 DocumentDB 也不支持这种功能。
+ 从以开放表格式存储在 Amazon S3 中的数据中读取数据时，Amazon S3 中文件的分区方法已不再足够。要使用开放表格式从分区读取和写入，请查阅格式文档。
+ DynamicFrame 方法不执行 Amazon S3 投影下推。所有列都将从传递谓词筛选器的文件中读取。
+ 在 AWS Glue 中使用 `custom.jdbc` 连接器时，下推的能力取决于源和连接器。请查看相应的连接器文档，以确认它是否以及如何支持 AWS Glue 中的下推。