OIDC 인증을 위한 AWS Lambda 권한 부여자 설정 - AWS HealthImaging

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

OIDC 인증을 위한 AWS Lambda 권한 부여자 설정

이 안내서에서는 HealthImaging OIDC 인증 기능의 요구 사항과 호환되는 액세스 토큰을 제공하도록 선택한 ID 제공업체(IdP)를 이미 구성했다고 가정합니다.

1. DICOMWeb API 액세스를 위한 IAM 역할 구성

Lambda 권한 부여자를 구성하기 전에 DICOMWeb API 요청을 처리할 때 HealthImaging이 수임할 IAM 역할을 생성합니다. 권한 부여자 Lambda 함수는 토큰 확인에 성공한 후 이러한 역할 ARN 중 하나를 반환하여 HealthImaging이 적절한 권한으로 요청을 실행할 수 있도록 합니다.

  1. 원하는 DICOMWeb API 권한을 정의하는 IAM 정책을 생성합니다. 사용 가능한 권한은 HealthImaging 설명서의 "DICOMweb 사용" 섹션을 참조하세요.

  2. 다음과 같은 IAM 역할을 생성합니다.

    • 이러한 정책 연결

    • AWS HealthImaging 서비스 보안 주체(medical-imaging.amazonaws.com)가 이러한 역할을 수임할 수 있도록 신뢰 관계를 포함합니다.

다음은 연결된 역할이 HealthImaging DICOMWeb 읽기 전용 API에 액세스하도록 허용하는 정책의 예입니다.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "MedicalImagingDicomWebOperations", "Effect": "Allow", "Action": [ "medical-imaging:SearchDICOMInstances", "medical-imaging:GetImageSetMetadata", "medical-imaging:GetDICOMSeriesMetadata", "medical-imaging:SearchDICOMStudies", "medical-imaging:GetDICOMBulkdata", "medical-imaging:SearchDICOMSeries", "medical-imaging:GetDICOMInstanceMetadata", "medical-imaging:GetDICOMInstance", "medical-imaging:GetDICOMInstanceFrames" ], "Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } ] }

다음은 역할(들)과 연결되어야 하는 신뢰 관계 정책의 예입니다.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "OIDCRoleFederation", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

다음 단계에서 생성할 Lambda 권한 부여자는 토큰 클레임을 평가하고 적절한 역할의 ARN을 반환할 수 있습니다. 그런 다음 AWS HealthImaging은이 역할을 가장하여 해당 권한으로 DICOMWeb API 요청을 실행합니다.

예:

  • "관리자" 클레임이 있는 토큰은 전체 액세스 권한이 있는 역할에 대한 ARN을 반환할 수 있습니다.

  • "읽기" 클레임이 있는 토큰은 읽기 전용 액세스 권한이 있는 역할에 대한 ARN을 반환할 수 있습니다.

  • “department_A” 클레임이 있는 토큰은 해당 부서의 액세스 수준에 특정한 역할에 대한 ARN을 반환할 수 있습니다.

이 메커니즘을 사용하면 IAM 역할을 통해 IdP의 권한 부여 모델을 특정 AWS HealthImaging 권한에 매핑할 수 있습니다.

2. Lambda 권한 부여자 함수 생성 및 구성

JWT 토큰을 확인하고 토큰 클레임 평가를 기반으로 적절한 IAM 역할 ARN을 반환하는 Lambda 함수를 생성합니다. 이 함수는 상태 이미징 서비스에서 호출되며 HealthImaging 데이터 스토어 ID, DICOMWeb 작업 및 HTTP 요청에 있는 액세스 토큰이 포함된 이벤트를 전달합니다.

{ "datastoreId": "{datastore id}", "operation": "{Healthimaging API name e.g. GetDICOMInstance}", "bearerToken": "{access token}" }

Lambda 권한 부여자 함수는 다음 구조의 JSON 응답을 반환해야 합니다.

{ "isTokenValid": {true or false}, "roleArn": "{role arn or empty string meaning to deny the request explicitly}" }

자세한 내용은 구현 예제를 참조하세요.

참고

DICOMWeb 요청은 lambda 권한 부여자가 액세스 토큰을 확인한 후에만 응답되므로 최상의 DICOMWeb API 응답 시간을 제공하기 위해이 함수의 실행을 최대한 빨리 하는 것이 중요합니다.

HealthImaging 서비스에 Lambda 권한 부여자 함수를 호출할 수 있는 권한이 부여되려면 HealthImaging 서비스가 해당 함수를 호출할 수 있도록 허용하는 리소스 정책이 있어야 합니다. 이 리소스 정책은 Lambda 구성 탭의 권한 메뉴 또는 다음을 사용하여 생성할 수 있습니다 AWS CLI.

aws lambda add-permission \ --function-name YourAuthorizerFunctionName \ --statement-id HealthImagingInvoke \ --action lambda:InvokeFunction \ --principal medical-imaging.amazonaws.com

이 리소스 정책은 DICOMWeb API 요청을 인증할 때 HealthImaging 서비스가 Lambda 권한 부여자를 호출하도록 허용합니다.

참고

Lambda 리소스 정책은 나중에 특정 HealthImaging 데이터 스토어의 ARN과 일치하는 "ArnLike" 조건으로 업데이트할 수 있습니다.

다음은 Lambda 리소스 정책의 예입니다.

{ "Version": "2012-10-17", "Id": "default", "Statement": [ { "Sid": "LambaAuthorizer-HealthImagingInvokePermission", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}", "Condition": { "ArnLike": { "AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } } } ] }

3. OIDC 인증을 사용하여 새 데이터 스토어 생성

OIDC 인증을 활성화하려면 파라미터 "lambda-authorizer-arn"과 AWS CLI 함께를 사용하여 새 데이터 스토어를 생성해야 합니다. Support에 문의 AWS 하지 않으면 기존 데이터 스토어에서 OIDC 인증을 활성화할 수 없습니다.

다음은 OIDC 인증이 활성화된 새 데이터 스토어를 생성하는 방법의 예입니다.

aws medical-imaging create-datastore \ --datastore-name YourDatastoreName \ --lambda-authorizer-arn YourAuthorizerFunctionArn

get-datastore 명령을 사용하고 AWS CLI "lambdaAuthorizerArn" 속성이 있는지 확인하여 특정 데이터 스토어에 OIDC 인증 기능이 활성화되어 있는지 확인할 수 있습니다.

aws medical-imaging get-datastore --datastore-id YourDatastoreId
{ "datastoreProperties": { "datastoreId": YourdatastoreId, "datastoreName": YourDatastoreName, "datastoreStatus": "ACTIVE", "lambdaAuthorizerArn": YourAuthorizerFunctionArn, "datastoreArn": YourDatastoreArn, "createdAt": "2025-09-30T14:16:04.015000-05:00", "updatedAt": "2025-09-30T14:16:04.015000-05:00" } }
참고

AWS CLI 데이터 스토어 생성 명령의 실행 역할에는 Lambda 권한 부여자 함수를 호출할 수 있는 적절한 권한이 있어야 합니다. 이렇게 하면 악의적인 사용자가 데이터 스토어 권한 부여자 구성을 통해 승인되지 않은 Lambda 함수를 실행할 수 있는 권한 에스컬레이션 공격이 완화됩니다.

예외 코드

인증 실패 시 HealthImaging은 다음 HTTP 오류 응답 코드와 본문 메시지를 반환합니다.

Condition AHI 응답
Lambda 권한 부여자가 존재하지 않거나 잘못되었습니다. 424 권한 부여자 구성 오류
실행 실패로 인해 권한 부여자가 종료됨 424 권한 부여자 실패
매핑되지 않은 다른 권한 부여자 오류 424 권한 부여자 실패
권한 부여자가 유효하지 않거나 형식이 잘못된 응답을 반환함 424 권한 부여자 구성 오류
권한 부여자가 1초 이상 실행됨 408 권한 부여자 제한 시간
토큰이 만료되었거나 유효하지 않습니다. 403 유효하지 않거나 만료된 토큰
권한 부여자 구성 오류로 인해 AHI가 반환된 IAM 역할을 페더레이션할 수 없음 424 권한 부여자 구성 오류
권한 부여자가 빈 역할을 반환함 403 액세스 거부됨
반환된 역할을 호출할 수 없음(assume-role/trust misconfig) 424 권한 부여자 구성 오류
요청 속도가 DICOMweb Gateway 제한을 초과함 429 요청이 너무 많음
데이터 스토어, 반환 역할 또는 권한 부여자 교차 계정/교차 리전 424 권한 부여자 교차 계정/교차 리전 액세스

구현 예제

이 Python 예제는 HealthImaging 이벤트의 AWS Cognito 액세스 토큰을 확인하고 적절한 DICOMWeb 권한이 있는 IAM 역할 ARN을 반환하는 Lambda 권한 부여자 함수를 보여줍니다.

Lambda 권한 부여자는 두 가지 캐싱 메커니즘을 구현하여 외부 호출과 응답 지연 시간을 줄입니다. JWKS(JSON 웹 키 세트)는 1시간마다 한 번씩 가져와 함수의 임시 폴더에 저장되므로 후속 함수 호출은 퍼블릭 네트워크에서 가져오는 대신 로컬에서 읽을 수 있습니다. 또한이 Lambda 함수의 전역 컨텍스트에서 token_cache 딕셔너리 객체가 인스턴스화됩니다. 전역 변수는 동일한 웜 Lambda 컨텍스트를 재사용하는 모든 호출에서 공유됩니다. 이 덕분에 성공적으로 확인된 토큰을이 사전에 저장하고 동일한 Lambda 함수의 다음 실행 중에 빠르게 조회할 수 있습니다. 캐싱 방법은 대부분의 ID 제공업체에서 발급한 액세스 토큰에 맞을 수 있는 일반적인 접근 방식을 나타냅니다. AWS Cognito 관련 캐싱 옵션은 AWS Cognito 설명서사용자 풀 관리 섹션 및 캐싱 섹션을 참조하세요.

import json import os import time import logging from jose import jwk, jwt from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError import requests import tempfile # Configure logging logger = logging.getLogger() log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper() logger.setLevel(getattr(logging, log_level, logging.WARNING)) # Global token cache with TTL token_cache = {} # JWKS cache file path JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json') JWKS_CACHE_TTL = 3600 # 1 hour # Load environment variables once USER_POOL_ID = os.environ['USER_POOL_ID'] CLIENT_ID = os.environ['CLIENT_ID'] ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '') def cleanup_expired_tokens(): """Remove expired tokens from cache""" now = int(time.time()) expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']] for token in expired_keys: del token_cache[token] def get_cached_jwks(): """Get JWKS from cache file if valid, otherwise return None """ try: if os.path.exists(JWKS_CACHE_FILE): # Check if cache file is still valid cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE) if cache_age < JWKS_CACHE_TTL: with open(JWKS_CACHE_FILE, 'r') as f: jwks = json.load(f) logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)') return jwks else: logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)') except Exception as e: logger.debug(f'Error reading JWKS cache: {e}') return None def cache_jwks(jwks): """Cache JWKS to file""" try: with open(JWKS_CACHE_FILE, 'w') as f: json.dump(jwks, f) logger.debug('JWKS cached successfully') except Exception as e: logger.debug(f'Error caching JWKS: {e}') def fetch_jwks(jwks_url): """Fetch JWKS from URL and cache it""" logger.debug('Fetching JWKS from URL') jwks = requests.get(jwks_url, timeout=10).json() # Convert to dict for faster lookups jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']} cache_jwks(jwks) return jwks def is_token_cached(token): if token not in token_cache: return None cached = token_cache[token] now = int(time.time()) if now > cached['cache_expiry']: del token_cache[token] return None return cached def cache_token(token, payload): now = int(time.time()) token_exp = payload.get('exp') cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner token_cache[token] = { 'payload': payload, 'cache_expiry': cache_expiry, 'role_arn': ROLE_ARN } def handler(event, context): cleanup_expired_tokens() # start be removing expired tokens from the cache try: # Extract token from bearerToken or authorizationToken field token = event.get('bearerToken') if not token: raise Exception('No token provided') # Check cache first cached = is_token_cached(token) if cached: logger.debug('Token found in cache, skipping verification') return { 'isTokenValid': True, 'roleArn': cached['role_arn'] } # Get Cognito configuration region = context.invoked_function_arn.split(':')[3] # Get JWKS (cached or fresh) jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json' jwks = get_cached_jwks() if not jwks: jwks = fetch_jwks(jwks_url) # Decode token header to get kid headers = jwt.get_unverified_headers(token) kid = headers['kid'] # Find the correct key key = None for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: # Key not found - try refreshing JWKS in case of key rotation logger.debug('Key not found in cached JWKS, fetching fresh JWKS') jwks = fetch_jwks(jwks_url) for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: raise Exception('Public key not found') # Construct the public key public_key = jwk.construct(key) # Verify and decode the token (includes expiry validation) payload = jwt.decode( token, public_key, algorithms=['RS256'], audience=CLIENT_ID, issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}' ) logger.debug('Token validated successfully') logger.debug('User: %s', payload.get('username', 'unknown')) # Cache the validated token cache_token(token, payload) # Return authorization response return { 'isTokenValid': True, 'roleArn': ROLE_ARN } except ExpiredSignatureError: logger.debug('Token expired') return { 'isTokenValid': False, 'roleArn': '' } except JWTClaimsError: logger.debug('Invalid token claims') return { 'isTokenValid': False, 'roleArn': '' } except JWTError as e: logger.debug('JWT validation error: %s', e) return { 'isTokenValid': False, 'roleArn': '' } except Exception as e: logger.debug('Authorization failed: %s', e) return { 'isTokenValid': False, 'roleArn': '' }