

# Set up an AWS Lambda authorizer for OIDC authentication
<a name="dicomweb-oidc-requirements"></a>

This guide assumes you have already configured your Identity Provider (IdP) of choice to provide access tokens compatible with the requirements of the HealthImaging OIDC authentication feature.

## 1. Configure IAM Roles for DICOMWeb API Access
<a name="dicomweb-oidc-iam-roles"></a>

Before configuring the Lambda authorizer, create IAM roles for HealthImaging to assume when processing DICOMWeb API requests. The authorizer Lambda function returns one of these roles ARN after successful token verification, allowing HealthImaging to execute the requests with appropriate permissions.

1. Create IAM policies defining the desired DICOMWeb API privileges. Refer to the "[Using DICOMweb](https://docs.aws.amazon.com/healthimaging/latest/devguide/using-dicomweb.html)" section of the HealthImaging documentation for available permissions.

1. Create IAM roles that:
   + Attach these policies
   + Include a trust relationship allowing the AWS HealthImaging service principal (`medical-imaging.amazonaws.com`) to assume these roles.

Here is an example of a policy allowing associated roles to access to HealthImaging DICOMWeb read-only API:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MedicalImagingDicomWebOperations",
            "Effect": "Allow",
            "Action": [
                "medical-imaging:SearchDICOMInstances",
                "medical-imaging:GetImageSetMetadata",
                "medical-imaging:GetDICOMSeriesMetadata",
                "medical-imaging:SearchDICOMStudies",
                "medical-imaging:GetDICOMBulkdata",
                "medical-imaging:SearchDICOMSeries",
                "medical-imaging:GetDICOMInstanceMetadata",
                "medical-imaging:GetDICOMInstance",
                "medical-imaging:GetDICOMInstanceFrames"
            ],
            "Resource": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
    ]
}
```

------

Here is an example of the trust relationship policy that should be associated to the role(s):

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "OIDCRoleFederation",
            "Effect": "Allow",
            "Principal": {
                "Service": "medical-imaging.amazonaws.com"
        },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

The Lambda authorizer you'll create in the next step can evaluate the token claims and return the ARN of the appropriate role. AWS HealthImaging will then impersonate this role to execute the DICOMWeb API request with the corresponding permissions.

For example:
+ A token with "admin" claims might return an ARN for a role with full access
+ A token with "reader" claims might return an ARN for a role with read-only access
+ A token with "department\$1A" claims might return an ARN for a role specific to that department's access level

This mechanism allows you to map your IdP's authorization model to specific AWS HealthImaging permissions through IAM roles.

## 2. Create and Configure Lambda Authorizer Function
<a name="dicomweb-oidc-configure-lambda"></a>

Create a Lambda function that will verify the JWT token and return the appropriate IAM role ARN based on the token claims evaluation. This function is invoked by the health imaging service and passed an event that contains the HealthImaging datastore Id, the DICOMWeb operation, and the access token found in the HTTP request:

```
{
  "datastoreId": "{datastore id}",
  "operation": "{Healthimaging API name e.g. GetDICOMInstance}",
  "bearerToken": "{access token}"
}
```

The Lambda authorizer function must return a JSON response with the following structure:

```
{
  "isTokenValid": {true or false},
  "roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
```

You can refer to the implementation example for more information.

**Note**  
Because the DICOMWeb request is only answered after the access token is verified by the lambda authorizer, it is important that the execution of this function be as fast as possible to provide with the best DICOMWeb API response time.

For the HealthImaging service to be authorized to invoke the lambda authorizer function, it must have a resource policy that allows HealthImaging service to invoke it. This resource policy can be created in the permission menu of the lambda configuration tab or Using AWS CLI:

```
aws lambda add-permission \
    --function-name YourAuthorizerFunctionName \
    --statement-id HealthImagingInvoke \
    --action lambda:InvokeFunction \
    --principal medical-imaging.amazonaws.com
```

This resource policy allows the HealthImaging service to invoke your Lambda authorizer when authenticating DICOMWeb API requests.

**Note**  
The lambda resource policy can be updated later on with an "ArnLike" condition matching the ARN of a specific HealthImaging datastore.

Here is an example of lambda resource policy:

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Id": "default",
  "Statement": [
    {
      "Sid": "LambaAuthorizer-HealthImagingInvokePermission",
      "Effect": "Allow",
      "Principal": {
        "Service": "medical-imaging.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-east-1:123456789012::function:{LambdaAuthorizerFunctionName}",
      "Condition": {
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
      }
    }
  ]
}
```

------

## 3. Create a New Datastore with OIDC Authentication
<a name="dicomweb-oidc-datastore"></a>

To enable OIDC authentication, you must create a new datastore using the AWS CLI with the parameter "lambda-authorizer-arn". OIDC Authentication cannot be enabled on existing datastores without contacting AWS Support.

Here's an example of how to create a new datastore with OIDC authentication enabled:

```
aws medical-imaging create-datastore \
    --datastore-name YourDatastoreName \
    --lambda-authorizer-arn YourAuthorizerFunctionArn
```

You can check if a specific datastore has OIDC authentication feature enabled by using the AWS CLI get-datastore command, and verifying if the attribute "lambdaAuthorizerArn" is present:

```
aws medical-imaging get-datastore --datastore-id YourDatastoreId
```

```
{
    "datastoreProperties": {
        "datastoreId": YourdatastoreId,
        "datastoreName": YourDatastoreName,
        "datastoreStatus": "ACTIVE",
        "lambdaAuthorizerArn": YourAuthorizerFunctionArn,
        "datastoreArn": YourDatastoreArn,
        "createdAt": "2025-09-30T14:16:04.015000-05:00",
        "updatedAt": "2025-09-30T14:16:04.015000-05:00"
    }
}
```

**Note**  
The execution role for the AWS CLI datastore creation command must have appropriate permissions to invoke the Lambda authorizer function. This mitigates privilege escalation attacks where malicious users could execute unauthorized Lambda functions through the datastore authorizer configuration.

## Exception Codes
<a name="dicomweb-oidc-exceptions"></a>

In case of authentication failure HealthImaging returns the following HTTP error response codes and body messages:


| Condition | AHI response | 
| --- | --- | 
| Lambda Authorizer does not exist or is invalid | 424 Authorizer Misconfiguration | 
| Authorizer terminated due to execution failure | 424 Authorizer Failed | 
| Any other unmapped authorizer error | 424 Authorizer Failed | 
| Authorizer returned invalid/ill-formed response | 424 Authorizer Misconfiguration | 
| Authorizer ran more than 1s | 408 Authorizer Timeout | 
| Token is expired or otherwise invalid | 403 Invalid or Expired Token | 
| AHI can't federate the returned IAM Role due to authorizer misconfiguration | 424 Authorizer Misconfiguration | 
| Authorizer returned an empty Role | 403 Access Denied | 
| Returned Role is not callable (assume-role/trust misconfig) | 424 Authorizer Misconfiguration | 
| Request rate exceeds DICOMweb Gateway limits | 429 Too many requests | 
| Datastore, Return Role, or Authorizer Cross Account/Cross Region | 424 Authorizer Cross Account/Cross Region Access | 

## Implementation Example
<a name="dicomweb-oidc-implementation"></a>

This Python example demonstrates a lambda authorizer function that verifies AWS Cognito access tokens from HealthImaging events and returns an IAM role ARN with appropriate DICOMWeb privileges.

The Lambda authorizer implements two caching mechanisms to reduce external calls and response latency. The JWKS (JSON Web Key Set) is fetched once every hour and stored in the function's temporary folder, allowing subsequent function invocations to read it locally instead of fetching from the public network. You will also notice that a token\$1cache dictionary object is instantiated in the global context of this Lambda function. Global variables are shared by all invocations that reuse the same warmed Lambda context. Thanks to this, successfully verified tokens can be stored in this dictionary and looked up quickly during the next execution of this same Lambda function. The caching method represents a generalist approach that could fit access tokens issued from most identity providers. For an AWS Cognito specific caching option, refer to [Managing User pool](https://docs.aws.amazon.com/cognito/latest/developerguide/managing-users.html) section and [caching section](https://docs.aws.amazon.com/cognito/latest/developerguide/amazon-cognito-user-pools-using-tokens-caching-tokens.html) of [AWS Cognito documentation](https://docs.aws.amazon.com/cognito/latest/developerguide/what-is-amazon-cognito.html).

```
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile

# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))

# Global token cache with TTL
token_cache = {}

# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600  # 1 hour

# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')

def cleanup_expired_tokens():
    """Remove expired tokens from cache"""
    now = int(time.time())
    expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
    for token in expired_keys:
        del token_cache[token]

def get_cached_jwks():
    """Get JWKS from cache file if valid, otherwise return None """
    try:
        if os.path.exists(JWKS_CACHE_FILE):
            # Check if cache file is still valid
            cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
            if cache_age < JWKS_CACHE_TTL:
                with open(JWKS_CACHE_FILE, 'r') as f:
                    jwks = json.load(f)
                    logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
                    return jwks
            else:
                logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
    except Exception as e:
        logger.debug(f'Error reading JWKS cache: {e}')
    
    return None

def cache_jwks(jwks):
    """Cache JWKS to file"""
    try:
        with open(JWKS_CACHE_FILE, 'w') as f:
            json.dump(jwks, f)
        logger.debug('JWKS cached successfully')
    except Exception as e:
        logger.debug(f'Error caching JWKS: {e}')

def fetch_jwks(jwks_url):
    """Fetch JWKS from URL and cache it"""
    logger.debug('Fetching JWKS from URL')
    jwks = requests.get(jwks_url, timeout=10).json()
    # Convert to dict for faster lookups
    jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
    cache_jwks(jwks)
    return jwks

def is_token_cached(token):
    if token not in token_cache:
        return None
    
    cached = token_cache[token]
    now = int(time.time())
    
    if now > cached['cache_expiry']:
        del token_cache[token]
        return None
    
    return cached

def cache_token(token, payload):
    now = int(time.time())
    token_exp = payload.get('exp')
    cache_expiry = min(now + 60, token_exp)  # 1 minute or token expiry, whichever is sooner
    
    token_cache[token] = {
        'payload': payload,
        'cache_expiry': cache_expiry,
        'role_arn': ROLE_ARN
    }

def handler(event, context):
    cleanup_expired_tokens() # start be removing expired tokens from the cache
    try:
        # Extract token from bearerToken or authorizationToken field
        token = event.get('bearerToken')
        if not token:
            raise Exception('No token provided')
        
        # Check cache first
        cached = is_token_cached(token)
        if cached:
            logger.debug('Token found in cache, skipping verification')
            return {
                'isTokenValid': True,
                'roleArn': cached['role_arn']
            }
        
        # Get Cognito configuration
        region = context.invoked_function_arn.split(':')[3]
        
        # Get JWKS (cached or fresh)
        jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
        jwks = get_cached_jwks()
        if not jwks:
            jwks = fetch_jwks(jwks_url)
        
        # Decode token header to get kid
        headers = jwt.get_unverified_headers(token)
        kid = headers['kid']
        
        # Find the correct key
        key = None
        for jwk_key in jwks['keys']:
            if jwk_key['kid'] == kid:
                key = jwk_key
                break
        
        if not key:
            # Key not found - try refreshing JWKS in case of key rotation
            logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
            jwks = fetch_jwks(jwks_url)
            for jwk_key in jwks['keys']:
                if jwk_key['kid'] == kid:
                    key = jwk_key
                    break
        
        if not key:
            raise Exception('Public key not found')
        
        # Construct the public key
        public_key = jwk.construct(key)
        
        # Verify and decode the token (includes expiry validation)
        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=CLIENT_ID,
            issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
        )
        
        logger.debug('Token validated successfully')
        logger.debug('User: %s', payload.get('username', 'unknown'))
        
        # Cache the validated token
        cache_token(token, payload)
        
        # Return authorization response
        return {
            'isTokenValid': True,
            'roleArn': ROLE_ARN
        }
        
    except ExpiredSignatureError:
        logger.debug('Token expired')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTClaimsError:
        logger.debug('Invalid token claims')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTError as e:
        logger.debug('JWT validation error: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except Exception as e:
        logger.debug('Authorization failed: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
```