为 OIDC 身份验证设置一个 AWS Lambda 授权机构 - AWS HealthImaging

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 OIDC 身份验证设置一个 AWS Lambda 授权机构

本指南假设您已经将所选的身份提供商 (IdP) 配置为提供与 HealthImaging OIDC 身份验证功能要求兼容的访问令牌。

1. 配置用于 DICOMWeb API 访问的 IAM 角色

在配置 Lambda 授权方之前,请创建 HealthImaging 要在处理 DICOMWeb API 请求时代入的 IAM 角色。授权方 Lambda 函数在成功进行令牌验证后返回其中一个角色 ARN, HealthImaging 允许使用适当的权限执行请求。

  1. 创建定义所需的 DICOMWeb API 权限的 IAM 策略。有关可用权限,请参阅 HealthImaging 文档的 DICOMweb “使用” 部分。

  2. 创建可执行以下操作的 IAM 角色:

    • 附上这些政策

    • 包括允许 AWS HealthImaging 服务委托人 (medical-imaging.amazonaws.com) 担任这些角色的信任关系。

以下是允许关联角色访问 HealthImaging DICOMWeb 只读 API 的策略示例:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "MedicalImagingDicomWebOperations", "Effect": "Allow", "Action": [ "medical-imaging:SearchDICOMInstances", "medical-imaging:GetImageSetMetadata", "medical-imaging:GetDICOMSeriesMetadata", "medical-imaging:SearchDICOMStudies", "medical-imaging:GetDICOMBulkdata", "medical-imaging:SearchDICOMSeries", "medical-imaging:GetDICOMInstanceMetadata", "medical-imaging:GetDICOMInstance", "medical-imaging:GetDICOMInstanceFrames" ], "Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } ] }

以下是应与角色关联的信任关系策略的示例:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "OIDCRoleFederation", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您将在下一步中创建的 Lambda 授权者可以评估令牌声明并返回相应角色的 ARN。然后,AWS HealthImaging 将模拟此角色以相应的权限执行 DICOMWeb API 请求。

例如:

  • 具有 “管理员” 声明的令牌可能会返回具有完全访问权限的角色的 ARN

  • 具有 “读者” 声明的令牌可能会为具有只读访问权限的角色返回 ARN

  • 具有 “department_A” 声明的令牌可能会返回特定于该部门访问级别的角色的 ARN

此机制允许您通过 IAM 角色将 IdP 的授权模型映射到特定的 AWS HealthImaging 权限。

2. 创建和配置 Lambda 授权器函数

创建一个 Lambda 函数,该函数将验证 JWT 令牌,并根据令牌声明评估返回相应的 IAM 角色 ARN。此函数由运行状况映像服务调用,并传递了一个包含 HealthImaging 数据存储 ID、 DICOMWeb 操作和在 HTTP 请求中找到的访问令牌的事件:

{ "datastoreId": "{datastore id}", "operation": "{Healthimaging API name e.g. GetDICOMInstance}", "bearerToken": "{access token}" }

Lambda 授权方函数必须返回具有以下结构的 JSON 响应:

{ "isTokenValid": {true or false}, "roleArn": "{role arn or empty string meaning to deny the request explicitly}" }

您可以参考实现示例以了解更多信息。

注意

由于只有在 lambda 授权者验证访问令牌后才会回复 DICOMWeb 请求,因此必须尽可能快地执行此函数,以提供最佳 DICOMWeb API 响应时间。

要授权 HealthImaging 服务调用 lambda 授权器函数,它必须具有允许 HealthImaging 服务调用该函数的资源策略。此资源策略可以在 lambda 配置选项卡的权限菜单中创建,也可以使用 AWS CLI:

aws lambda add-permission \ --function-name YourAuthorizerFunctionName \ --statement-id HealthImagingInvoke \ --action lambda:InvokeFunction \ --principal medical-imaging.amazonaws.com

此资源策略允许 HealthImaging 服务在对 API 请求进行身份验证 DICOMWeb 时调用您的 Lambda 授权方。

注意

稍后可以更新 lambda 资源策略,条件与ArnLike特定数据存储的 ARN 相匹配。 HealthImaging

以下是 lambda 资源策略的示例:

{ "Version": "2012-10-17", "Id": "default", "Statement": [ { "Sid": "LambaAuthorizer-HealthImagingInvokePermission", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}", "Condition": { "ArnLike": { "AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } } } ] }

3. 使用 OIDC 身份验证创建新的数据存储

要启用 OIDC 身份验证,必须使用参数为 “ AWS CLI ” 的新数据存储库。lambda-authorizer-arn如果不联系 Support,则无法在现有数据存储上启用 OIDC 身份验证。 AWS

以下是如何创建启用 OIDC 身份验证的新数据存储的示例:

aws medical-imaging create-datastore \ --datastore-name YourDatastoreName \ --lambda-authorizer-arn YourAuthorizerFunctionArn

您可以使用 AWS CLI get-datastore 命令检查特定数据存储是否启用了 OIDC 身份验证功能,并验证属性 “” 是否存在:lambdaAuthorizerArn

aws medical-imaging get-datastore --datastore-id YourDatastoreId
{ "datastoreProperties": { "datastoreId": YourdatastoreId, "datastoreName": YourDatastoreName, "datastoreStatus": "ACTIVE", "lambdaAuthorizerArn": YourAuthorizerFunctionArn, "datastoreArn": YourDatastoreArn, "createdAt": "2025-09-30T14:16:04.015000-05:00", "updatedAt": "2025-09-30T14:16:04.015000-05:00" } }
注意

AWS CLI 数据存储库创建命令的执行角色必须具有相应的权限才能调用 Lambda 授权器函数。这可以缓解权限升级攻击,在这种攻击中,恶意用户可以通过数据存储授权器配置执行未经授权的 Lambda 函数。

异常代码

如果身份验证失败, HealthImaging 则返回以下 HTTP 错误响应代码和正文消息:

条件 AHI 回应
Lambda 授权器不存在或无效 424 授权器配置错误
由于执行失败,授权方终止 424 授权器失败
任何其他未映射的授权方错误 424 授权器失败
授权方返回的响应无效/格式不正确 424 授权器配置错误
Authorizer 跑了 1 秒以上 408 授权器超时
令牌已过期或因其他原因无效 403 令牌无效或已过期
由于授权方配置错误,AHI 无法联合返回的 IAM 角色 424 授权器配置错误
授权方返回了一个空角色 403 访问被拒绝
返回的角色不可调用(假设角色/信任错误配置) 424 授权器配置错误
请求速率超过 DICOMweb 网关限制 429 请求太多
跨区域数据存储、返回角色或授权者 Account/Cross 424 授权者跨区域访问 Account/Cross

实现示例

此 Python 示例演示了一个 lambda 授权器函数,该函数验证 HealthImaging 来自事件的 C AWS ognito 访问令牌并返回具有适当权限的 IAM 角色 ARN。 DICOMWeb

Lambda 授权方实现了两种缓存机制,以减少外部调用和响应延迟。JWKS(JSON Web Key Set)每小时提取一次,并存储在函数的临时文件夹中,允许后续函数调用在本地读取它,而不是从公共网络获取。您还会注意到 token_cache 字典对象是在此 Lambda 函数的全局上下文中实例化的。所有重用相同预热 Lambda 上下文的调用都共享全局变量。因此,成功验证的令牌可以存储在此字典中,并在下次执行相同的 Lambda 函数时快速查找。缓存方法代表了一种通用的方法,可以适合大多数身份提供商发放的访问令牌。有关 AWS Cognito 特定的缓存选项,请参阅 Cognito AWS 文档的管理用户池部分和缓存部分

import json import os import time import logging from jose import jwk, jwt from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError import requests import tempfile # Configure logging logger = logging.getLogger() log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper() logger.setLevel(getattr(logging, log_level, logging.WARNING)) # Global token cache with TTL token_cache = {} # JWKS cache file path JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json') JWKS_CACHE_TTL = 3600 # 1 hour # Load environment variables once USER_POOL_ID = os.environ['USER_POOL_ID'] CLIENT_ID = os.environ['CLIENT_ID'] ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '') def cleanup_expired_tokens(): """Remove expired tokens from cache""" now = int(time.time()) expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']] for token in expired_keys: del token_cache[token] def get_cached_jwks(): """Get JWKS from cache file if valid, otherwise return None """ try: if os.path.exists(JWKS_CACHE_FILE): # Check if cache file is still valid cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE) if cache_age < JWKS_CACHE_TTL: with open(JWKS_CACHE_FILE, 'r') as f: jwks = json.load(f) logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)') return jwks else: logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)') except Exception as e: logger.debug(f'Error reading JWKS cache: {e}') return None def cache_jwks(jwks): """Cache JWKS to file""" try: with open(JWKS_CACHE_FILE, 'w') as f: json.dump(jwks, f) logger.debug('JWKS cached successfully') except Exception as e: logger.debug(f'Error caching JWKS: {e}') def fetch_jwks(jwks_url): """Fetch JWKS from URL and cache it""" logger.debug('Fetching JWKS from URL') jwks = requests.get(jwks_url, timeout=10).json() # Convert to dict for faster lookups jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']} cache_jwks(jwks) return jwks def is_token_cached(token): if token not in token_cache: return None cached = token_cache[token] now = int(time.time()) if now > cached['cache_expiry']: del token_cache[token] return None return cached def cache_token(token, payload): now = int(time.time()) token_exp = payload.get('exp') cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner token_cache[token] = { 'payload': payload, 'cache_expiry': cache_expiry, 'role_arn': ROLE_ARN } def handler(event, context): cleanup_expired_tokens() # start be removing expired tokens from the cache try: # Extract token from bearerToken or authorizationToken field token = event.get('bearerToken') if not token: raise Exception('No token provided') # Check cache first cached = is_token_cached(token) if cached: logger.debug('Token found in cache, skipping verification') return { 'isTokenValid': True, 'roleArn': cached['role_arn'] } # Get Cognito configuration region = context.invoked_function_arn.split(':')[3] # Get JWKS (cached or fresh) jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json' jwks = get_cached_jwks() if not jwks: jwks = fetch_jwks(jwks_url) # Decode token header to get kid headers = jwt.get_unverified_headers(token) kid = headers['kid'] # Find the correct key key = None for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: # Key not found - try refreshing JWKS in case of key rotation logger.debug('Key not found in cached JWKS, fetching fresh JWKS') jwks = fetch_jwks(jwks_url) for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: raise Exception('Public key not found') # Construct the public key public_key = jwk.construct(key) # Verify and decode the token (includes expiry validation) payload = jwt.decode( token, public_key, algorithms=['RS256'], audience=CLIENT_ID, issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}' ) logger.debug('Token validated successfully') logger.debug('User: %s', payload.get('username', 'unknown')) # Cache the validated token cache_token(token, payload) # Return authorization response return { 'isTokenValid': True, 'roleArn': ROLE_ARN } except ExpiredSignatureError: logger.debug('Token expired') return { 'isTokenValid': False, 'roleArn': '' } except JWTClaimsError: logger.debug('Invalid token claims') return { 'isTokenValid': False, 'roleArn': '' } except JWTError as e: logger.debug('JWT validation error: %s', e) return { 'isTokenValid': False, 'roleArn': '' } except Exception as e: logger.debug('Authorization failed: %s', e) return { 'isTokenValid': False, 'roleArn': '' }