Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Siapkan otorisasi AWS Lambda untuk otentikasi OIDC
Panduan ini mengasumsikan Anda telah mengonfigurasi Identity Provider (IDP) pilihan Anda untuk menyediakan token akses yang kompatibel dengan persyaratan fitur otentikasi HealthImaging OIDC.
1. Konfigurasikan Peran IAM untuk Akses DICOMWeb API
Sebelum mengonfigurasi otorisasi Lambda, buat peran IAM HealthImaging untuk diasumsikan saat memproses permintaan API. DICOMWeb Fungsi Lambda otorisasi mengembalikan salah satu peran ini ARN setelah verifikasi token berhasil, HealthImaging memungkinkan untuk mengeksekusi permintaan dengan izin yang sesuai.
-
Buat kebijakan IAM yang menentukan hak istimewa DICOMWeb API yang diinginkan. Lihat bagian "Menggunakan DICOMweb" dari HealthImaging dokumentasi untuk izin yang tersedia.
-
Buat peran IAM yang:
-
Lampirkan kebijakan ini
-
Sertakan hubungan kepercayaan yang memungkinkan AWS HealthImaging service principal (
medical-imaging.amazonaws.com) untuk mengambil peran ini.
-
Berikut adalah contoh kebijakan yang mengizinkan peran terkait mengakses API HealthImaging DICOMWeb hanya-baca:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "MedicalImagingDicomWebOperations",
"Effect": "Allow",
"Action": [
"medical-imaging:SearchDICOMInstances",
"medical-imaging:GetImageSetMetadata",
"medical-imaging:GetDICOMSeriesMetadata",
"medical-imaging:SearchDICOMStudies",
"medical-imaging:GetDICOMBulkdata",
"medical-imaging:SearchDICOMSeries",
"medical-imaging:GetDICOMInstanceMetadata",
"medical-imaging:GetDICOMInstance",
"medical-imaging:GetDICOMInstanceFrames"
],
"Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
]
}
Berikut adalah contoh kebijakan hubungan kepercayaan yang harus dikaitkan dengan peran:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "OIDCRoleFederation",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Authorizer Lambda yang akan Anda buat pada langkah berikutnya dapat mengevaluasi klaim token dan mengembalikan ARN dari peran yang sesuai. AWS kemudian HealthImaging akan meniru peran ini untuk menjalankan permintaan DICOMWeb API dengan izin yang sesuai.
Misalnya:
-
Token dengan klaim “admin” mungkin mengembalikan ARN untuk peran dengan akses penuh
-
Token dengan klaim “pembaca” mungkin mengembalikan ARN untuk peran dengan akses hanya-baca
-
Token dengan klaim “Department_a” mungkin mengembalikan ARN untuk peran khusus untuk tingkat akses departemen tersebut
Mekanisme ini memungkinkan Anda memetakan model otorisasi IDP Anda ke HealthImaging izin AWS tertentu melalui peran IAM.
2. Buat dan Konfigurasikan Fungsi Lambda Authorizer
Buat fungsi Lambda yang akan memverifikasi token JWT dan mengembalikan ARN peran IAM yang sesuai berdasarkan evaluasi klaim token. Fungsi ini dipanggil oleh layanan pencitraan kesehatan dan meneruskan peristiwa yang berisi Id HealthImaging datastore, DICOMWeb operasi, dan token akses yang ditemukan dalam permintaan HTTP:
{
"datastoreId": "{datastore id}",
"operation": "{Healthimaging API name e.g. GetDICOMInstance}",
"bearerToken": "{access token}"
}
Fungsi otorisasi Lambda harus mengembalikan respons JSON dengan struktur berikut:
{
"isTokenValid": {true or false},
"roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
Anda dapat merujuk ke contoh implementasi untuk informasi lebih lanjut.
catatan
Karena DICOMWeb permintaan hanya dijawab setelah token akses diverifikasi oleh otorisasi lambda, penting bahwa eksekusi fungsi ini secepat mungkin untuk menyediakan waktu respons DICOMWeb API terbaik.
Agar HealthImaging layanan diotorisasi untuk menjalankan fungsi otorisasi lambda, layanan harus memiliki kebijakan sumber daya yang memungkinkan HealthImaging layanan untuk memanggilnya. Kebijakan sumber daya ini dapat dibuat di menu izin tab konfigurasi lambda atau Menggunakan AWS CLI:
aws lambda add-permission \
--function-name YourAuthorizerFunctionName \
--statement-id HealthImagingInvoke \
--action lambda:InvokeFunction \
--principal medical-imaging.amazonaws.com
Kebijakan sumber daya ini memungkinkan HealthImaging layanan untuk memanggil otorisasi Lambda Anda saat DICOMWeb mengautentikasi permintaan API.
catatan
Kebijakan sumber daya lambda dapat diperbarui nanti dengan kondisi "ArnLike" yang cocok dengan ARN dari datastore tertentu HealthImaging .
Berikut adalah contoh kebijakan sumber daya lambda:
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "LambaAuthorizer-HealthImagingInvokePermission",
"Effect": "Allow",
"Principal": {
"Service": "medical-imaging.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}",
"Condition": {
"ArnLike": {
"AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}"
}
}
}
]
}
3. Buat Datastore Baru dengan Otentikasi OIDC
Untuk mengaktifkan otentikasi OIDC, Anda harus membuat datastore baru menggunakan AWS CLI dengan parameter "”. lambda-authorizer-arn Otentikasi OIDC tidak dapat diaktifkan pada datastores yang ada tanpa menghubungi Support. AWS
Berikut adalah contoh cara membuat datastore baru dengan otentikasi OIDC diaktifkan:
aws medical-imaging create-datastore \
--datastore-name YourDatastoreName \
--lambda-authorizer-arn YourAuthorizerFunctionArn
Anda dapat memeriksa apakah datastore tertentu memiliki fitur otentikasi OIDC diaktifkan dengan menggunakan perintah AWS CLI get-datastore, dan memverifikasi apakah atribut "" ada: lambdaAuthorizerArn
aws medical-imaging get-datastore --datastore-id YourDatastoreId
{
"datastoreProperties": {
"datastoreId": YourdatastoreId,
"datastoreName": YourDatastoreName,
"datastoreStatus": "ACTIVE",
"lambdaAuthorizerArn": YourAuthorizerFunctionArn,
"datastoreArn": YourDatastoreArn,
"createdAt": "2025-09-30T14:16:04.015000-05:00",
"updatedAt": "2025-09-30T14:16:04.015000-05:00"
}
}
catatan
Peran eksekusi untuk perintah pembuatan AWS CLI datastore harus memiliki izin yang sesuai untuk menjalankan fungsi otorisasi Lambda. Ini mengurangi serangan eskalasi hak istimewa di mana pengguna jahat dapat menjalankan fungsi Lambda yang tidak sah melalui konfigurasi otorisasi datastore.
Kode Pengecualian
Dalam kasus kegagalan otentikasi HealthImaging mengembalikan kode respon kesalahan HTTP berikut dan pesan tubuh:
| Kondisi | Tanggapan AHI |
|---|---|
| Lambda Authorizer tidak ada atau tidak valid | 424 Kesalahan Konfigurasi Otorisasi |
| Authorizer dihentikan karena kegagalan eksekusi | 424 Authorizer Gagal |
| Kesalahan otorisasi lain yang tidak dipetakan | 424 Authorizer Gagal |
| Authorizer mengembalikan respons yang tidak valid/tidak terbentuk | 424 Kesalahan Konfigurasi Otorisasi |
| Authorizer berjalan lebih dari 1 d | 408 Batas Waktu Otorisasi |
| Token kedaluwarsa atau tidak valid | 403 Token Tidak Valid atau Kedaluwarsa |
| AHI tidak dapat menggabungkan Peran IAM yang dikembalikan karena kesalahan konfigurasi otorisasi | 424 Kesalahan Konfigurasi Otorisasi |
| Authorizer mengembalikan Peran kosong | 403 Akses Ditolak |
| Peran yang Dikembalikan tidak dapat dipanggil (assume-role/trust misconfig) | 424 Kesalahan Konfigurasi Otorisasi |
| Tingkat permintaan melebihi batas DICOMweb Gateway | 429 Terlalu banyak permintaan |
| Datastore, Peran Pengembalian, atau Authorizer Lintas Wilayah Account/Cross | 424 Akses Account/Cross Lintas Wilayah Authorizer |
Contoh Implementasi
Contoh Python ini menunjukkan fungsi otorisasi lambda yang memverifikasi token akses AWS Cognito dari peristiwa HealthImaging dan mengembalikan peran IAM ARN dengan hak istimewa yang sesuai. DICOMWeb
Authorizer Lambda mengimplementasikan dua mekanisme caching untuk mengurangi panggilan eksternal dan latensi respons. JWKS (JSON Web Key Set) diambil sekali setiap jam dan disimpan dalam folder sementara fungsi, memungkinkan pemanggilan fungsi berikutnya untuk membacanya secara lokal alih-alih mengambil dari jaringan publik. Anda juga akan melihat bahwa objek kamus token_cache dipakai dalam konteks global fungsi Lambda ini. Variabel global dibagi oleh semua pemanggilan yang menggunakan kembali konteks Lambda hangat yang sama. Berkat ini, token yang berhasil diverifikasi dapat disimpan dalam kamus ini dan dicari dengan cepat selama eksekusi berikutnya dari fungsi Lambda yang sama ini. Metode caching merupakan pendekatan generalis yang dapat menyesuaikan token akses yang dikeluarkan dari sebagian besar penyedia identitas. Untuk opsi caching khusus AWS Cognito, lihat bagian Mengelola kumpulan Pengguna dan bagian caching dari dokumentasi Cognito.AWS
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile
# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))
# Global token cache with TTL
token_cache = {}
# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600 # 1 hour
# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')
def cleanup_expired_tokens():
"""Remove expired tokens from cache"""
now = int(time.time())
expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
for token in expired_keys:
del token_cache[token]
def get_cached_jwks():
"""Get JWKS from cache file if valid, otherwise return None """
try:
if os.path.exists(JWKS_CACHE_FILE):
# Check if cache file is still valid
cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
if cache_age < JWKS_CACHE_TTL:
with open(JWKS_CACHE_FILE, 'r') as f:
jwks = json.load(f)
logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
return jwks
else:
logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
except Exception as e:
logger.debug(f'Error reading JWKS cache: {e}')
return None
def cache_jwks(jwks):
"""Cache JWKS to file"""
try:
with open(JWKS_CACHE_FILE, 'w') as f:
json.dump(jwks, f)
logger.debug('JWKS cached successfully')
except Exception as e:
logger.debug(f'Error caching JWKS: {e}')
def fetch_jwks(jwks_url):
"""Fetch JWKS from URL and cache it"""
logger.debug('Fetching JWKS from URL')
jwks = requests.get(jwks_url, timeout=10).json()
# Convert to dict for faster lookups
jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
cache_jwks(jwks)
return jwks
def is_token_cached(token):
if token not in token_cache:
return None
cached = token_cache[token]
now = int(time.time())
if now > cached['cache_expiry']:
del token_cache[token]
return None
return cached
def cache_token(token, payload):
now = int(time.time())
token_exp = payload.get('exp')
cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner
token_cache[token] = {
'payload': payload,
'cache_expiry': cache_expiry,
'role_arn': ROLE_ARN
}
def handler(event, context):
cleanup_expired_tokens() # start be removing expired tokens from the cache
try:
# Extract token from bearerToken or authorizationToken field
token = event.get('bearerToken')
if not token:
raise Exception('No token provided')
# Check cache first
cached = is_token_cached(token)
if cached:
logger.debug('Token found in cache, skipping verification')
return {
'isTokenValid': True,
'roleArn': cached['role_arn']
}
# Get Cognito configuration
region = context.invoked_function_arn.split(':')[3]
# Get JWKS (cached or fresh)
jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
jwks = get_cached_jwks()
if not jwks:
jwks = fetch_jwks(jwks_url)
# Decode token header to get kid
headers = jwt.get_unverified_headers(token)
kid = headers['kid']
# Find the correct key
key = None
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
# Key not found - try refreshing JWKS in case of key rotation
logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
jwks = fetch_jwks(jwks_url)
for jwk_key in jwks['keys']:
if jwk_key['kid'] == kid:
key = jwk_key
break
if not key:
raise Exception('Public key not found')
# Construct the public key
public_key = jwk.construct(key)
# Verify and decode the token (includes expiry validation)
payload = jwt.decode(
token,
public_key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
)
logger.debug('Token validated successfully')
logger.debug('User: %s', payload.get('username', 'unknown'))
# Cache the validated token
cache_token(token, payload)
# Return authorization response
return {
'isTokenValid': True,
'roleArn': ROLE_ARN
}
except ExpiredSignatureError:
logger.debug('Token expired')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTClaimsError:
logger.debug('Invalid token claims')
return {
'isTokenValid': False,
'roleArn': ''
}
except JWTError as e:
logger.debug('JWT validation error: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}
except Exception as e:
logger.debug('Authorization failed: %s', e)
return {
'isTokenValid': False,
'roleArn': ''
}