

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Spark Connect を使用して Amazon EMR Serverless でインタラクティブセッションを実行する
<a name="spark-connect"></a>

Amazon EMR リリース `emr-7.13.0`以降では、Apache PySpark Connect で EMR Serverless セッション APIs を使用して、VS Code、PyCharm、Jupyter Notebooks などのセルフマネージド PySpark クライアントから Amazon EMR Serverless アプリケーションに接続できます。Spark Connect は、アプリケーションコードを Spark ドライバープロセスから切り離すクライアント/サーバーアーキテクチャを使用します。Spark オペレーションが EMR Serverless コンピューティングで実行される間、ローカル IDE で PySpark コードを開発およびデバッグします。Spark Connect には次の利点があります。
+ VS Code、PySpark クライアントから EMR Serverless に接続します。 PyCharm
+ DataFrames が本番環境のデータをリモートで実行している間、ブレークポイントを設定し、IDE で PySpark コードをステップスルーします。

Spark Connect セッションは、ローカル PySpark クライアントと Amazon EMR Serverless で実行されている Spark ドライバー間のマネージド接続です。セッションを開始すると、EMR Serverless はユーザーに代わって Spark ドライバーとエグゼキュターをプロビジョニングします。ローカルクライアントは DataFrame および SQL オペレーションをドライバーに送信し、ドライバーはリモートで実行します。セッションは終了するかアイドルタイムアウトに達するまで保持されるため、Spark を再起動せずに複数のクエリをインタラクティブに実行できます。各セッションには、接続に使用する独自のエンドポイント URL と認証トークンがあります。

## 必要なアクセス許可
<a name="spark-connect-permissions"></a>

Amazon EMR Serverless にアクセスするために必要なアクセス許可に加えて、IAM ロールに次のアクセス許可を追加して Spark Connect エンドポイントにアクセスし、Spark Connect セッションを管理します。

`emr-serverless:StartSession`  
として指定したアプリケーションに Spark Connect セッションを作成するアクセス許可を付与します`Resource`。

`emr-serverless:GetSessionEndpoint`  
セッションの Spark Connect エンドポイント URL と認証トークンを取得するアクセス許可を付与します。

`emr-serverless:GetSession`  
セッションのステータスを取得するアクセス許可を付与します。

`emr-serverless:ListSessions`  
アプリケーション上のセッションを一覧表示するアクセス許可を付与します。

`emr-serverless:TerminateSession`  
セッションを終了するアクセス許可を付与します。

`iam:PassRole`  
Spark Connect セッションの作成時に IAM 実行ロールにアクセスするアクセス許可を付与します。Amazon EMR Serverless は、このロールを使用してワークロードを実行します。

`emr-serverless:GetResourceDashboard`  
Spark UI URL を生成するアクセス許可を付与し、セッションのログへのアクセスを提供します。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "EMRServerlessApplicationLevelAccess",
            "Effect": "Allow",
            "Action": [
                "emr-serverless:StartSession",
                "emr-serverless:ListSessions"
            ],
            "Resource": [
                "arn:aws:emr-serverless:{{region}}:{{account-id}}:/applications/{{application-id}}"
            ]
        },
        {
            "Sid": "EMRServerlessSessionLevelAccess",
            "Effect": "Allow",
            "Action": [
                "emr-serverless:GetSession",
                "emr-serverless:GetSessionEndpoint",
                "emr-serverless:TerminateSession",
                "emr-serverless:GetResourceDashboard"
            ],
            "Resource": [
                "arn:aws:emr-serverless:{{region}}:{{account-id}}:/applications/{{application-id}}/sessions/*"
            ]
        },
        {
            "Sid": "EMRServerlessRuntimeRoleAccess",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}"
            ],
            "Condition": {
                "StringLike": {
                    "iam:PassedToService": "emr-serverless.amazonaws.com"
                }
            }
        }
    ]
}
```

## インタラクティブセッションの使用
<a name="spark-connect-working"></a>

Spark Connect 対応アプリケーションを作成して接続するには、次の手順に従います。

**Spark Connect セッションを開始するには**

1. Spark Connect セッションを使用してアプリケーションを作成します。

   ```
   aws emr-serverless create-application \
     --type "SPARK" \
     --name "spark-connect-app" \
     --release-label emr-7.13.0 \
     --interactive-configuration '{"sessionEnabled": true}'
   ```

1. Amazon EMR Serverless がアプリケーションを作成したら、Spark Connect セッションを受け入れる自動起動を有効にしていない場合は、アプリケーションを起動します。

   ```
   aws emr-serverless start-application \
     --application-id {{APPLICATION_ID}}
   ```

1. 次のコマンドを使用して、アプリケーションのステータスを確認します。ステータスが になったら`STARTED`、セッションを開始します。

   ```
   aws emr-serverless get-application \
     --application-id {{APPLICATION_ID}}
   ```

1. データへのアクセスを許可する IAM 実行ロールを使用してセッションを開始します。

   ```
   aws emr-serverless start-session \
     --application-id {{APPLICATION_ID}} \
     --execution-role-arn arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}
   ```

1. `get-session` API を使用してセッションの状態をモニタリングし、セッションが `STARTED`または `IDLE`状態になるまで待ちます。

   ```
   aws emr-serverless get-session \
     --application-id {{APPLICATION_ID}} \
     --session-id {{SESSION_ID}}
   ```

1. Spark Connect エンドポイントと認証トークンを取得します。によって返されるエンドポイント URL にはポート番号`GetSessionEndpoint`は含まれません。`sc://` 接続 URL を構築するときは、`sc://`{{hostname}}`:443/;use_ssl=true;x-aws-proxy-auth=`{{}}token `:443` などの を追加する必要があります。これがないと、PySpark クライアントはデフォルトでポート 15002 になり、EMR Serverless では到達できません。

   ```
   aws emr-serverless get-session-endpoint \
     --application-id {{APPLICATION_ID}} \
     --session-id {{SESSION_ID}}
   ```

   レスポンスには、エンドポイント URL と認証トークンが含まれます。

   ```
   {
     "endpoint": "{{ENDPOINT_URL}}",
     "authToken": "{{AUTH_TOKEN}}",
     "authTokenExpiresAt": "{{AUTH_TOKEN_EXPIRY_TIME}}"
   }
   ```

1. エンドポイントの準備ができたら、PySpark クライアントから接続します。EMR Serverless アプリケーションの Spark バージョンと AWS SDK for Python に一致する PySpark クライアントをインストールします。

   ```
   # Match the PySpark version to your EMR Serverless release version (3.5.6 for emr-7.13.0)
   pip install pyspark[connect]==3.5.6
   pip install boto3
   ```

以下は、セッションを開始し、セッションエンドポイントに直接リクエストを送信するための Python スクリプトの例です。

```
import boto3
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

client = boto3.client('emr-serverless', region_name='{{REGION}}')

APPLICATION_ID = '{{APPLICATION_ID}}'
EXECUTION_ROLE = 'arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}'

# Start the session
response = client.start_session(
    applicationId=APPLICATION_ID,
    executionRoleArn=EXECUTION_ROLE
)
session_id = response['sessionId']
print(f"Session {session_id} starting...")

# Wait for the session to be ready
while True:
    response = client.get_session(
        applicationId=APPLICATION_ID,
        sessionId=session_id
    )
    state = response['session']['state']
    print(f"Session state: {state}")
    if state in ('STARTED', 'IDLE'):
        break
    if state in ('FAILED', 'TERMINATED'):
        raise Exception(f"Session failed: {response['session'].get('stateDetails', 'Unknown error')}")
    time.sleep(5)

# Retrieve the Spark Connect endpoint and authentication token
response = client.get_session_endpoint(
    applicationId=APPLICATION_ID,
    sessionId=session_id
)

# Construct the authenticated remote URL
auth_token = response['authToken']
endpoint_url = response['endpoint']
connect_url = endpoint_url.replace("https://", "sc://", 1) + ":443/;use_ssl=true;"
connect_url += f"x-aws-proxy-auth={auth_token}"

# Start the Spark session
spark = SparkSession.builder.remote(connect_url).getOrCreate()
print(f"Connected. Spark version: {spark.version}")

# Run SQL
spark.sql("SELECT 1+1 AS result").show()

# Run DataFrame operations
df = spark.range(100).withColumn("squared", col("id") * col("id"))
df.show(10)
print(f"Count: {df.count()}")

# Stop the Spark session (disconnects the client only)
spark.stop()

# Terminate the EMR Serverless session to stop billing.
# spark.stop() only closes the local client connection. The remote session
# continues running and incurring charges until you explicitly terminate it
# or it reaches the idle timeout.
client.terminate_session(
    applicationId=APPLICATION_ID,
    sessionId=session_id
)
print(f"Session {session_id} terminated.")
```

セッションのライブ Spark UI または Spark History Server にアクセスするには、 `GetResourceDashboard` API を使用します。

```
response = client.get_resource_dashboard(
    applicationId=APPLICATION_ID,
    resourceId=session_id,
    resourceType='SESSION'
)
response['url']
```

セッションがアクティブな間、URL はライブ Apache Spark UI を開き、クエリ、ステージ、エグゼキュターをリアルタイムでモニタリングします。セッションが終了すると、Spark History Server は Amazon EMR Serverless コンソールを介してセッション後の分析に引き続き使用できます。

## 考慮事項と制限事項
<a name="spark-connect-considerations"></a>

Spark Connect を使用してインタラクティブワークロードを実行する場合は、次の点を考慮してください。
+ Spark Connect は Amazon EMR Serverless リリース `emr-7.13.0` 以降でサポートされています。
+ Spark Connect は Apache Spark エンジンでのみサポートされています。
+ Spark Connect は、PySpark の DataFrame API と SQL APIs をサポートしています。RDD ベースの APIsはサポートされていません。
+ 認証トークンは 1 時間に制限されています。トークンの有効期限が切れると、gRPC 呼び出しは認証エラーで失敗します。`GetSessionEndpoint` を呼び出して新しいトークンを取得し、更新されたトークン`SparkSession`で新しい を作成します。
+ セッションは、設定可能なアイドルタイムアウト後に終了します。デフォルトのタイムアウトは 1 時間に設定されています。
+ 各セッションにはデフォルトで 24 時間のハード制限があり、タスクをアクティブに実行している場合でも自動的に終了します。
+ 各 EMR Serverless アプリケーションは、デフォルトで最大 25 の同時セッションをサポートします。制限の引き上げをリクエストするには、 AWS サポートにお問い合わせください。
+ デフォルトでは、アプリケーションでは `autoStopConfig`がオンになっています。アプリケーションは、アクティブなセッションやジョブの実行なしで 15 分後に自動的に停止します。この設定は、`create-application` または `update-application` リクエストの一部として変更できます。
+ 最適なスタートアップエクスペリエンスを得るには、ドライバーとエグゼキュター用に事前に初期化された容量を設定します。
+ EMR Serverless セッションを開始する前に、AutoStart を有効にするか、アプリケーションを手動で起動する必要があります。
+ ローカルにインストールされる PySpark バージョンは、Amazon EMR Serverless アプリケーションの Apache Spark バージョン ( の場合は 3.5.6`emr-7.13.0`) と一致する必要があります。バージョンが一致しない場合、`ImportError`または予期しない動作が発生します。
+ Lake Formation によるきめ細かなアクセスコントロールは、Spark Connect セッションではサポートされていません。
+ Trusted Identity Propagation は、Spark Connect のインタラクティブセッションではサポートされていません。
+ EMR Serverless のサーバーレスストレージは、Spark Connect のインタラクティブセッションではサポートされていません。
+ Spark Connect の使用には追加料金はかかりません。料金は、セッション中に消費された EMR Serverless コンピューティングリソース (vCPU、メモリ、ストレージ) に対してのみ発生します。
+ Spark 設定`spark.connect.grpc.binding.address`は EMR Serverless によって予約されており、ユーザーが上書きすることはできません。
+ ローカルにインストールする PySpark パッケージは、EMR Serverless アプリケーションの Spark バージョンと一致する必要があります。バージョンが一致しない場合、接続エラーが発生します。Python UDFs (`@udf`、`spark.udf.register`) では、ワーカーと一致するローカル Python マイナーバージョンも必要です。そうしないと、 で失敗します`PYTHON_VERSION_MISMATCH`。組み込み SQL 関数と DataFrame オペレーションでは、Python バージョンの一致は必要ありません。
+ で Spark 設定を渡すには`start-session`、 `--configuration-overrides`パラメータ`runtimeConfiguration`の の下に設定します。`start-job-run` API は`applicationConfiguration`代わりに を使用します。