設定 OIDC 身分驗證的 AWS Lambda 授權方 - AWS HealthImaging

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

設定 OIDC 身分驗證的 AWS Lambda 授權方

本指南假設您已設定您選擇的身分提供者 (IdP),以提供與 HealthImaging OIDC 身分驗證功能要求相容的存取權杖。

1. 設定 DICOMWeb API 存取的 IAM 角色

在設定 Lambda 授權方之前,請為 HealthImaging 建立 IAM 角色,以在處理 DICOMWeb API 請求時擔任。授權方 Lambda 函數會在權杖驗證成功後傳回其中一個角色 ARN,允許 HealthImaging 以適當的許可執行請求。

  1. 建立定義所需 DICOMWeb API 權限的 IAM 政策。如需可用的許可,請參閱 HealthImaging 文件的「使用 DICOMweb」一節。

  2. 建立符合下列條件的 IAM 角色:

    • 連接這些政策

    • 包含信任關係,允許 AWS HealthImaging 服務主體 (medical-imaging.amazonaws.com) 擔任這些角色。

以下是允許相關角色存取 HealthImaging DICOMWeb 唯讀 API 的政策範例:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "MedicalImagingDicomWebOperations", "Effect": "Allow", "Action": [ "medical-imaging:SearchDICOMInstances", "medical-imaging:GetImageSetMetadata", "medical-imaging:GetDICOMSeriesMetadata", "medical-imaging:SearchDICOMStudies", "medical-imaging:GetDICOMBulkdata", "medical-imaging:SearchDICOMSeries", "medical-imaging:GetDICOMInstanceMetadata", "medical-imaging:GetDICOMInstance", "medical-imaging:GetDICOMInstanceFrames" ], "Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } ] }

以下是應該與角色建立關聯的信任關係政策範例 (以下):

{ "Version": "2012-10-17", "Statement": [ { "Sid": "OIDCRoleFederation", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您將在下一個步驟中建立的 Lambda 授權方可以評估字符宣告並傳回適當角色的 ARN。然後,AWS HealthImaging 會模擬此角色,以執行具有對應許可的 DICOMWeb API 請求。

例如:

  • 具有「admin」宣告的權杖可能會傳回具有完整存取權之角色的 ARN

  • 具有「讀取器」宣告的字符可能會傳回具有唯讀存取權之角色的 ARN

  • 具有「department_A」宣告的字符可能會傳回該部門存取層級特定角色的 ARN

此機制可讓您將 IdP 的授權模型映射至透過 IAM 角色的特定 AWS HealthImaging 許可。

2. 建立和設定 Lambda 授權方函數

建立 Lambda 函數來驗證 JWT 字符,並根據字符宣告評估傳回適當的 IAM 角色 ARN。運作狀態影像服務會叫用此函數,並傳遞事件,其中包含在 HTTP 請求中找到的 HealthImaging 資料存放區 ID、DICOMWeb 操作和存取字符:

{ "datastoreId": "{datastore id}", "operation": "{Healthimaging API name e.g. GetDICOMInstance}", "bearerToken": "{access token}" }

Lambda 授權方函數必須傳回具有下列結構的 JSON 回應:

{ "isTokenValid": {true or false}, "roleArn": "{role arn or empty string meaning to deny the request explicitly}" }

如需詳細資訊,請參閱實作範例。

注意

由於 DICOMWeb 請求只有在 Lambda 授權方驗證存取權杖後才會得到回應,因此請務必盡快執行此函數,以提供最佳的 DICOMWeb API 回應時間。

若要讓 HealthImaging 服務有權叫用 lambda 授權方函數,它必須具有允許 HealthImaging 服務叫用它的資源政策。此資源政策可以在 Lambda 組態索引標籤的許可選單中建立,或使用 AWS CLI:

aws lambda add-permission \ --function-name YourAuthorizerFunctionName \ --statement-id HealthImagingInvoke \ --action lambda:InvokeFunction \ --principal medical-imaging.amazonaws.com

此資源政策允許 HealthImaging 服務在驗證 DICOMWeb API 請求時調用您的 Lambda 授權方。

注意

稍後可以使用符合特定 HealthImaging 資料存放區的 ARN 的「ArnLike」條件來更新 lambda 資源政策。

以下是 lambda 資源政策的範例:

{ "Version": "2012-10-17", "Id": "default", "Statement": [ { "Sid": "LambaAuthorizer-HealthImagingInvokePermission", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}", "Condition": { "ArnLike": { "AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } } } ] }

3. 使用 OIDC 身分驗證建立新的資料存放區

若要啟用 OIDC 身分驗證,您必須使用 AWS CLI 參數為 "lambda-authorizer-arn" 的 建立新的資料存放區。在未聯絡 AWS Support 的情況下,無法在現有的資料存放區上啟用 OIDC 身分驗證。

以下是如何在啟用 OIDC 身分驗證的情況下建立新的資料存放區的範例:

aws medical-imaging create-datastore \ --datastore-name YourDatastoreName \ --lambda-authorizer-arn YourAuthorizerFunctionArn

您可以使用 get-datastore 命令來檢查特定資料存放區是否已啟用 AWS CLI OIDC 身分驗證功能,並驗證屬性 "lambdaAuthorizerArn" 是否存在:

aws medical-imaging get-datastore --datastore-id YourDatastoreId
{ "datastoreProperties": { "datastoreId": YourdatastoreId, "datastoreName": YourDatastoreName, "datastoreStatus": "ACTIVE", "lambdaAuthorizerArn": YourAuthorizerFunctionArn, "datastoreArn": YourDatastoreArn, "createdAt": "2025-09-30T14:16:04.015000-05:00", "updatedAt": "2025-09-30T14:16:04.015000-05:00" } }
注意

AWS CLI 資料存放區建立命令的執行角色必須具有適當的許可,才能叫用 Lambda 授權方函數。這可減少惡意使用者可以透過資料存放區授權方組態執行未經授權 Lambda 函數的權限提升攻擊。

例外代碼

如果身分驗證失敗,HealthImaging 會傳回下列 HTTP 錯誤回應代碼和內文訊息:

Condition AHI 回應
Lambda 授權方不存在或無效 424 授權方設定錯誤
授權方因執行失敗而終止 424 授權方失敗
任何其他未映射的授權方錯誤 424 授權方失敗
授權方傳回無效/格式錯誤的回應 424 授權方設定錯誤
授權方執行超過 1 秒 408 授權方逾時
字符已過期或無效 403 權杖無效或過期
由於授權方設定錯誤,AHI 無法聯合傳回的 IAM 角色 424 授權方設定錯誤
授權方傳回空的角色 403 存取遭拒
傳回的角色不可呼叫 (assume-role/trust misconfig) 424 授權方設定錯誤
請求率超過 DICOMweb Gateway 限制 429 太多請求
資料存放區、傳回角色或授權方跨帳戶/跨區域 424 授權方跨帳戶/跨區域存取

實作範例

此 Python 範例示範 lambda 授權方函數,可驗證來自 HealthImaging 事件的 AWS Cognito 存取字符,並傳回具有適當 DICOMWeb 權限的 IAM 角色 ARN。

Lambda 授權方實作兩種快取機制,以減少外部呼叫和回應延遲。JWKS (JSON Web 金鑰集) 會每小時擷取一次,並存放在函數的暫時資料夾中,允許後續函數叫用在本機讀取,而不是從公有網路擷取。您也會注意到 Token_cache 字典物件在此 Lambda 函數的全域內容中執行個體化。全域變數由重複使用相同暖 Lambda 內容的所有調用共用。有鑑於此,已成功驗證的權杖可以存放在此字典中,並在下次執行這個相同的 Lambda 函數時快速查詢。快取方法代表一種通才方法,可以適應大多數身分提供者發出的存取權杖。如需 AWS Cognito 特定快取選項,請參閱 AWS Cognito 文件管理使用者集區區段和快取區段

import json import os import time import logging from jose import jwk, jwt from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError import requests import tempfile # Configure logging logger = logging.getLogger() log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper() logger.setLevel(getattr(logging, log_level, logging.WARNING)) # Global token cache with TTL token_cache = {} # JWKS cache file path JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json') JWKS_CACHE_TTL = 3600 # 1 hour # Load environment variables once USER_POOL_ID = os.environ['USER_POOL_ID'] CLIENT_ID = os.environ['CLIENT_ID'] ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '') def cleanup_expired_tokens(): """Remove expired tokens from cache""" now = int(time.time()) expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']] for token in expired_keys: del token_cache[token] def get_cached_jwks(): """Get JWKS from cache file if valid, otherwise return None """ try: if os.path.exists(JWKS_CACHE_FILE): # Check if cache file is still valid cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE) if cache_age < JWKS_CACHE_TTL: with open(JWKS_CACHE_FILE, 'r') as f: jwks = json.load(f) logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)') return jwks else: logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)') except Exception as e: logger.debug(f'Error reading JWKS cache: {e}') return None def cache_jwks(jwks): """Cache JWKS to file""" try: with open(JWKS_CACHE_FILE, 'w') as f: json.dump(jwks, f) logger.debug('JWKS cached successfully') except Exception as e: logger.debug(f'Error caching JWKS: {e}') def fetch_jwks(jwks_url): """Fetch JWKS from URL and cache it""" logger.debug('Fetching JWKS from URL') jwks = requests.get(jwks_url, timeout=10).json() # Convert to dict for faster lookups jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']} cache_jwks(jwks) return jwks def is_token_cached(token): if token not in token_cache: return None cached = token_cache[token] now = int(time.time()) if now > cached['cache_expiry']: del token_cache[token] return None return cached def cache_token(token, payload): now = int(time.time()) token_exp = payload.get('exp') cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner token_cache[token] = { 'payload': payload, 'cache_expiry': cache_expiry, 'role_arn': ROLE_ARN } def handler(event, context): cleanup_expired_tokens() # start be removing expired tokens from the cache try: # Extract token from bearerToken or authorizationToken field token = event.get('bearerToken') if not token: raise Exception('No token provided') # Check cache first cached = is_token_cached(token) if cached: logger.debug('Token found in cache, skipping verification') return { 'isTokenValid': True, 'roleArn': cached['role_arn'] } # Get Cognito configuration region = context.invoked_function_arn.split(':')[3] # Get JWKS (cached or fresh) jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json' jwks = get_cached_jwks() if not jwks: jwks = fetch_jwks(jwks_url) # Decode token header to get kid headers = jwt.get_unverified_headers(token) kid = headers['kid'] # Find the correct key key = None for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: # Key not found - try refreshing JWKS in case of key rotation logger.debug('Key not found in cached JWKS, fetching fresh JWKS') jwks = fetch_jwks(jwks_url) for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: raise Exception('Public key not found') # Construct the public key public_key = jwk.construct(key) # Verify and decode the token (includes expiry validation) payload = jwt.decode( token, public_key, algorithms=['RS256'], audience=CLIENT_ID, issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}' ) logger.debug('Token validated successfully') logger.debug('User: %s', payload.get('username', 'unknown')) # Cache the validated token cache_token(token, payload) # Return authorization response return { 'isTokenValid': True, 'roleArn': ROLE_ARN } except ExpiredSignatureError: logger.debug('Token expired') return { 'isTokenValid': False, 'roleArn': '' } except JWTClaimsError: logger.debug('Invalid token claims') return { 'isTokenValid': False, 'roleArn': '' } except JWTError as e: logger.debug('JWT validation error: %s', e) return { 'isTokenValid': False, 'roleArn': '' } except Exception as e: logger.debug('Authorization failed: %s', e) return { 'isTokenValid': False, 'roleArn': '' }