本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为 OIDC 身份验证设置一个 AWS Lambda 授权机构
本指南假设您已经将所选的身份提供商 (IdP) 配置为提供与 HealthImaging OIDC 身份验证功能要求兼容的访问令牌。
1. 配置用于 DICOMWeb API 访问的 IAM 角色
在配置 Lambda 授权方之前,请创建 HealthImaging 要在处理 DICOMWeb API 请求时代入的 IAM 角色。授权方 Lambda 函数在成功进行令牌验证后返回其中一个角色 ARN, HealthImaging 允许使用适当的权限执行请求。
-
创建定义所需的 DICOMWeb API 权限的 IAM 策略。有关可用权限,请参阅 HealthImaging 文档的 DICOMweb “使用” 部分。
-
创建可执行以下操作的 IAM 角色:
-
附上这些政策
-
包括允许 AWS HealthImaging 服务委托人 (
medical-imaging.amazonaws.com) 担任这些角色的信任关系。
-
以下是允许关联角色访问 HealthImaging DICOMWeb 只读 API 的策略示例:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "MedicalImagingDicomWebOperations",
"Effect": "Allow",
"Action": [
"medical-imaging:SearchDICOMInstances",
"medical-imaging:GetImageSetMetadata",
"medical-imaging:GetDICOMSeriesMetadata",
"medical-imaging:SearchDICOMStudies",
"medical-imaging:GetDICOMBulkdata",
"medical-imaging:SearchDICOMSeries",
"medical-imaging:GetDICOMInstanceMetadata",
"medical-imaging:GetDICOMInstance",
"medical-imaging:GetDICOMInstanceFrames"
],
"Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
]
}
以下是应与角色关联的信任关系策略的示例:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "OIDCRoleFederation",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
您将在下一步中创建的 Lambda 授权者可以评估令牌声明并返回相应角色的 ARN。然后,AWS HealthImaging 将模拟此角色以相应的权限执行 DICOMWeb API 请求。
例如:
-
具有 “管理员” 声明的令牌可能会返回具有完全访问权限的角色的 ARN
-
具有 “读者” 声明的令牌可能会为具有只读访问权限的角色返回 ARN
-
具有 “department_A” 声明的令牌可能会返回特定于该部门访问级别的角色的 ARN
此机制允许您通过 IAM 角色将 IdP 的授权模型映射到特定的 AWS HealthImaging 权限。
2. 创建和配置 Lambda 授权器函数
创建一个 Lambda 函数,该函数将验证 JWT 令牌,并根据令牌声明评估返回相应的 IAM 角色 ARN。此函数由运行状况映像服务调用,并传递了一个包含 HealthImaging 数据存储 ID、 DICOMWeb 操作和在 HTTP 请求中找到的访问令牌的事件:
{
"datastoreId": "{datastore id}",
"operation": "{Healthimaging API name e.g. GetDICOMInstance}",
"bearerToken": "{access token}"
}
Lambda 授权方函数必须返回具有以下结构的 JSON 响应:
{
"isTokenValid": {true or false},
"roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
您可以参考实现示例以了解更多信息。
注意
由于只有在 lambda 授权者验证访问令牌后才会回复 DICOMWeb 请求,因此必须尽可能快地执行此函数,以提供最佳 DICOMWeb API 响应时间。
要授权 HealthImaging 服务调用 lambda 授权器函数,它必须具有允许 HealthImaging 服务调用该函数的资源策略。此资源策略可以在 lambda 配置选项卡的权限菜单中创建,也可以使用 AWS CLI:
aws lambda add-permission \
--function-name YourAuthorizerFunctionName \
--statement-id HealthImagingInvoke \
--action lambda:InvokeFunction \
--principal medical-imaging.amazonaws.com
此资源策略允许 HealthImaging 服务在对 API 请求进行身份验证 DICOMWeb 时调用您的 Lambda 授权方。
注意
稍后可以更新 lambda 资源策略,条件与ArnLike特定数据存储的 ARN 相匹配。 HealthImaging
以下是 lambda 资源策略的示例:
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "LambaAuthorizer-HealthImagingInvokePermission",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}",
"Condition": {
"ArnLike": {
"AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
}
}
]
}
3. 使用 OIDC 身份验证创建新的数据存储
要启用 OIDC 身份验证,必须使用参数为 “ AWS CLI ” 的新数据存储库。lambda-authorizer-arn如果不联系 Support,则无法在现有数据存储上启用 OIDC 身份验证。 AWS
以下是如何创建启用 OIDC 身份验证的新数据存储的示例:
aws medical-imaging create-datastore \
--datastore-name YourDatastoreName \
--lambda-authorizer-arn YourAuthorizerFunctionArn
您可以使用 AWS CLI get-datastore 命令检查特定数据存储是否启用了 OIDC 身份验证功能,并验证属性 “” 是否存在:lambdaAuthorizerArn
aws medical-imaging get-datastore --datastore-id YourDatastoreId
{
"datastoreProperties": {
"datastoreId": YourdatastoreId,
"datastoreName": YourDatastoreName,
"datastoreStatus": "ACTIVE",
"lambdaAuthorizerArn": YourAuthorizerFunctionArn,
"datastoreArn": YourDatastoreArn,
"createdAt": "2025-09-30T14:16:04.015000-05:00",
"updatedAt": "2025-09-30T14:16:04.015000-05:00"
}
}
注意
AWS CLI 数据存储库创建命令的执行角色必须具有相应的权限才能调用 Lambda 授权器函数。这可以缓解权限升级攻击,在这种攻击中,恶意用户可以通过数据存储授权器配置执行未经授权的 Lambda 函数。
异常代码
如果身份验证失败, HealthImaging 则返回以下 HTTP 错误响应代码和正文消息:
| 条件 | AHI 回应 |
|---|---|
| Lambda 授权器不存在或无效 | 424 授权器配置错误 |
| 由于执行失败,授权方终止 | 424 授权器失败 |
| 任何其他未映射的授权方错误 | 424 授权器失败 |
| 授权方返回的响应无效/格式不正确 | 424 授权器配置错误 |
| Authorizer 跑了 1 秒以上 | 408 授权器超时 |
| 令牌已过期或因其他原因无效 | 403 令牌无效或已过期 |
| 由于授权方配置错误,AHI 无法联合返回的 IAM 角色 | 424 授权器配置错误 |
| 授权方返回了一个空角色 | 403 访问被拒绝 |
| 返回的角色不可调用(假设角色/信任错误配置) | 424 授权器配置错误 |
| 请求速率超过 DICOMweb 网关限制 | 429 请求太多 |
| 跨区域数据存储、返回角色或授权者 Account/Cross | 424 授权者跨区域访问 Account/Cross |
实现示例
此 Python 示例演示了一个 lambda 授权器函数,该函数验证 HealthImaging 来自事件的 C AWS ognito 访问令牌并返回具有适当权限的 IAM 角色 ARN。 DICOMWeb
Lambda 授权方实现了两种缓存机制,以减少外部调用和响应延迟。JWKS(JSON Web Key Set)每小时提取一次,并存储在函数的临时文件夹中,允许后续函数调用在本地读取它,而不是从公共网络获取。您还会注意到 token_cache 字典对象是在此 Lambda 函数的全局上下文中实例化的。所有重用相同预热 Lambda 上下文的调用都共享全局变量。因此,成功验证的令牌可以存储在此字典中,并在下次执行相同的 Lambda 函数时快速查找。缓存方法代表了一种通用的方法,可以适合大多数身份提供商发放的访问令牌。有关 AWS Cognito 特定的缓存选项,请参阅 Cognito AWS 文档的管理用户池部分和缓存部分。
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile
# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))
# Global token cache with TTL
token_cache = {}
# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600 # 1 hour
# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')
def cleanup_expired_tokens():
"""Remove expired tokens from cache"""
now = int(time.time())
expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
for token in expired_keys:
del token_cache[token]
def get_cached_jwks():
"""Get JWKS from cache file if valid, otherwise return None """
try:
if os.path.exists(JWKS_CACHE_FILE):
# Check if cache file is still valid
cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
if cache_age < JWKS_CACHE_TTL:
with open(JWKS_CACHE_FILE, 'r') as f:
jwks = json.load(f)
logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
return jwks
else:
logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
except Exception as e:
logger.debug(f'Error reading JWKS cache: {e}')
return None
def cache_jwks(jwks):
"""Cache JWKS to file"""
try:
with open(JWKS_CACHE_FILE, 'w') as f:
json.dump(jwks, f)
logger.debug('JWKS cached successfully')
except Exception as e:
logger.debug(f'Error caching JWKS: {e}')
def fetch_jwks(jwks_url):
"""Fetch JWKS from URL and cache it"""
logger.debug('Fetching JWKS from URL')
jwks = requests.get(jwks_url, timeout=10).json()
# Convert to dict for faster lookups
jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
cache_jwks(jwks)
return jwks
def is_token_cached(token):
if token not in token_cache:
return None
cached = token_cache[token]
now = int(time.time())
if now > cached['cache_expiry']:
del token_cache[token]
return None
return cached
def cache_token(token, payload):
now = int(time.time())
token_exp = payload.get('exp')
cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner
token_cache[token] = {
'payload': payload,
'cache_expiry': cache_expiry,
'role_arn': ROLE_ARN
}
def handler(event, context):
cleanup_expired_tokens() # start be removing expired tokens from the cache
try:
# Extract token from bearerToken or authorizationToken field
token = event.get('bearerToken')
if not token:
raise Exception('No token provided')
# Check cache first
cached = is_token_cached(token)
if cached:
logger.debug('Token found in cache, skipping verification')
return {
'isTokenValid': True,
'roleArn': cached['role_arn']
}
# Get Cognito configuration
region = context.invoked_function_arn.split(':')[3]
# Get JWKS (cached or fresh)
jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
jwks = get_cached_jwks()
if not jwks:
jwks = fetch_jwks(jwks_url)
# Decode token header to get kid
headers = jwt.get_unverified_headers(token)
kid = headers['kid']
# Find the correct key
key = None
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
# Key not found - try refreshing JWKS in case of key rotation
logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
jwks = fetch_jwks(jwks_url)
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
raise Exception('Public key not found')
# Construct the public key
public_key = jwk.construct(key)
# Verify and decode the token (includes expiry validation)
payload = jwt.decode(
token,
public_key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
)
logger.debug('Token validated successfully')
logger.debug('User: %s', payload.get('username', 'unknown'))
# Cache the validated token
cache_token(token, payload)
# Return authorization response
return {
'isTokenValid': True,
'roleArn': ROLE_ARN
}
except ExpiredSignatureError:
logger.debug('Token expired')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTClaimsError:
logger.debug('Invalid token claims')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTError as e:
logger.debug('JWT validation error: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}
except Exception as e:
logger.debug('Authorization failed: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}