Connessioni Kinesis - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connessioni Kinesis

È possibile usare una connessione Kinesis per leggere e scrivere su flussi di dati Amazon Kinesis utilizzando le informazioni archiviate in una tabella del Catalogo dati o fornendo informazioni per accedere direttamente al flusso di dati. È possibile leggere le informazioni da Kinesis in uno Spark DataFrame, quindi convertirle in un AWS Glue DynamicFrame. È possibile scrivere DynamicFrames su Kinesis in un formato JSON. Se si accede direttamente al flusso di dati, utilizzare queste opzioni per fornire le informazioni su come accedere al flusso di dati.

Se si utilizza getCatalogSource o create_data_frame_from_catalog per consumare i registri da una sorgente di streaming Kinesis, il processo avrà le informazioni sul database catalogo dati e sul nome della tabella, e potrà usarle per ottenere alcuni parametri di base per la lettura dalla sorgente di streaming Kinesis. Se si utilizza getSource, getSourceWithFormat, createDataFrameFromOptions o create_data_frame_from_options, dovrai specificare questi parametri di base utilizzando le opzioni di connessione descritte qui.

È possibile specificare le opzioni di connessione per Kinesis utilizzando i seguenti argomenti per i metodi specificati nella classe GlueContext.

  • Scala

    • connectionOptions: utilizza con getSource, createDataFrameFromOptions e getSink

    • additionalOptions: utilizza con getCatalogSource, getCatalogSink

    • options: utilizza con getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: utilizza con create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: utilizza con create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: utilizza con getSource, getSink

Per osservazioni e restrizioni sui processi ETL dei flussi di dati, consultare la pagina Streaming di note e restrizioni ETL.

Configurazione di Kinesis

Per connettersi a un flusso di dati Kinesis in un processo AWS Glue Spark, sono necessari alcuni prerequisiti:

  • In lettura, il processo AWS Glue deve disporre delle autorizzazioni IAM di livello di accesso in lettura per il flusso di dati Kinesis.

  • In scrittura, il processo AWS Glue deve disporre delle autorizzazioni IAM di livello di accesso in scrittura per il flusso di dati Kinesis.

In alcuni casi, è necessario configurare ulteriori prerequisiti:

  • Se il tuo processo AWS Glue è configurato con connessioni di rete aggiuntive (in genere per connettersi ad altri set di dati) e una di queste connessioni offre opzioni di rete Amazon VPC, questo porterà il tuo processo a comunicare tramite Amazon VPC. In questo caso, per comunicare tramite Amazon VPC dovrai configurare anche il flusso di dati Kinesis. È possibile farlo creando un endpoint VPC di interfaccia tra l'Amazon VPC e il flusso di dati Kinesis. Per ulteriori informazioni, consulta la pagina Using Amazon Kinesis Data Streams with Interface VPC Endpoints.

  • Quando si specifica un flusso di dati Amazon Kinesis in un altro account, è necessario impostare i ruoli e le policy per consentire l'accesso multi-account. Per ulteriori informazioni, consultare Esempio: lettura da un flusso Kinesis in un account diverso.

Per ulteriori informazioni sui prerequisiti dei processi ETL dei flussi di dati, consultare la pagina Aggiunta di processi di streaming ETL in AWS Glue.

Esempio: lettura da flussi Kinesis

Esempio: lettura da flussi Kinesis

Usato in combinazione con forEachBatch.

Esempio per l'origine di streaming Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Esempio: scrittura su flussi Kinesis

Esempio: lettura da flussi Kinesis

Usato in combinazione con forEachBatch.

Esempio per l'origine di streaming Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Indicazioni di riferimento alle opzioni di connessione a Kinesis

Indica le opzioni di connessione ad Amazon Kinesis Data Streams.

Utilizza le seguenti opzioni di connessione per le origini dati in streaming Kinesis:

  • "streamARN": (obbligatorio) utilizzato per la lettura/scrittura. L'ARN del flusso di dati di Kinesis.

  • "classification": (obbligatorio per la lettura) utilizzato per la lettura. Il formato di file utilizzato dai dati nel record. Obbligatorio, a meno che non sia fornito tramite Catalogo dati.

  • "streamName": (facoltativo) utilizzato per la lettura. Il nome di un flusso di dati Kinesis da cui leggere. Usato con endpointUrl.

  • "endpointUrl": (facoltativo) utilizzato per la lettura. Predefinito: "https://kinesis.us-east-1.amazonaws.com". L'endpoint AWS del flusso di Kinesis. Non è necessario modificarlo a meno che non ci si stia connettendo a una regione speciale.

  • "partitionKey": (facoltativo) utilizzato per la scrittura. La chiave di partizione di Kinesis utilizzata per la produzione dei record.

  • "delimiter": (facoltativo) utilizzato per la lettura. Il separatore di valori utilizzato quando classification è CSV. Il valore predefinito è ",".

  • "startingPosition": (facoltativo) utilizzato per la lettura. La posizione di partenza nel flusso dei dati Kinesis da cui leggere i dati. I valori possibili sono "latest", "trim_horizon", "earliest" o una stringa di timestamp in formato UTC con il modello yyyy-mm-ddTHH:MM:SSZ, dove Z rappresenta uno scostamento del fuso orario UTC con un +/- (ad esempio: "2023-04-04T08:00:00-04:00"). Il valore predefinito è "latest". Nota: la stringa Timestamp in formato UTC per "startingPosition" è supportata solo per AWS Glue versione 4.0 o successiva.

  • "failOnDataLoss": (facoltativo) non è possibile eseguire il processo se una partizione attiva è mancante o scaduta. Il valore predefinito è "false".

  • "awsSTSRoleARN": (facoltativo) utilizzato per la lettura/scrittura. Il nome della risorsa Amazon (ARN) del ruolo da assumere utilizzando AWS Security Token Service (AWS STS). Questo ruolo deve disporre delle autorizzazioni per descrivere o leggere le operazioni dei registri per il flusso di dati Kinesis. Quando si accede a un flusso di dati in un altro account, è necessario utilizzare questo parametro. Usato in combinazione con "awsSTSSessionName".

  • "awsSTSSessionName": (facoltativo) utilizzato per la lettura/scrittura. Un identificatore della sessione che assume il ruolo usando AWS STS. Quando si accede a un flusso di dati in un altro account, è necessario utilizzare questo parametro. Usato in combinazione con "awsSTSRoleARN".

  • "awsSTSEndpoint": (facoltativo) l'endpoint AWS STS da utilizzare quando ci si connette a Kinesis con un ruolo assunto. Ciò consente di utilizzare l'endpoint AWS STS regionale in un VPC, cosa non possibile con l'endpoint globale predefinito.

  • "maxFetchTimeInMs": (facoltativo) utilizzato per la lettura. Il tempo massimo impiegato affinché l'esecutore del processo legga i record della batch attuale dal flusso di dati Kinesis, specificato in millisecondi (ms). Entro questo periodo è possibile effettuate più chiamate API GetRecords. Il valore predefinito è 1000.

  • "maxFetchRecordsPerShard": (facoltativo) utilizzato per la lettura. Il numero massimo di record da recuperare per shard nel flusso di dati Kinesis per microbatch. Nota: il client può superare questo limite se il processo di streaming ha già letto i record aggiuntivi da Kinesis (nella stessa chiamata get-records). Se maxFetchRecordsPerShard deve essere rigoroso, allora deve essere un multiplo di maxRecordPerRead. Il valore predefinito è 100000.

  • "maxRecordPerRead": (facoltativo) utilizzato per la lettura. Il numero massimo di record da recuperare nel flusso di dati Kinesis in ciascuna operazione getRecords. Il valore predefinito è 10000.

  • "addIdleTimeBetweenReads": (facoltativo) utilizzato per la lettura. Aggiunge un ritardo tra due operazioni consecutive getRecords. Il valore predefinito è "False". Questa opzione è configurabile solo per Glue versione 2.0 e successive.

  • "idleTimeBetweenReadsInMs": (facoltativo) utilizzato per la lettura. Il ritardo minimo tra due operazioni consecutive getRecords, specificato in ms. Il valore predefinito è 1000. Questa opzione è configurabile solo per Glue versione 2.0 e successive.

  • "describeShardInterval": (facoltativo) utilizzato per la lettura. L'intervallo di tempo minimo tra due chiamate API ListShards affinché lo script consideri il resharding. Per ulteriori informazioni, consultare Strategie per il resharding nella Guida per gli sviluppatori di Amazon Kinesis Data Streams. Il valore predefinito è 1s.

  • "numRetries": (facoltativo) utilizzato per la lettura. Il numero massimo di tentativi per le richieste API Kinesis Data Streams. Il valore predefinito è 3.

  • "retryIntervalMs": (opzionale) utilizzato per la lettura. Il periodo di raffreddamento (specificato in ms) prima di riprovare la chiamata API Kinesis Data Streams. Il valore predefinito è 1000.

  • "maxRetryIntervalMs": (opzionale) utilizzato per la lettura. Il periodo di raffreddamento (specificato in ms) tra due tentativi di chiamata API Kinesis Data Streams. Il valore predefinito è 10000.

  • "avoidEmptyBatches": (opzionale) utilizzato per la lettura. Impedisce la creazione di un processo microbatch vuoto controllando la presenza di dati non letti nel flusso dei dati Kinesis prima che il batch venga avviato. Il valore predefinito è "False".

  • "schema": (obbligatorio quando inferSchema è impostato su falso) utilizzato per la lettura. Lo schema da utilizzare per elaborare il payload. Se la classificazione è avro, lo schema fornito dovrà essere nel formato dello schema Avro. Se la classificazione è avro, lo schema fornito dovrà essere nel formato dello schema DDL.

    Di seguito sono riportati alcuni esempi di schema.

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (opzionale) utilizzato per la lettura. Il valore predefinito è "false". Se impostato su "true", lo schema verrà rilevato in fase di runtime dal payload all'interno di foreachbatch.

  • "avroSchema": (obsoleto) utilizzato per la lettura. Parametro utilizzato per specificare uno schema di dati Avro quando viene utilizzato il formato Avro. Questo parametro è obsoleto. Utilizzo del parametro schema.

  • "addRecordTimestamp": (opzionale) utilizzato per la lettura. Quando questa opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "__src_timestamp" che indica l'ora in cui il record corrispondente è stato ricevuto dal flusso. Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.

  • "emitConsumerLagMetrics": (opzionale) utilizzato per la lettura. Quando l'opzione è impostata su "true", per ogni batch, emetterà i parametri relativi alla durata tra il record più vecchio ricevuto dal flusso e l'ora in cui arriva in AWS Glue a CloudWatch. Il nome del parametro è "glue.driver.streaming.maxConsumerLaginMs". Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.

  • "fanoutConsumerARN": (opzionale) utilizzato per la lettura. L'ARN di un consumatore di un flusso Kinesis per il flusso specificato in streamARN. Utilizzato per abilitare la modalità di fan-out avanzato per la connessione Kinesis. Per ulteriori informazioni sull'utilizzo di un flusso Kinesis con fan-out avanzato, consultare la pagina Utilizzo del fan-out avanzato nei processi di flussi di dati Kinesis.

  • "recordMaxBufferedTime": (opzionale) utilizzato per la scrittura. Predefinito: 1000 (ms). Tempo massimo di memorizzazione nel buffer di un record in attesa di essere scritto.

  • "aggregationEnabled": (opzionale) utilizzato per la scrittura. Default: true. Speciifica se i record devono essere aggregati prima di inviarli a Kinesis.

  • "aggregationMaxSize": (opzionale) utilizzato per la scrittura. Impostazione predefinita: 51200 (byte). Se un record è superiore a questo limite, ignorerà l'aggregatore. Ricorda che Kinesis impone un limite di 50 KB alla dimensione del record. Se imposti questo valore oltre i 50 KB, i record di grandi dimensioni verranno rifiutati da Kinesis.

  • "aggregationMaxCount": (facoltativo) utilizzato per la scrittura. Predefinito: 4294967295. Numero massimo di voci da inserire in un record aggregato.

  • "producerRateLimit": (facoltativo) utilizzato per la scrittura. Predefinito: 150 (%). Limita la velocità di trasmissione effettiva per partizione inviata da un singolo produttore (ad esempio, il tuo processo), come percentuale del limite di backend.

  • "collectionMaxCount": (facoltativo) utilizzato per la scrittura. Predefinito: 500. Numero massimo di voci da inserire in una richiesta PutRecords.

  • "collectionMaxSize": (facoltativo) utilizzato per la scrittura. Impostazione predefinita: 5242880 (byte). Quantità massima di dati da inviare con una richiesta PutRecords.