OIDC 認証用の AWS Lambda オーソライザーを設定する - AWS HealthImaging

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

OIDC 認証用の AWS Lambda オーソライザーを設定する

このガイドでは、選択した ID プロバイダー (IdP) が HealthImaging OIDC 認証機能の要件と互換性のあるアクセストークンを提供するように設定されていることを前提としています。

1. DICOMWeb API アクセス用の IAM ロールを設定する

Lambda オーソライザーを設定する前に、DICOMWeb API リクエストを処理するときに HealthImaging が引き受ける IAM ロールを作成します。オーソライザー Lambda 関数は、トークンの検証に成功した後、これらのロール ARN のいずれかを返します。これにより、HealthImaging は適切なアクセス許可でリクエストを実行できます。

  1. 目的の DICOMWeb API 権限を定義する IAM ポリシーを作成します。利用可能なアクセス許可については、HealthImaging ドキュメントのDICOMweb の使用」セクションを参照してください。

  2. 以下の IAM ロールを作成します。

    • これらのポリシーをアタッチする

    • AWS HealthImaging サービスプリンシパル (medical-imaging.amazonaws.com) がこれらのロールを引き受けることを許可する信頼関係を含めます。

以下は、関連付けられたロールが HealthImaging DICOMWeb 読み取り専用 API にアクセスすることを許可するポリシーの例です。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "MedicalImagingDicomWebOperations", "Effect": "Allow", "Action": [ "medical-imaging:SearchDICOMInstances", "medical-imaging:GetImageSetMetadata", "medical-imaging:GetDICOMSeriesMetadata", "medical-imaging:SearchDICOMStudies", "medical-imaging:GetDICOMBulkdata", "medical-imaging:SearchDICOMSeries", "medical-imaging:GetDICOMInstanceMetadata", "medical-imaging:GetDICOMInstance", "medical-imaging:GetDICOMInstanceFrames" ], "Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } ] }

ロール (複数可) に関連付ける必要がある信頼関係ポリシーの例を次に示します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "OIDCRoleFederation", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

次のステップで作成する Lambda オーソライザーは、トークンクレームを評価し、適切なロールの ARN を返すことができます。その後、AWS HealthImaging はこのロールを偽装して、対応するアクセス許可で DICOMWeb API リクエストを実行します。

例:

  • 「管理者」クレームを持つトークンは、フルアクセスを持つロールの ARN を返す場合があります

  • 「リーダー」クレームを持つトークンは、読み取り専用アクセスを持つロールの ARN を返す場合があります

  • 「department_A」クレームを持つトークンは、その部門のアクセスレベルに固有のロールの ARN を返す場合があります

このメカニズムにより、IAM ロールを通じて IdP の認可モデルを特定の AWS HealthImaging アクセス許可にマッピングできます。

2. Lambda オーソライザー関数の作成と設定

JWT トークンを検証し、トークンクレーム評価に基づいて適切な IAM ロール ARN を返す Lambda 関数を作成します。この関数はヘルスイメージングサービスによって呼び出され、HealthImaging データストア ID、DICOMWeb オペレーション、HTTP リクエストで見つかったアクセストークンを含むイベントを渡します。

{ "datastoreId": "{datastore id}", "operation": "{Healthimaging API name e.g. GetDICOMInstance}", "bearerToken": "{access token}" }

Lambda オーソライザー関数は、次の構造を持つ JSON レスポンスを返す必要があります。

{ "isTokenValid": {true or false}, "roleArn": "{role arn or empty string meaning to deny the request explicitly}" }

詳細については、実装例を参照してください。

注記

DICOMWeb リクエストは、アクセストークンが Lambda オーソライザーによって検証された後にのみ応答されるため、この関数の実行を可能な限り速くして、最適な DICOMWeb API 応答時間を提供することが重要です。

HealthImaging サービスが Lambda オーソライザー関数を呼び出す権限を付与されるようにするには、HealthImaging サービスが関数を呼び出すことを許可するリソースポリシーが必要です。このリソースポリシーは、Lambda 設定タブのアクセス許可メニューまたは以下を使用して作成できます AWS CLI。

aws lambda add-permission \ --function-name YourAuthorizerFunctionName \ --statement-id HealthImagingInvoke \ --action lambda:InvokeFunction \ --principal medical-imaging.amazonaws.com

このリソースポリシーにより、HealthImaging サービスは DICOMWeb API リクエストを認証するときに Lambda オーソライザーを呼び出すことができます。

注記

Lambda リソースポリシーは、特定の HealthImaging データストアの ARN に一致するArnLike」条件で後で更新できます。

Lambda リソースポリシーの例を次に示します。

{ "Version": "2012-10-17", "Id": "default", "Statement": [ { "Sid": "LambaAuthorizer-HealthImagingInvokePermission", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}", "Condition": { "ArnLike": { "AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } } } ] }

3. OIDC 認証を使用して新しいデータストアを作成する

OIDC 認証を有効にするには、lambda-authorizer-arn」パラメータ AWS CLI を持つ を使用して新しいデータストアを作成する必要があります。サポートに連絡して AWS いなければ、既存のデータストアで OIDC 認証を有効にすることはできません。

OIDC 認証を有効にして新しいデータストアを作成する方法の例を次に示します。

aws medical-imaging create-datastore \ --datastore-name YourDatastoreName \ --lambda-authorizer-arn YourAuthorizerFunctionArn

get-datastore コマンドを使用して AWS CLI 、特定のデータストアで OIDC 認証機能が有効になっているかどうか、および属性lambdaAuthorizerArn」が存在するかどうかを確認できます。

aws medical-imaging get-datastore --datastore-id YourDatastoreId
{ "datastoreProperties": { "datastoreId": YourdatastoreId, "datastoreName": YourDatastoreName, "datastoreStatus": "ACTIVE", "lambdaAuthorizerArn": YourAuthorizerFunctionArn, "datastoreArn": YourDatastoreArn, "createdAt": "2025-09-30T14:16:04.015000-05:00", "updatedAt": "2025-09-30T14:16:04.015000-05:00" } }
注記

AWS CLI データストア作成コマンドの実行ロールには、Lambda オーソライザー関数を呼び出すための適切なアクセス許可が必要です。これにより、悪意のあるユーザーがデータストアオーソライザー設定を通じて不正な Lambda 関数を実行できる特権エスカレーション攻撃が軽減されます。

例外コード

認証に失敗した場合、HealthImaging は次の HTTP エラーレスポンスコードと本文メッセージを返します。

条件 AHI レスポンス
Lambda オーソライザーが存在しないか、無効です 424 オーソライザーの設定ミス
実行の失敗によりオーソライザーが終了しました 424 オーソライザーが失敗しました
マッピングされていない他のオーソライザーエラー 424 オーソライザーが失敗しました
オーソライザーが無効な/不正な形式のレスポンスを返しました 424 オーソライザーの設定ミス
オーソライザーが 1 秒以上実行されました 408 オーソライザータイムアウト
トークンの有効期限が切れているか、無効です 403 無効または期限切れのトークン
オーソライザーの設定ミスにより、AHI は返された IAM ロールをフェデレーションできません 424 オーソライザーの設定ミス
オーソライザーが空のロールを返しました 403 アクセスが拒否されました
返されたロールは呼び出せません (assume-role/trust misconfig) 424 オーソライザーの設定ミス
リクエストレートが DICOMweb Gateway の制限を超えています 429 リクエストが多すぎる
データストア、リターンロール、またはオーソライザーのクロスアカウント/クロスリージョン 424 オーソライザークロスアカウント/クロスリージョンアクセス

実装例

この Python の例は、HealthImaging イベントからの AWS Cognito アクセストークンを検証し、適切な DICOMWeb 権限を持つ IAM ロール ARN を返す Lambda オーソライザー関数を示しています。

Lambda オーソライザーは、2 つのキャッシュメカニズムを実装して、外部呼び出しと応答のレイテンシーを削減します。JWKS (JSON ウェブキーセット) は 1 時間に 1 回フェッチされ、関数の一時フォルダに保存されるため、後続の関数呼び出しはパブリックネットワークからフェッチする代わりにローカルで読み取ることができます。token_cache ディクショナリオブジェクトがこの Lambda 関数のグローバルコンテキストでインスタンス化されていることもわかります。グローバル変数は、同じウォームされた Lambda コンテキストを再利用するすべての呼び出しによって共有されます。これにより、正常に検証されたトークンをこのディクショナリに保存し、この同じ Lambda 関数の次の実行中にすばやく検索できます。キャッシュ方法は、ほとんどの ID プロバイダーから発行されたアクセストークンに適合する可能性がある一般論的なアプローチを表します。 AWS Cognito 固有のキャッシュオプションについては、AWS Cognito ドキュメント「ユーザープールの管理」セクションと「キャッシュ」セクションを参照してください。

import json import os import time import logging from jose import jwk, jwt from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError import requests import tempfile # Configure logging logger = logging.getLogger() log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper() logger.setLevel(getattr(logging, log_level, logging.WARNING)) # Global token cache with TTL token_cache = {} # JWKS cache file path JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json') JWKS_CACHE_TTL = 3600 # 1 hour # Load environment variables once USER_POOL_ID = os.environ['USER_POOL_ID'] CLIENT_ID = os.environ['CLIENT_ID'] ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '') def cleanup_expired_tokens(): """Remove expired tokens from cache""" now = int(time.time()) expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']] for token in expired_keys: del token_cache[token] def get_cached_jwks(): """Get JWKS from cache file if valid, otherwise return None """ try: if os.path.exists(JWKS_CACHE_FILE): # Check if cache file is still valid cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE) if cache_age < JWKS_CACHE_TTL: with open(JWKS_CACHE_FILE, 'r') as f: jwks = json.load(f) logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)') return jwks else: logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)') except Exception as e: logger.debug(f'Error reading JWKS cache: {e}') return None def cache_jwks(jwks): """Cache JWKS to file""" try: with open(JWKS_CACHE_FILE, 'w') as f: json.dump(jwks, f) logger.debug('JWKS cached successfully') except Exception as e: logger.debug(f'Error caching JWKS: {e}') def fetch_jwks(jwks_url): """Fetch JWKS from URL and cache it""" logger.debug('Fetching JWKS from URL') jwks = requests.get(jwks_url, timeout=10).json() # Convert to dict for faster lookups jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']} cache_jwks(jwks) return jwks def is_token_cached(token): if token not in token_cache: return None cached = token_cache[token] now = int(time.time()) if now > cached['cache_expiry']: del token_cache[token] return None return cached def cache_token(token, payload): now = int(time.time()) token_exp = payload.get('exp') cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner token_cache[token] = { 'payload': payload, 'cache_expiry': cache_expiry, 'role_arn': ROLE_ARN } def handler(event, context): cleanup_expired_tokens() # start be removing expired tokens from the cache try: # Extract token from bearerToken or authorizationToken field token = event.get('bearerToken') if not token: raise Exception('No token provided') # Check cache first cached = is_token_cached(token) if cached: logger.debug('Token found in cache, skipping verification') return { 'isTokenValid': True, 'roleArn': cached['role_arn'] } # Get Cognito configuration region = context.invoked_function_arn.split(':')[3] # Get JWKS (cached or fresh) jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json' jwks = get_cached_jwks() if not jwks: jwks = fetch_jwks(jwks_url) # Decode token header to get kid headers = jwt.get_unverified_headers(token) kid = headers['kid'] # Find the correct key key = None for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: # Key not found - try refreshing JWKS in case of key rotation logger.debug('Key not found in cached JWKS, fetching fresh JWKS') jwks = fetch_jwks(jwks_url) for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: raise Exception('Public key not found') # Construct the public key public_key = jwk.construct(key) # Verify and decode the token (includes expiry validation) payload = jwt.decode( token, public_key, algorithms=['RS256'], audience=CLIENT_ID, issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}' ) logger.debug('Token validated successfully') logger.debug('User: %s', payload.get('username', 'unknown')) # Cache the validated token cache_token(token, payload) # Return authorization response return { 'isTokenValid': True, 'roleArn': ROLE_ARN } except ExpiredSignatureError: logger.debug('Token expired') return { 'isTokenValid': False, 'roleArn': '' } except JWTClaimsError: logger.debug('Invalid token claims') return { 'isTokenValid': False, 'roleArn': '' } except JWTError as e: logger.debug('JWT validation error: %s', e) return { 'isTokenValid': False, 'roleArn': '' } except Exception as e: logger.debug('Authorization failed: %s', e) return { 'isTokenValid': False, 'roleArn': '' }