Siapkan otorisasi AWS Lambda untuk otentikasi OIDC - AWS HealthImaging

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Siapkan otorisasi AWS Lambda untuk otentikasi OIDC

Panduan ini mengasumsikan Anda telah mengonfigurasi Identity Provider (IDP) pilihan Anda untuk menyediakan token akses yang kompatibel dengan persyaratan fitur otentikasi HealthImaging OIDC.

1. Konfigurasikan Peran IAM untuk Akses DICOMWeb API

Sebelum mengonfigurasi otorisasi Lambda, buat peran IAM HealthImaging untuk diasumsikan saat memproses permintaan API. DICOMWeb Fungsi Lambda otorisasi mengembalikan salah satu peran ini ARN setelah verifikasi token berhasil, HealthImaging memungkinkan untuk mengeksekusi permintaan dengan izin yang sesuai.

  1. Buat kebijakan IAM yang menentukan hak istimewa DICOMWeb API yang diinginkan. Lihat bagian "Menggunakan DICOMweb" dari HealthImaging dokumentasi untuk izin yang tersedia.

  2. Buat peran IAM yang:

    • Lampirkan kebijakan ini

    • Sertakan hubungan kepercayaan yang memungkinkan AWS HealthImaging service principal (medical-imaging.amazonaws.com) untuk mengambil peran ini.

Berikut adalah contoh kebijakan yang mengizinkan peran terkait mengakses API HealthImaging DICOMWeb hanya-baca:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "MedicalImagingDicomWebOperations", "Effect": "Allow", "Action": [ "medical-imaging:SearchDICOMInstances", "medical-imaging:GetImageSetMetadata", "medical-imaging:GetDICOMSeriesMetadata", "medical-imaging:SearchDICOMStudies", "medical-imaging:GetDICOMBulkdata", "medical-imaging:SearchDICOMSeries", "medical-imaging:GetDICOMInstanceMetadata", "medical-imaging:GetDICOMInstance", "medical-imaging:GetDICOMInstanceFrames" ], "Resource": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } ] }

Berikut adalah contoh kebijakan hubungan kepercayaan yang harus dikaitkan dengan peran:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "OIDCRoleFederation", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Authorizer Lambda yang akan Anda buat pada langkah berikutnya dapat mengevaluasi klaim token dan mengembalikan ARN dari peran yang sesuai. AWS kemudian HealthImaging akan meniru peran ini untuk menjalankan permintaan DICOMWeb API dengan izin yang sesuai.

Misalnya:

  • Token dengan klaim “admin” mungkin mengembalikan ARN untuk peran dengan akses penuh

  • Token dengan klaim “pembaca” mungkin mengembalikan ARN untuk peran dengan akses hanya-baca

  • Token dengan klaim “Department_a” mungkin mengembalikan ARN untuk peran khusus untuk tingkat akses departemen tersebut

Mekanisme ini memungkinkan Anda memetakan model otorisasi IDP Anda ke HealthImaging izin AWS tertentu melalui peran IAM.

2. Buat dan Konfigurasikan Fungsi Lambda Authorizer

Buat fungsi Lambda yang akan memverifikasi token JWT dan mengembalikan ARN peran IAM yang sesuai berdasarkan evaluasi klaim token. Fungsi ini dipanggil oleh layanan pencitraan kesehatan dan meneruskan peristiwa yang berisi Id HealthImaging datastore, DICOMWeb operasi, dan token akses yang ditemukan dalam permintaan HTTP:

{ "datastoreId": "{datastore id}", "operation": "{Healthimaging API name e.g. GetDICOMInstance}", "bearerToken": "{access token}" }

Fungsi otorisasi Lambda harus mengembalikan respons JSON dengan struktur berikut:

{ "isTokenValid": {true or false}, "roleArn": "{role arn or empty string meaning to deny the request explicitly}" }

Anda dapat merujuk ke contoh implementasi untuk informasi lebih lanjut.

catatan

Karena DICOMWeb permintaan hanya dijawab setelah token akses diverifikasi oleh otorisasi lambda, penting bahwa eksekusi fungsi ini secepat mungkin untuk menyediakan waktu respons DICOMWeb API terbaik.

Agar HealthImaging layanan diotorisasi untuk menjalankan fungsi otorisasi lambda, layanan harus memiliki kebijakan sumber daya yang memungkinkan HealthImaging layanan untuk memanggilnya. Kebijakan sumber daya ini dapat dibuat di menu izin tab konfigurasi lambda atau Menggunakan AWS CLI:

aws lambda add-permission \ --function-name YourAuthorizerFunctionName \ --statement-id HealthImagingInvoke \ --action lambda:InvokeFunction \ --principal medical-imaging.amazonaws.com

Kebijakan sumber daya ini memungkinkan HealthImaging layanan untuk memanggil otorisasi Lambda Anda saat DICOMWeb mengautentikasi permintaan API.

catatan

Kebijakan sumber daya lambda dapat diperbarui nanti dengan kondisi "ArnLike" yang cocok dengan ARN dari datastore tertentu HealthImaging .

Berikut adalah contoh kebijakan sumber daya lambda:

{ "Version": "2012-10-17", "Id": "default", "Statement": [ { "Sid": "LambaAuthorizer-HealthImagingInvokePermission", "Effect": "Allow", "Principal": { "Service": "medical-imaging.amazonaws.com" }, "Action": "lambda:InvokeFunction", "Resource": "arn:aws:lambda:{Region}:{Account}::function:{LambdaAuthorizerFunctionName}", "Condition": { "ArnLike": { "AWS:SourceArn": "arn:aws:medical-imaging:{Region}:{Account}:datastore/{DatastoreId}" } } } ] }

3. Buat Datastore Baru dengan Otentikasi OIDC

Untuk mengaktifkan otentikasi OIDC, Anda harus membuat datastore baru menggunakan AWS CLI dengan parameter "”. lambda-authorizer-arn Otentikasi OIDC tidak dapat diaktifkan pada datastores yang ada tanpa menghubungi Support. AWS

Berikut adalah contoh cara membuat datastore baru dengan otentikasi OIDC diaktifkan:

aws medical-imaging create-datastore \ --datastore-name YourDatastoreName \ --lambda-authorizer-arn YourAuthorizerFunctionArn

Anda dapat memeriksa apakah datastore tertentu memiliki fitur otentikasi OIDC diaktifkan dengan menggunakan perintah AWS CLI get-datastore, dan memverifikasi apakah atribut "" ada: lambdaAuthorizerArn

aws medical-imaging get-datastore --datastore-id YourDatastoreId
{ "datastoreProperties": { "datastoreId": YourdatastoreId, "datastoreName": YourDatastoreName, "datastoreStatus": "ACTIVE", "lambdaAuthorizerArn": YourAuthorizerFunctionArn, "datastoreArn": YourDatastoreArn, "createdAt": "2025-09-30T14:16:04.015000-05:00", "updatedAt": "2025-09-30T14:16:04.015000-05:00" } }
catatan

Peran eksekusi untuk perintah pembuatan AWS CLI datastore harus memiliki izin yang sesuai untuk menjalankan fungsi otorisasi Lambda. Ini mengurangi serangan eskalasi hak istimewa di mana pengguna jahat dapat menjalankan fungsi Lambda yang tidak sah melalui konfigurasi otorisasi datastore.

Kode Pengecualian

Dalam kasus kegagalan otentikasi HealthImaging mengembalikan kode respon kesalahan HTTP berikut dan pesan tubuh:

Kondisi Tanggapan AHI
Lambda Authorizer tidak ada atau tidak valid 424 Kesalahan Konfigurasi Otorisasi
Authorizer dihentikan karena kegagalan eksekusi 424 Authorizer Gagal
Kesalahan otorisasi lain yang tidak dipetakan 424 Authorizer Gagal
Authorizer mengembalikan respons yang tidak valid/tidak terbentuk 424 Kesalahan Konfigurasi Otorisasi
Authorizer berjalan lebih dari 1 d 408 Batas Waktu Otorisasi
Token kedaluwarsa atau tidak valid 403 Token Tidak Valid atau Kedaluwarsa
AHI tidak dapat menggabungkan Peran IAM yang dikembalikan karena kesalahan konfigurasi otorisasi 424 Kesalahan Konfigurasi Otorisasi
Authorizer mengembalikan Peran kosong 403 Akses Ditolak
Peran yang Dikembalikan tidak dapat dipanggil (assume-role/trust misconfig) 424 Kesalahan Konfigurasi Otorisasi
Tingkat permintaan melebihi batas DICOMweb Gateway 429 Terlalu banyak permintaan
Datastore, Peran Pengembalian, atau Authorizer Lintas Wilayah Account/Cross 424 Akses Account/Cross Lintas Wilayah Authorizer

Contoh Implementasi

Contoh Python ini menunjukkan fungsi otorisasi lambda yang memverifikasi token akses AWS Cognito dari peristiwa HealthImaging dan mengembalikan peran IAM ARN dengan hak istimewa yang sesuai. DICOMWeb

Authorizer Lambda mengimplementasikan dua mekanisme caching untuk mengurangi panggilan eksternal dan latensi respons. JWKS (JSON Web Key Set) diambil sekali setiap jam dan disimpan dalam folder sementara fungsi, memungkinkan pemanggilan fungsi berikutnya untuk membacanya secara lokal alih-alih mengambil dari jaringan publik. Anda juga akan melihat bahwa objek kamus token_cache dipakai dalam konteks global fungsi Lambda ini. Variabel global dibagi oleh semua pemanggilan yang menggunakan kembali konteks Lambda hangat yang sama. Berkat ini, token yang berhasil diverifikasi dapat disimpan dalam kamus ini dan dicari dengan cepat selama eksekusi berikutnya dari fungsi Lambda yang sama ini. Metode caching merupakan pendekatan generalis yang dapat menyesuaikan token akses yang dikeluarkan dari sebagian besar penyedia identitas. Untuk opsi caching khusus AWS Cognito, lihat bagian Mengelola kumpulan Pengguna dan bagian caching dari dokumentasi Cognito.AWS

import json import os import time import logging from jose import jwk, jwt from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError import requests import tempfile # Configure logging logger = logging.getLogger() log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper() logger.setLevel(getattr(logging, log_level, logging.WARNING)) # Global token cache with TTL token_cache = {} # JWKS cache file path JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json') JWKS_CACHE_TTL = 3600 # 1 hour # Load environment variables once USER_POOL_ID = os.environ['USER_POOL_ID'] CLIENT_ID = os.environ['CLIENT_ID'] ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '') def cleanup_expired_tokens(): """Remove expired tokens from cache""" now = int(time.time()) expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']] for token in expired_keys: del token_cache[token] def get_cached_jwks(): """Get JWKS from cache file if valid, otherwise return None """ try: if os.path.exists(JWKS_CACHE_FILE): # Check if cache file is still valid cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE) if cache_age < JWKS_CACHE_TTL: with open(JWKS_CACHE_FILE, 'r') as f: jwks = json.load(f) logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)') return jwks else: logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)') except Exception as e: logger.debug(f'Error reading JWKS cache: {e}') return None def cache_jwks(jwks): """Cache JWKS to file""" try: with open(JWKS_CACHE_FILE, 'w') as f: json.dump(jwks, f) logger.debug('JWKS cached successfully') except Exception as e: logger.debug(f'Error caching JWKS: {e}') def fetch_jwks(jwks_url): """Fetch JWKS from URL and cache it""" logger.debug('Fetching JWKS from URL') jwks = requests.get(jwks_url, timeout=10).json() # Convert to dict for faster lookups jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']} cache_jwks(jwks) return jwks def is_token_cached(token): if token not in token_cache: return None cached = token_cache[token] now = int(time.time()) if now > cached['cache_expiry']: del token_cache[token] return None return cached def cache_token(token, payload): now = int(time.time()) token_exp = payload.get('exp') cache_expiry = min(now + 60, token_exp) # 1 minute or token expiry, whichever is sooner token_cache[token] = { 'payload': payload, 'cache_expiry': cache_expiry, 'role_arn': ROLE_ARN } def handler(event, context): cleanup_expired_tokens() # start be removing expired tokens from the cache try: # Extract token from bearerToken or authorizationToken field token = event.get('bearerToken') if not token: raise Exception('No token provided') # Check cache first cached = is_token_cached(token) if cached: logger.debug('Token found in cache, skipping verification') return { 'isTokenValid': True, 'roleArn': cached['role_arn'] } # Get Cognito configuration region = context.invoked_function_arn.split(':')[3] # Get JWKS (cached or fresh) jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json' jwks = get_cached_jwks() if not jwks: jwks = fetch_jwks(jwks_url) # Decode token header to get kid headers = jwt.get_unverified_headers(token) kid = headers['kid'] # Find the correct key key = None for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: # Key not found - try refreshing JWKS in case of key rotation logger.debug('Key not found in cached JWKS, fetching fresh JWKS') jwks = fetch_jwks(jwks_url) for jwk_key in jwks['keys']: if jwk_key['kid'] == kid: key = jwk_key break if not key: raise Exception('Public key not found') # Construct the public key public_key = jwk.construct(key) # Verify and decode the token (includes expiry validation) payload = jwt.decode( token, public_key, algorithms=['RS256'], audience=CLIENT_ID, issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}' ) logger.debug('Token validated successfully') logger.debug('User: %s', payload.get('username', 'unknown')) # Cache the validated token cache_token(token, payload) # Return authorization response return { 'isTokenValid': True, 'roleArn': ROLE_ARN } except ExpiredSignatureError: logger.debug('Token expired') return { 'isTokenValid': False, 'roleArn': '' } except JWTClaimsError: logger.debug('Invalid token claims') return { 'isTokenValid': False, 'roleArn': '' } except JWTError as e: logger.debug('JWT validation error: %s', e) return { 'isTokenValid': False, 'roleArn': '' } except Exception as e: logger.debug('Authorization failed: %s', e) return { 'isTokenValid': False, 'roleArn': '' }