

# AWS Glue 中 Apache Spark 的生成式人工智能升级
<a name="upgrade-analysis"></a>

 AWS Glue 中的 Spark 升级功能使数据工程师和开发者能够使用生成式人工智能将其现有的 AWS Glue Spark 作业升级和迁移到最新的 Spark 版本。数据工程师可以使用该功能来扫描其 AWS Glue Spark 作业、生成升级计划、执行计划和验证输出。该功能通过自动执行识别和更新 Spark 脚本、配置、依赖项、方法和功能的无差别工作，缩短了 Spark 升级的时间并降低了升级成本。

![\[GIF 显示示例升级分析工作流的端到端实现。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/demo_lumos.gif)


## 工作原理
<a name="upgrade-analysis-how-it-works"></a>

 使用升级分析功能时，AWS Glue 会识别作业务代码中版本和配置之间的差异，从而生成升级计划。升级计划会详细说明所有代码更改以及所需的迁移步骤。接下来，AWS Glue 会在环境中构建并运行升级后的应用程序以验证更改，并生成代码更改列表供您迁移作业。您可以查看更新的脚本以及详细说明建议更改的摘要。运行自己的测试后，接受更改，AWS Glue 作业将自动更新为采用新脚本的最新版本。

 升级分析过程可能需要一些时间才能完成，具体时间取决于作业的复杂程度和工作负载。升级分析结果将存储在指定的 Amazon S3 路径中，可以查看该路径以了解升级情况和任何潜在的兼容性问题。查看升级分析结果后，您可以决定是继续进行实际升级，还是在升级之前对作业进行任何必要的更改。

## 先决条件
<a name="upgrade-analysis-prerequisites"></a>

 要在 AWS Glue 中使用生成式人工智能升级作业，需要满足以下先决条件：
+  AWS Glue 2 PySpark 作业：只有 AWS Glue 2 作业可以升级到 AWS Glue 5。
+  需要 IAM 权限才能开始分析、查看结果和升级作业。有关更多信息，请参阅下面[权限](#auto-upgrade-permissions)部分中的示例。
+  如果使用 AWS KMS 加密分析构件，则需要额外的 AWS AWS KMS 权限。有关更多信息，请参阅下面[AWS KMS 策略](#auto-upgrade-kms-policy)部分中的示例。

### 权限
<a name="auto-upgrade-permissions"></a>

#### 要开始新的升级分析，您需要以下权限：
<a name="collapsible-section-1"></a>

1.  使用以下权限调用者的 IAM 策略：

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "glue:StartJobUpgradeAnalysis",
                   "glue:StartJobRun",
                   "glue:GetJobRun",
                   "glue:GetJob",
                   "glue:BatchStopJobRun"
               ],
               "Resource": [
                   "arn:aws:glue:us-east-1:111122223333:job/jobName"
               ]
           },
           {
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject"
               ],
               "Resource": [
              		 "arn:aws:s3:::amzn-s3-demo-bucket/script-location/*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject",
                   "s3:GetObject"
               ],
               "Resource": [
               		"arn:aws:s3:::amzn-s3-demo-bucket/results/*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

1.  更新要升级的作业的执行角色，包含以下内联策略：

   ```
       {
         "Effect": "Allow",
         "Action": ["s3:GetObject"],    
         "Resource": [
           "ARN of the Amazon S3 path provided on API",
           "ARN of the Amazon S3 path provided on API/*"
         ]
       }
   ```

    例如，如果您使用 Amazon S3 路径 `s3://amzn-s3-demo-bucket/upgraded-result`，则该策略将为：

   ```
   {
         "Effect": "Allow",
         "Action": ["s3:GetObject"],
         "Resource": [
           "arn:aws:s3:::amzn-s3-demo-bucket/upgraded-result/",
           "arn:aws:s3:::amzn-s3-demo-bucket/upgraded-result/*"
         ]
       }
   ```

#### 要检索分析的详细信息，您需要以下权限：
<a name="collapsible-section-2"></a>

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetJobUpgradeAnalysis"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:111122223333:job/jobName"
      ]
    }
  ]
}
```

------

#### 要停止正在进行的分析，您需要以下权限：
<a name="collapsible-section-3"></a>

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:StopJobUpgradeAnalysis",
        "glue:BatchStopJobRun"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:111122223333:job/jobName"
      ]
    }
  ]
}
```

------

#### 要列出针对特定作业提交的所有分析，您需要以下权限：
<a name="collapsible-section-4"></a>

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:ListJobUpgradeAnalyses"
      ],
      "Resource": [
        "arn:aws:glue:us-east-1:111122223333:job/jobName"
      ]
    }
  ]
}
```

------

#### 要接受分析的更改并升级作业，您需要以下权限：
<a name="collapsible-section-5"></a>

### AWS KMS 策略
<a name="auto-upgrade-kms-policy"></a>

 要在开始分析时传递自己的自定义 AWS KMS 密钥，请参阅以下部分以配置对 AWS KMS 密钥的相应权限。

#### 使用 AWS KMS 密钥配置结果构件加密：
<a name="w2aac37b7c20c13c13b5b5"></a>

 此策略可确保您拥有 AWS KMS 密钥的加密和解密权限。

```
{
    "Effect": "Allow",
    "Principal":{
        "AWS": "<IAM Customer caller ARN>"
    },
    "Action": [
      "kms:Decrypt",
      "kms:GenerateDataKey",
    ],
    "Resource": "<key-arn-passed-on-start-api>"
}
```

## 运行升级分析并应用升级脚本
<a name="auto-upgrade-procedure"></a>

 您可以运行升级分析，这将为您从**作业**视图中选择的作业生成升级计划。

1.  从**作业**中选择 AWS Glue 2.0 作业，然后从**操作**菜单中选择**运行升级分析**。  
![\[屏幕截图显示从操作菜单使用人工智能进行升级分析。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/upgrade-analysis-run-action-menu.png)

1.  在模态中，在**结果路径**中选择存储生成的升级计划的路径。此路径必须是您可以访问和写入的 Amazon S3 存储桶。  
![\[屏幕截图显示已完成的升级分析。应用升级的脚本按钮可见。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/upgrade-analysis-configuration-options.png)

1.  如果需要，配置其他选项：
   +  **运行配置**（可选）：运行配置是一项可选的设置，允许您自定义升级分析期间执行的验证运行的各个方面。此配置用于执行升级后的脚本，并允许您选择计算环境属性（工作线程类型、工作线程数量等）。请注意，在审查、接受更改并将其应用于生产环境之前，您应该使用非生产开发者账户对示例数据集进行验证。运行配置包括以下可自定义的参数：
     + 工作线程类型：您可以指定用于验证运行的工作线程类型，从而允许您根据自己的要求选择适当的计算资源。
     + 工作线程数量：您可以定义要为验证运行预置的工作线程数量，从而使您能够根据工作负载需求扩展资源。
     + 作业超时（分钟）：此参数允许您设置验证运行的时间限制，从而确保作业在指定的持续时间后终止，以防止过度消耗资源。
     + 安全配置：您可以配置加密和访问控制等安全设置，以确保在验证运行期间保护您的数据和资源。
     + 其他作业参数：如果需要，则您可以添加新的作业参数，以进一步自定义验证运行的执行环境。

      通过利用运行配置，您可以量身定制验证运行以满足自己的特定要求。例如，您可以将验证运行配置为使用较小的数据集，这样可以更快地完成分析并优化成本。此方法可确保高效执行升级分析，同时最大限度地降低验证阶段的资源利用率和相关成本。
   +  **加密配置**（可选）：
     + **启用升级构件加密**：在将数据写入结果路径时启用静态加密。如果您不想加密升级构件，则请取消选中此选项。

1.  选择**运行**开始升级分析。分析运行时，您可以在**升级分析**选项卡中查看结果。分析详细信息窗口将显示有关分析的信息以及指向升级构件的链接。
   +  **结果路径**：此项是存储结果摘要和升级脚本的位置。
   +  **Amazon S3 中的升级脚本**：升级脚本在 Amazon S3 中的位置。您可以在应用升级之前查看该脚本。
   +  **Amazon S3 中的升级摘要**：升级摘要在 Amazon S3 中的位置。您可以在应用升级之前查看升级摘要。

1.  成功完成升级分析后，您可以通过选择**应用升级的脚本**，应用升级的脚本以自动升级作业。

    应用后，AWS Glue 版本将更新为 4.0。您可以在**脚本**选项卡中查看新脚本。  
![\[屏幕截图显示已完成的升级分析。应用升级的脚本按钮可见。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/upgrade-analysis-analysis-details-preview.png)

## 了解升级摘要
<a name="auto-upgrade-analysis-summary"></a>

 此示例演示将 AWS Glue 作业从 2.0 版升级到 4.0 版的过程。该示例作业从 Amazon S3 存储桶读取产品数据，使用 Spark SQL 对数据进行多次转换，然后将转换后的结果保存回 Amazon S3 存储桶。

### 原始代码（AWS Glue 2.0）：升级前
<a name="w2aac37b7c20c21b5b1"></a>

```
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.job import Job
import json
from pyspark.sql.types import StructType

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

gdc_database = "s3://aws-glue-scripts-us-east-1-gamma/demo-database/"
schema_location = (
    "s3://aws-glue-scripts-us-east-1-gamma/DataFiles/"
)

products_schema_string = spark.read.text(
    f"{schema_location}schemas/products_schema"
).first()[0]

product_schema = StructType.fromJson(json.loads(products_schema_string))

products_source_df = (
    spark.read.option("header", "true")
    .schema(product_schema)
    .option(
        "path",
        f"{gdc_database}products/",
    )
    .csv(f"{gdc_database}products/")
)

products_source_df.show()
products_temp_view_name = "spark_upgrade_demo_product_view"
products_source_df.createOrReplaceTempView(products_temp_view_name)

query = f"select {products_temp_view_name}.*, format_string('%0$s-%0$s', category, subcategory) as unique_category from {products_temp_view_name}"
products_with_combination_df = spark.sql(query)
products_with_combination_df.show()

products_with_combination_df.createOrReplaceTempView(products_temp_view_name)
product_df_attribution = spark.sql(
    f"""
SELECT *,
unbase64(split(product_name, ' ')[0]) as product_name_decoded,
unbase64(split(unique_category, '-')[1]) as subcategory_decoded
FROM {products_temp_view_name}
"""
)
product_df_attribution.show()


product_df_attribution.write.mode("overwrite").option("header", "true").option(
    "path", f"{gdc_database}spark_upgrade_demo_product_agg/"
).saveAsTable("spark_upgrade_demo_product_agg", external=True)

spark_upgrade_demo_product_agg_table_df = spark.sql(
    f"SHOW TABLE EXTENDED in default like 'spark_upgrade_demo_product_agg'"
)
spark_upgrade_demo_product_agg_table_df.show()
job.commit()
```

### 新代码（Glue 4.0）：升级后
<a name="upgrade-analysis-example-new-code-glue-4"></a>

```
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.job import Job
import json
from pyspark.sql.types import StructType

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# change 1
spark.conf.set("spark.sql.adaptive.enabled", "false")
# change 2
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")
job = Job(glueContext)

gdc_database = "s3://aws-glue-scripts-us-east-1-gamma/demo-database/"
schema_location = (
    "s3://aws-glue-scripts-us-east-1-gamma/DataFiles/"
)

products_schema_string = spark.read.text(
    f"{schema_location}schemas/products_schema"
).first()[0]

product_schema = StructType.fromJson(json.loads(products_schema_string))

products_source_df = (
    spark.read.option("header", "true")
    .schema(product_schema)
    .option(
        "path",
        f"{gdc_database}products/",
    )
    .csv(f"{gdc_database}products/")
)

products_source_df.show()
products_temp_view_name = "spark_upgrade_demo_product_view"
products_source_df.createOrReplaceTempView(products_temp_view_name)

# change 3
query = f"select {products_temp_view_name}.*, format_string('%1$s-%1$s', category, subcategory) as unique_category from {products_temp_view_name}"
products_with_combination_df = spark.sql(query)
products_with_combination_df.show()

products_with_combination_df.createOrReplaceTempView(products_temp_view_name)
# change 4
product_df_attribution = spark.sql(
    f"""
SELECT *,
try_to_binary(split(product_name, ' ')[0], 'base64') as product_name_decoded,
try_to_binary(split(unique_category, '-')[1], 'base64') as subcategory_decoded
FROM {products_temp_view_name}
"""
)
product_df_attribution.show()


product_df_attribution.write.mode("overwrite").option("header", "true").option(
    "path", f"{gdc_database}spark_upgrade_demo_product_agg/"
).saveAsTable("spark_upgrade_demo_product_agg", external=True)

spark_upgrade_demo_product_agg_table_df = spark.sql(
    f"SHOW TABLE EXTENDED in default like 'spark_upgrade_demo_product_agg'"
)
spark_upgrade_demo_product_agg_table_df.show()
job.commit()
```

### 分析摘要说明
<a name="upgrade-analysis-explanation-summary"></a>

![\[屏幕截图显示升级分析摘要。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/upgrade-analysis-explanation-summary.png)


 根据摘要，AWS Glue 提出了四项更改，以便将脚本从 AWS Glue 2.0 成功升级到 AWS Glue 4.0：

1.  **Spark SQL 配置（spark.sql.adaptive.enabled）**：由于从 Spark 3.2 开始引入了 Spark SQL 自适应查询执行的新功能，此更改旨在恢复应用程序行为。您可以检查此配置更改，并可以根据其偏好进一步启用或禁用它。

1.  **DataFrame API 更改**：路径选项不能与 `load()` 等其他 DataFrameReader 操作共存。为保留之前的行为，AWS Glue 更新了该脚本以添加新的 SQL 配置**（spark.sql.legacy.pathOptionBehavior.enabled）**。

1.  **Spark SQL API 更改**：`format_string(strfmt, obj, ...)` 中 `strfmt` 的行为已更新为不允许 `0$` 作为第一个参数。为确保兼容性，AWS Glue 修改了该脚本以使用 `1$` 作为第一个参数。

1.  **Spark SQL API 更改**：`unbase64` 函数不允许输入格式错误的字符串。为了保留之前的行为，AWS Glue 更新了该脚本以使用 `try_to_binary` 函数。

## 停止正在进行的升级分析
<a name="auto-upgrade-stopping-analysis"></a>

 您可以取消正在进行的升级分析，也可以直接停止分析。

1.  选择**升级分析**选项卡。

1.  选择正在运行的作业，然后选择**停止**。这将停止分析。然后，您可以对同一作业进行其他升级分析。  
![\[屏幕截图显示已选择作业的升级分析选项卡。作业仍在运行。\]](http://docs.aws.amazon.com/zh_cn/glue/latest/dg/images/upgrade-analysis-tab.png)

## 注意事项
<a name="upgrade-analysis-considerations"></a>

 开始使用 Spark 升级功能时，要优化使用服务，需要考虑几个重要方面。
+  **服务范围和限制**：当前版本针对从 AWS Glue 版本 2.0 到版本 5.0 的 PySpark 代码升级。目前，该服务处理不依赖于其他库依赖项的 PySpark 代码。您可以在一个 AWS 账户中同时运行最多 10 个作业的自动升级，从而在保持系统稳定性的同时高效升级多个作业。
  +  仅支持 PySpark 作业。
  +  升级分析将在 24 小时后超时。
  +  一个作业一次只能运行一个主动升级分析。在账户级别，最多可以同时运行 10 个主动升级分析。
+  **在升级过程中优化成本**：由于 Spark 升级功能使用生成式人工智能通过多次迭代来验证升级计划，并且每次迭代都在您的账户中以 AWS Glue 作业的形式运行，因此优化验证作业运行配置以实现成本效率至关重要。为此，建议在开始升级分析时指定运行配置如下：
  +  使用非生产开发者账户，选择代表您的生产数据但规模较小的示例模拟数据集，以使用 Spark 升级功能进行验证。
  +  使用规模合适的计算资源，例如 G.1X 工作线程，并选择适当数量的工作线程来处理您的示例数据。
  +  在适用时启用 AWS Glue 作业自动扩缩功能，以根据工作负载自动调整资源。

   例如，如果您的生产作业使用 20 个 G.2X 工作线程处理数 TB 的数据，则可以将升级作业配置为使用 2 个 G.2X 工作线程中处理几 GB 的代表性数据，并启用自动扩缩以进行验证。
+  **最佳实践**：强烈建议从非生产作业开始升级之旅。这种方法使您可以熟悉升级工作流，并了解该服务如何处理不同类型的 Spark 代码模式。
+  **警报和通知**：对作业使用生成式人工智能升级功能时，请确保关闭作业运行失败的警报/通知。在升级过程中，提供升级的构件之前，您的账户中最多可有 10 次失败的作业运行。
+  **异常检测规则**：关闭对进行升级的作业的任何异常检测规则，因为在进行升级验证时，中间作业运行期间写入输出文件夹的数据可能并非预期的格式。
+  **对幂等作业使用升级分析**：对幂等作业使用升级分析，可确保每次后续验证作业的运行尝试都与前一次一致，避免出现问题。幂等作业是指可以使用相同输入数据多次运行，且每次运行都会产生相同输出的作业。在 AWS Glue 中使用适用于 Apache Spark 的生成式人工智能升级时，该服务会在验证流程中对作业运行多次迭代。在每次迭代期间，都会更改 Spark 代码和配置来验证升级计划。如果 Spark 作业不是幂等作业，则使用相同的输入数据多次运行该作业可能会导致问题。

## 支持的区域
<a name="upgrade-analysis-supported-regions"></a>

适用于 Apache Spark 的生成式人工智能升级已在以下区域推出：
+ **亚太地区**：东京（ap-northeast-1）、首尔（ap-northeast-2）、孟买（ap-south-1）、新加坡（ap-southeast-1）和悉尼（ap-southeast-2）
+ **北美洲**：加拿大（ca-central-1）
+ **欧洲地区**：法兰克福（eu-central-1）、斯德哥尔摩（eu-north-1）、爱尔兰（eu-west-1）、伦敦（eu-west-2）和巴黎（eu-west-3）
+ **南美洲**：圣保罗（sa-east-1）
+ **美国**：弗吉尼亚州北部（us-east-1）、俄亥俄州（us-east-2）和俄勒冈州（us-west-2）

## Spark Upgrades 中的跨区域推理
<a name="w2aac37b7c20c37"></a>

 Spark Upgrades 由 Amazon Bedrock 提供支持并利用跨区域推理 (CRIS)。利用 CRIS，Spark Upgrades 将自动选择您所在地理位置的最佳区域（详见[此处](https://docs.aws.amazon.com/bedrock/latest/userguide/cross-region-inference.html)）来处理您的推理请求，最大限度地提高可用计算资源和模型可用性，并提供最佳客户体验。使用跨区域推理不会产生额外成本。

 跨区域推理请求保留在属于数据原始所在地理位置的 AWS 区域内。例如，在美国境内提出的请求将保留在美国境内的 AWS 区域内。尽管数据仍然只存储在主区域中，但在使用跨区域推理时，您的输入提示和输出结果可能会移出主区域。所有数据都将通过 Amazon 的安全网络进行加密传输。