翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
OIDC 認証用の AWS Lambda オーソライザーを設定する
このガイドでは、選択した ID プロバイダー (IdP) が HealthImaging OIDC 認証機能の要件と互換性のあるアクセストークンを提供するように設定されていることを前提としています。
1. DICOMWeb API アクセス用の IAM ロールを設定する
Lambda オーソライザーを設定する前に、DICOMWeb API リクエストを処理するときに HealthImaging が引き受ける IAM ロールを作成します。オーソライザー Lambda 関数は、トークンの検証に成功した後、これらのロール ARN のいずれかを返します。これにより、HealthImaging は適切なアクセス許可でリクエストを実行できます。
-
目的の DICOMWeb API 権限を定義する IAM ポリシーを作成します。利用可能なアクセス許可については、HealthImaging ドキュメントのDICOMweb の使用」セクションを参照してください。
-
以下の IAM ロールを作成します。
-
これらのポリシーをアタッチする
-
AWS HealthImaging サービスプリンシパル (
medical-imaging.amazonaws.com) がこれらのロールを引き受けることを許可する信頼関係を含めます。
-
以下は、関連付けられたロールが HealthImaging DICOMWeb 読み取り専用 API にアクセスすることを許可するポリシーの例です。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "MedicalImagingDicomWebOperations",
"Effect": "Allow",
"Action": [
"medical-imaging:SearchDICOMInstances",
"medical-imaging:GetImageSetMetadata",
"medical-imaging:GetDICOMSeriesMetadata",
"medical-imaging:SearchDICOMStudies",
"medical-imaging:GetDICOMBulkdata",
"medical-imaging:SearchDICOMSeries",
"medical-imaging:GetDICOMInstanceMetadata",
"medical-imaging:GetDICOMInstance",
"medical-imaging:GetDICOMInstanceFrames"
],
"Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
]
}
ロール (複数可) に関連付ける必要がある信頼関係ポリシーの例を次に示します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "OIDCRoleFederation",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
次のステップで作成する Lambda オーソライザーは、トークンクレームを評価し、適切なロールの ARN を返すことができます。その後、AWS HealthImaging はこのロールを偽装して、対応するアクセス許可で DICOMWeb API リクエストを実行します。
例:
-
「管理者」クレームを持つトークンは、フルアクセスを持つロールの ARN を返す場合があります
-
「リーダー」クレームを持つトークンは、読み取り専用アクセスを持つロールの ARN を返す場合があります
-
「department_A」クレームを持つトークンは、その部門のアクセスレベルに固有のロールの ARN を返す場合があります
このメカニズムにより、IAM ロールを通じて IdP の認可モデルを特定の AWS HealthImaging アクセス許可にマッピングできます。
2. Lambda オーソライザー関数の作成と設定
JWT トークンを検証し、トークンクレーム評価に基づいて適切な IAM ロール ARN を返す Lambda 関数を作成します。この関数はヘルスイメージングサービスによって呼び出され、HealthImaging データストア ID、DICOMWeb オペレーション、HTTP リクエストで見つかったアクセストークンを含むイベントを渡します。
{
"datastoreId": "{datastore id}",
"operation": "{Healthimaging API name e.g. GetDICOMInstance}",
"bearerToken": "{access token}"
}
Lambda オーソライザー関数は、次の構造を持つ JSON レスポンスを返す必要があります。
{
"isTokenValid": {true or false},
"roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
詳細については、実装例を参照してください。
注記
DICOMWeb リクエストは、アクセストークンが Lambda オーソライザーによって検証された後にのみ応答されるため、この関数の実行を可能な限り速くして、最適な DICOMWeb API 応答時間を提供することが重要です。
HealthImaging サービスが Lambda オーソライザー関数を呼び出す権限を付与されるようにするには、HealthImaging サービスが関数を呼び出すことを許可するリソースポリシーが必要です。このリソースポリシーは、Lambda 設定タブのアクセス許可メニューまたは以下を使用して作成できます AWS CLI。
aws lambda add-permission \
--function-name YourAuthorizerFunctionName \
--statement-id HealthImagingInvoke \
--action lambda:InvokeFunction \
--principal medical-imaging.amazonaws.com
このリソースポリシーにより、HealthImaging サービスは DICOMWeb API リクエストを認証するときに Lambda オーソライザーを呼び出すことができます。
注記
Lambda リソースポリシーは、特定の HealthImaging データストアの ARN に一致するArnLike」条件で後で更新できます。
Lambda リソースポリシーの例を次に示します。
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "LambaAuthorizer-HealthImagingInvokePermission",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}",
"Condition": {
"ArnLike": {
"AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
}
}
]
}
3. OIDC 認証を使用して新しいデータストアを作成する
OIDC 認証を有効にするには、lambda-authorizer-arn」パラメータ AWS CLI を持つ を使用して新しいデータストアを作成する必要があります。サポートに連絡して AWS いなければ、既存のデータストアで OIDC 認証を有効にすることはできません。
OIDC 認証を有効にして新しいデータストアを作成する方法の例を次に示します。
aws medical-imaging create-datastore \
--datastore-name YourDatastoreName \
--lambda-authorizer-arn YourAuthorizerFunctionArn
get-datastore コマンドを使用して AWS CLI 、特定のデータストアで OIDC 認証機能が有効になっているかどうか、および属性lambdaAuthorizerArn」が存在するかどうかを確認できます。
aws medical-imaging get-datastore --datastore-id YourDatastoreId
{
"datastoreProperties": {
"datastoreId": YourdatastoreId,
"datastoreName": YourDatastoreName,
"datastoreStatus": "ACTIVE",
"lambdaAuthorizerArn": YourAuthorizerFunctionArn,
"datastoreArn": YourDatastoreArn,
"createdAt": "2025-09-30T14:16:04.015000-05:00",
"updatedAt": "2025-09-30T14:16:04.015000-05:00"
}
}
注記
AWS CLI データストア作成コマンドの実行ロールには、Lambda オーソライザー関数を呼び出すための適切なアクセス許可が必要です。これにより、悪意のあるユーザーがデータストアオーソライザー設定を通じて不正な Lambda 関数を実行できる特権エスカレーション攻撃が軽減されます。
例外コード
認証に失敗した場合、HealthImaging は次の HTTP エラーレスポンスコードと本文メッセージを返します。
| 条件 | AHI レスポンス |
|---|---|
| Lambda オーソライザーが存在しないか、無効です | 424 オーソライザーの設定ミス |
| 実行の失敗によりオーソライザーが終了しました | 424 オーソライザーが失敗しました |
| マッピングされていない他のオーソライザーエラー | 424 オーソライザーが失敗しました |
| オーソライザーが無効な/不正な形式のレスポンスを返しました | 424 オーソライザーの設定ミス |
| オーソライザーが 1 秒以上実行されました | 408 オーソライザータイムアウト |
| トークンの有効期限が切れているか、無効です | 403 無効または期限切れのトークン |
| オーソライザーの設定ミスにより、AHI は返された IAM ロールをフェデレーションできません | 424 オーソライザーの設定ミス |
| オーソライザーが空のロールを返しました | 403 アクセスが拒否されました |
| 返されたロールは呼び出せません (assume-role/trust misconfig) | 424 オーソライザーの設定ミス |
| リクエストレートが DICOMweb Gateway の制限を超えています | 429 リクエストが多すぎる |
| データストア、リターンロール、またはオーソライザーのクロスアカウント/クロスリージョン | 424 オーソライザークロスアカウント/クロスリージョンアクセス |
実装例
この Python の例は、HealthImaging イベントからの AWS Cognito アクセストークンを検証し、適切な DICOMWeb 権限を持つ IAM ロール ARN を返す Lambda オーソライザー関数を示しています。
Lambda オーソライザーは、2 つのキャッシュメカニズムを実装して、外部呼び出しと応答のレイテンシーを削減します。JWKS (JSON ウェブキーセット) は 1 時間に 1 回フェッチされ、関数の一時フォルダに保存されるため、後続の関数呼び出しはパブリックネットワークからフェッチする代わりにローカルで読み取ることができます。token_cache ディクショナリオブジェクトがこの Lambda 関数のグローバルコンテキストでインスタンス化されていることもわかります。グローバル変数は、同じウォームされた Lambda コンテキストを再利用するすべての呼び出しによって共有されます。これにより、正常に検証されたトークンをこのディクショナリに保存し、この同じ Lambda 関数の次の実行中にすばやく検索できます。キャッシュ方法は、ほとんどの ID プロバイダーから発行されたアクセストークンに適合する可能性がある一般論的なアプローチを表します。 AWS Cognito 固有のキャッシュオプションについては、AWS Cognito ドキュメントの「ユーザープールの管理」セクションと「キャッシュ」セクションを参照してください。
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile
# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))
# Global token cache with TTL
token_cache = {}
# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600 # 1 hour
# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')
def cleanup_expired_tokens():
"""Remove expired tokens from cache"""
now = int(time.time())
expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
for token in expired_keys:
del token_cache[token]
def get_cached_jwks():
"""Get JWKS from cache file if valid, otherwise return None """
try:
if os.path.exists(JWKS_CACHE_FILE):
# Check if cache file is still valid
cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
if cache_age < JWKS_CACHE_TTL:
with open(JWKS_CACHE_FILE, 'r') as f:
jwks = json.load(f)
logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
return jwks
else:
logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
except Exception as e:
logger.debug(f'Error reading JWKS cache: {e}')
return None
def cache_jwks(jwks):
"""Cache JWKS to file"""
try:
with open(JWKS_CACHE_FILE, 'w') as f:
json.dump(jwks, f)
logger.debug('JWKS cached successfully')
except Exception as e:
logger.debug(f'Error caching JWKS: {e}')
def fetch_jwks(jwks_url):
"""Fetch JWKS from URL and cache it"""
logger.debug('Fetching JWKS from URL')
jwks = requests.get(jwks_url, timeout=10).json()
# Convert to dict for faster lookups
jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
cache_jwks(jwks)
return jwks
def is_token_cached(token):
if token not in token_cache:
return None
cached = token_cache[token]
now = int(time.time())
if now > cached['cache_expiry']:
del token_cache[token]
return None
return cached
def cache_token(token, payload):
now = int(time.time())
token_exp = payload.get('exp')
cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner
token_cache[token] = {
'payload': payload,
'cache_expiry': cache_expiry,
'role_arn': ROLE_ARN
}
def handler(event, context):
cleanup_expired_tokens() # start be removing expired tokens from the cache
try:
# Extract token from bearerToken or authorizationToken field
token = event.get('bearerToken')
if not token:
raise Exception('No token provided')
# Check cache first
cached = is_token_cached(token)
if cached:
logger.debug('Token found in cache, skipping verification')
return {
'isTokenValid': True,
'roleArn': cached['role_arn']
}
# Get Cognito configuration
region = context.invoked_function_arn.split(':')[3]
# Get JWKS (cached or fresh)
jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
jwks = get_cached_jwks()
if not jwks:
jwks = fetch_jwks(jwks_url)
# Decode token header to get kid
headers = jwt.get_unverified_headers(token)
kid = headers['kid']
# Find the correct key
key = None
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
# Key not found - try refreshing JWKS in case of key rotation
logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
jwks = fetch_jwks(jwks_url)
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
raise Exception('Public key not found')
# Construct the public key
public_key = jwk.construct(key)
# Verify and decode the token (includes expiry validation)
payload = jwt.decode(
token,
public_key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
)
logger.debug('Token validated successfully')
logger.debug('User: %s', payload.get('username', 'unknown'))
# Cache the validated token
cache_token(token, payload)
# Return authorization response
return {
'isTokenValid': True,
'roleArn': ROLE_ARN
}
except ExpiredSignatureError:
logger.debug('Token expired')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTClaimsError:
logger.debug('Invalid token claims')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTError as e:
logger.debug('JWT validation error: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}
except Exception as e:
logger.debug('Authorization failed: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}