View a markdown version of this page

通过 Spark Connect 与亚马逊 EMR Serverless 进行交互式会话 - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过 Spark Connect 与亚马逊 EMR Serverless 进行交互式会话

在 Amazon EMR 版本emr-7.13.0及更高版本中,您可以使用与 Apache Spark Connect 的 EMR 无服务器会话从 VS Code 等自我管理的 PySpark 客户端(例如 VS Code)和 Jupyter 笔记本连接到 Amazon EMR 无服务器应用程序。 PyCharm APIs Spark Connect 使用客户端-服务器架构,将您的应用程序代码与 Spark 驱动程序进程分离。当 Spark 操作在 EMR Serverless 计算上运行时,你可以在本地 IDE 中开发和调试 PySpark 代码。Spark Connect 提供以下好处:

  • 从任何 PySpark 客户端(包括 VS Code 和 Jupyter 笔记本电脑)连接到 EMR Serverless。 PyCharm

  • 在生产规模的数据上远程 DataFrames运行时,在 IDE 中设置断点并逐步执行 PySpark 代码。

Spark Connect 会话是您的本地 PySpark 客户端与在 Amazon EMR Serverless 上运行的 Spark 驱动程序之间的托管连接。当您启动会话时,EMR Serverless 会代表您配置 Spark 驱动程序和执行器。您的本地客户端向驱动程序发送 DataFrame和 SQL 操作,驱动程序会远程运行它们。会话将一直持续到您终止会话或达到空闲超时为止,因此您无需重新启动 Spark 即可交互式运行多个查询。每个会话都有自己的终端节点 URL 和用于连接的身份验证令牌。

所需的权限

除了访问亚马逊 EMR Serverless 所需的权限外,还要向你的 IAM 角色添加以下权限,以访问 Spark Connect 终端节点和管理 Spark Connect 会话:

emr-serverless:StartSession

授予在您指定为的应用程序上创建 Spark Connect 会话的权限Resource

emr-serverless:GetSessionEndpoint

授予检索 Spark Connect 端点网址和会话身份验证令牌的权限。

emr-serverless:GetSession

授予获取会话状态的权限。

emr-serverless:ListSessions

授予在应用程序上列出会话的权限。

emr-serverless:TerminateSession

授予终止会话的权限。

iam:PassRole

授予在创建 Spark Connect 会话时访问 IAM 执行角色的权限。Amazon EMR Serverless 使用此角色来运行您的工作负载。

emr-serverless:GetResourceDashboard

授予生成 Spark 用户界面网址的权限并提供对会话日志的访问权限。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "EMRServerlessApplicationLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:StartSession", "emr-serverless:ListSessions" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id" ] }, { "Sid": "EMRServerlessSessionLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:GetSession", "emr-serverless:GetSessionEndpoint", "emr-serverless:TerminateSession", "emr-serverless:GetResourceDashboard" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id/sessions/*" ] }, { "Sid": "EMRServerlessRuntimeRoleAccess", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::account-id:role/EMRServerlessExecutionRole" ], "Condition": { "StringLike": { "iam:PassedToService": "emr-serverless.amazonaws.com" } } } ] }

使用交互式会话

要创建支持 Spark Connect 的应用程序并连接到该应用程序,请按照以下步骤操作。

启动 Spark Connect 会话
  1. 使用 Spark Connect 会话创建应用程序。

    aws emr-serverless create-application \ --type "SPARK" \ --name "spark-connect-app" \ --release-label emr-7.13.0 \ --interactive-configuration '{"sessionEnabled": true}'
  2. 在 Amazon EMR Serverless 创建您的应用程序后,如果您尚未启用自动启动功能以接受 Spark Connect 会话,请启动该应用程序。

    aws emr-serverless start-application \ --application-id APPLICATION_ID
  3. 使用以下命令检查应用程序的状态。状态变为后STARTED,开始会话。

    aws emr-serverless get-application \ --application-id APPLICATION_ID
  4. 使用授予数据访问权限的 IAM 执行角色启动会话。

    aws emr-serverless start-session \ --application-id APPLICATION_ID \ --execution-role-arn arn:aws:iam::account-id:role/EMRServerlessExecutionRole
  5. 使用 get-session API 监控会话状态,然后等待会话进入STARTEDIDLE状态。

    aws emr-serverless get-session \ --application-id APPLICATION_ID \ --session-id SESSION_ID
  6. 检索 Spark Connect 端点和身份验证令牌。返回的终端节点 URL GetSessionEndpoint 不包含端口号。在构造sc://连接 URL 时,必须附加:443sc://hostname:443/;use_ssl=true;x-aws-proxy-auth=token例如。没有它, PySpark 客户端默认为端口 15002,在 EMR Serverless 上无法访问该端口。

    aws emr-serverless get-session-endpoint \ --application-id APPLICATION_ID \ --session-id SESSION_ID

    响应包括终端节点 URL 和身份验证令牌:

    { "endpoint": "ENDPOINT_URL", "authToken": "AUTH_TOKEN", "authTokenExpiresAt": "AUTH_TOKEN_EXPIRY_TIME" }
  7. 终端节点准备就绪后,从 PySpark 客户端进行连接。在你的 EMR Serverless 应用程序上安装与 Spark 版本匹配的 PySpark 客户端,并安装适用于 Python AWS 的软件开发工具包。

    # Match the PySpark version to your EMR Serverless release version (3.5.6 for emr-7.13.0) pip install pyspark[connect]==3.5.6 pip install boto3

以下是启动会话并将请求直接发送到会话端点的 Python 脚本示例:

import boto3 import time from pyspark.sql import SparkSession from pyspark.sql.functions import col client = boto3.client('emr-serverless', region_name='REGION') APPLICATION_ID = 'APPLICATION_ID' EXECUTION_ROLE = 'arn:aws:iam::account-id:role/EMRServerlessExecutionRole' # Start the session response = client.start_session( applicationId=APPLICATION_ID, executionRoleArn=EXECUTION_ROLE ) session_id = response['sessionId'] print(f"Session {session_id} starting...") # Wait for the session to be ready while True: response = client.get_session( applicationId=APPLICATION_ID, sessionId=session_id ) state = response['session']['state'] print(f"Session state: {state}") if state in ('STARTED', 'IDLE'): break if state in ('FAILED', 'TERMINATED'): raise Exception(f"Session failed: {response['session'].get('stateDetails', 'Unknown error')}") time.sleep(5) # Retrieve the Spark Connect endpoint and authentication token response = client.get_session_endpoint( applicationId=APPLICATION_ID, sessionId=session_id ) # Construct the authenticated remote URL auth_token = response['authToken'] endpoint_url = response['endpoint'] connect_url = endpoint_url.replace("https://", "sc://", 1) + ":443/;use_ssl=true;" connect_url += f"x-aws-proxy-auth={auth_token}" # Start the Spark session spark = SparkSession.builder.remote(connect_url).getOrCreate() print(f"Connected. Spark version: {spark.version}") # Run SQL spark.sql("SELECT 1+1 AS result").show() # Run DataFrame operations df = spark.range(100).withColumn("squared", col("id") * col("id")) df.show(10) print(f"Count: {df.count()}") # Stop the Spark session (disconnects the client only) spark.stop() # Terminate the EMR Serverless session to stop billing. # spark.stop() only closes the local client connection. The remote session # continues running and incurring charges until you explicitly terminate it # or it reaches the idle timeout. client.terminate_session( applicationId=APPLICATION_ID, sessionId=session_id ) print(f"Session {session_id} terminated.")

要访问实时的 Spark 用户界面或 Spark 历史服务器以进行会话,请使用 GetResourceDashboard API。

response = client.get_resource_dashboard( applicationId=APPLICATION_ID, resourceId=session_id, resourceType='SESSION' ) response['url']

当会话处于活动状态时,URL 会打开实时 Apache Spark 用户界面,用于实时监控查询、阶段和执行器。会话结束后,Spark 历史服务器仍可通过 Amazon EMR Serverless 控制台进行会话后分析。

注意事项和限制

通过 Spark Connect 运行交互式工作负载时,请考虑以下几点。

  • 亚马逊 EMR 无服务器版本及更高版本emr-7.13.0支持 Spark Connect。

  • 只有 Apache Spark 引擎支持 Spark Connect。

  • Spark Connect 支持 DataFrame 和 SQL APIs 进 PySpark来。不支持基 APIs 于 RDD。

  • 身份验证令牌的时间限制为 1 小时。令牌过期后,gRPC 调用失败并出现身份验证错误。调GetSessionEndpoint用获取新令牌并SparkSession使用更新的令牌创建新令牌。

  • 会话在可配置的空闲超时后结束。默认超时设置为 1 小时。

  • 默认情况下,每个会话的硬限制为 24 小时,之后即使它正在运行任务,它也会自动终止。

  • 默认情况下,每个 EMR Serverless 应用程序最多支持 25 个并发会话。要申请提高限额,请联系 Supp AWS ort。

  • 默认情况下,应用程序autoStopConfig处于开启状态。应用程序会在 15 分钟后自动停止,且没有活动会话或作业运行。您可以将此配置作为 create-applicationupdate-application 请求的一部分进行更改。

  • 为获得最佳启动体验,请为驱动程序和执行程序配置预初始化的容量。

  • 在启动 EMR Serverless 会话之前,您应该启用 AutoStart 或手动启动应用程序。

  • 本地安装的 PySpark 版本必须与亚马逊 EMR 无服务器应用程序上的 Apache Spark 版本匹配(适用于 3.5.6)。emr-7.13.0版本不匹配会导致意外ImportError行为。

  • Spark Connect 会话不支持通过 Lake Formation 进行精细访问控制。

  • 使用 Spark Connect 的交互式会话不支持可信身份传播。

  • 使用 Spark Connect 的交互式会话不支持 EMR Serverless 上的无服务器存储。

  • 使用 Spark Connect 不收取额外费用。您只需为会话期间消耗的 EMR 无服务器计算资源(vCPU、内存和存储)付费。

  • Spark 配置spark.connect.grpc.binding.address由 EMR Serverless 保留,用户无法覆盖。

  • 您在本地安装的 PySpark 软件包必须与 EMR Serverless 应用程序上的 Spark 版本相匹配。版本不匹配会导致连接错误。Python UDFs (@udf,spark.udf.register) 还需要本地 Python 次要版本来匹配工作程序,否则它们就会失败PYTHON_VERSION_MISMATCH。内置 SQL 函数和 DataFrame操作不需要 Python 版本匹配。

  • 要使用传递 Spark 配置start-session,请在--configuration-overrides参数中将其设置runtimeConfiguration在下方。start-job-runAPI applicationConfiguration 改为使用。