Connessioni Kafka - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connessioni Kafka

È possibile usare una connessione Kafka per leggere e scrivere su flussi di dati Kafka utilizzando le informazioni archiviate in una tabella del Catalogo dati o fornendo informazioni per accedere direttamente al flusso di dati. La connessione supporta un cluster Kafka o un cluster Streaming gestito da Amazon per Apache Kafka. È possibile leggere le informazioni da Kafka in un DataFrame Spark, quindi convertirle in un DynamicFrame AWS Glue. È possibile scrivere DynamicFrames su Kafka in un formato JSON. Se si accede direttamente al flusso di dati, utilizzare queste opzioni per fornire le informazioni su come accedere al flusso di dati.

Se si utilizza getCatalogSource o create_data_frame_from_catalog per consumare i record da una sorgente di streaming Kafka oppure getCatalogSink o write_dynamic_frame_from_catalog per scrivere i record su Kafka, il processo avrà le informazioni sul database del Catalogo dati e sul nome della tabella, e potrà usarle per ottenere alcuni parametri di base per la lettura dall'origine di streaming Kafka. Se si utilizza getSource, getCatalogSink, getSourceWithFormat, getSinkWithFormat, createDataFrameFromOptions, create_data_frame_from_options o write_dynamic_frame_from_catalog, sarà necessario specificare questi parametri di base utilizzando le opzioni di connessione descritte qui.

È possibile specificare le opzioni di connessione per Kafka utilizzando gli argomenti per i metodi specificati nella classe GlueContext descritti di seguito.

  • Scala

    • connectionOptions: utilizza con getSource, createDataFrameFromOptions e getSink

    • additionalOptions: utilizza con getCatalogSource, getCatalogSink

    • options: utilizza con getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: utilizza con create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: utilizza con create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: utilizza con getSource, getSink

Per osservazioni e restrizioni sui processi ETL dei flussi di dati, consultare la pagina Streaming di note e restrizioni ETL.

Argomenti

    Configurazione di Kafka

    Non sussistono prerequisiti AWS per la connessione ai flussi di Kafka disponibili su Internet.

    È possibile creare una connessione AWS Glue Kafka per gestire le proprie credenziali di connessione. Per ulteriori informazioni, consultare Creazione di una connessione AWS Glue per un flusso di dati Apache Kafka. Nella configurazione del processo AWS Glue, fornisci connectionName come Connessione di rete aggiuntiva, quindi, nella chiamata al metodo, fornisci connectionName al parametro connectionName.

    In alcuni casi, è necessario configurare ulteriori prerequisiti:

    • Se utilizzi Streaming gestito da Amazon per Apache Kafka con l'autenticazione IAM, avrai bisogno di una configurazione appropriata di IAM.

    • Se utilizzi Streaming gestito da Amazon per Apache Kafka con un Amazon VPC, avrai bisogno di una configurazione appropriata di Amazon VPC. Dovrai creare una connessione ad AWS Glue che fornisca informazioni sulla connessione ad Amazon VPC. È necessario che la configurazione del processo includa la connessione AWS Glue come connessione di rete aggiuntiva.

    Per ulteriori informazioni sui prerequisiti dei processi ETL dei flussi di dati, consulta la pagina Aggiunta di processi di streaming ETL in AWS Glue.

    Esempio: lettura di flussi da Kafka

    Usato in combinazione con forEachBatch.

    Esempio per l'origine di streaming Kafka:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Esempio: scrittura in flussi Kafka

    Esempi di scrittura in Kafka:

    Esempio con il metodo getSink:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    Esempio con il metodo write_dynamic_frame.from_options:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Indicazioni di riferimento alle opzioni di connessione a Kafka

    Per la lettura, utilizzare le seguenti opzioni di connessione con "connectionType": "kafka":

    • "bootstrap.servers": (obbligatorio) un elenco di URL del server bootstrap, ad esempio, come b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. Questa opzione deve essere specificata nella chiamata API o definita nei metadati della tabella in catalogo dati.

    • "security.protocol": (obbligatorio) Il protocollo utilizzato per comunicare con i broker. I valori possibili sono "SSL" o "PLAINTEXT".

    • "topicName": (obbligatorio) un elenco separato da virgole di argomenti a cui iscriversi. Devi specificare solo uno tra "topicName", "assign" o "subscribePattern".

    • "assign": (obbligatorio) una stringa JSON che specifica il TopicPartitions specifico da utilizzare. Devi specificare solo uno tra "topicName", "assign" o "subscribePattern".

      Esempio: '{"topicA":[0,1],"topicB":[2,4]}'

    • "subscribePattern": (obbligatorio) una stringa regex Java che identifichi l'elenco degli argomenti a cui effettuare la sottoscrizione. Devi specificare solo uno tra "topicName", "assign" o "subscribePattern".

      Esempio: 'topic.*'

    • "classification" (obbligatorio): il formato di file utilizzato dai dati nel record. Obbligatorio, a meno che non sia fornito tramite Catalogo dati.

    • "delimiter" (opzionale): il separatore di valori utilizzato quando classification è CSV. Il valore predefinito è “,”.

    • "startingOffsets": (opzionale) la posizione di partenza nell'argomento Kafka da cui leggere i dati. I valori possibili sono "earliest" o "latest". Il valore predefinito è "latest".

    • "startingTimestamp": (opzionale, supportato solo per AWS Glue versione 4.0 o successiva). Il timestamp del record nell'argomento Kafka da cui leggere i dati. Il valore possibile è una stringa timestamp in formato UTC nel modello yyyy-mm-ddTHH:MM:SSZ, dove Z rappresenta un offset del fuso orario UTC con un segno +/- (ad esempio: "2023-04-04T08:00:00-04:00").

      Nota: nell'elenco delle opzioni di connessione dello script di flussi di dati di AWS Glue può essere presente solo un valore tra “startingOffsets” o “startingTimestamp”; l'inclusione di entrambe queste proprietà comporterà un errore del processo.

    • "endingOffsets": (opzionale) il punto di fine di una query batch. I valori possibili sono "latest" o una stringa JSON che specifica un offset finale per ogni TopicPartition.

      Per la stringa JSON, il formato è {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. Il valore -1 come offset rappresenta "latest".

    • "pollTimeoutMs": (opzionale) il timeout in millisecondi per il polling dei dati da Kafka negli executor del processo Spark. Il valore predefinito è 600000.

    • "numRetries": (opzionale) i numero di tentativi prima di non riuscire a recuperare gli offset Kafka. Il valore predefinito è 3.

    • "retryIntervalMs": (opzionale) il tempo di attesa in millisecondi prima di riprovare a recuperare gli offset Kafka. Il valore predefinito è 10.

    • "maxOffsetsPerTrigger": (opzionale) il limite di velocità sul numero massimo di offset elaborati per intervallo di trigger. Il numero totale di offset specificato viene suddiviso proporzionalmente tra topicPartitions di diversi volumi. Il valore di default è null, il che significa che il consumer legge tutti gli offset fino all'ultimo offset noto.

    • "minPartitions": (opzionale) il numero minimo desiderato di partizioni da leggere da Kafka. Il valore di default è null, il che significa che il numero di partizioni Spark è uguale al numero di partizioni Kafka.

    • "includeHeaders": (opzionale) indica se includere le intestazioni Kafka. Quando l'opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "glue_streaming_kafka_headers" con tipo Array[Struct(key: String, value: String)]. Il valore di default è "false". Questa opzione è disponibile in AWS Glue versione 3.0 o successive.

    • "schema": (obbligatorio quando inferSchema è impostato su false) lo schema da utilizzare per elaborare il payload. Se la classificazione è avro, lo schema fornito dovrà essere nel formato dello schema Avro. Se la classificazione è avro, lo schema fornito dovrà essere nel formato dello schema DDL.

      Di seguito sono riportati alcuni esempi di schema.

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (facoltativo) il valore di default è "false". Se impostato su "true", lo schema verrà rilevato in fase di runtime dal payload all'interno di foreachbatch.

    • "avroSchema": (obsoleto) parametro utilizzato per specificare uno schema di dati Avro quando viene utilizzato il formato Avro. Questo parametro è obsoleto. Utilizzo del parametro schema.

    • "addRecordTimestamp": (opzionale) Quando questa opzione è impostata su "true", l'output dei dati conterrà una colonna aggiuntiva denominata "__src_timestamp" che indica l'ora in cui il record corrispondente è stato ricevuto dall'argomento. Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.

    • "emitConsumerLagMetrics": (opzionale) Quando questa opzione è impostata su "true", per ogni batch, emetterà i parametri relativi alla durata tra il record più vecchio ricevuto dall'argomento e l'ora in cui arriva in AWS Glue a CloudWatch. Il nome del parametro è "glue.driver.streaming.maxConsumerLaginMs". Il valore predefinito è "false". Questa opzione è supportata in AWS Glue versione 4.0 o successive.

    Per la scrittura, utilizzare le seguenti opzioni di connessione con "connectionType": "kafka":

    • "connectionName" (obbligatorio) Nome della connessione AWS Glue utilizzata per connettersi al cluster Kafka (simile all'origine Kafka).

    • "topic" (obbligatorio) Se esiste una colonna di argomento, il relativo valore viene utilizzato come argomento quando si scrive la riga specifica in Kafka, a meno che non sia impostata l'opzione di configurazione dell'argomento. In altre parole, l'opzione di configurazione topic sovrascrive la colonna dell'argomento.

    • "partition" (opzionale) Se viene specificato un numero di partizione valido, partition verrà utilizzato per l'invio del record.

      Se non viene specificata alcuna partizione ma è presente key, verrà scelta una partizione utilizzando un hash della chiave.

      Se non sono presenti né keypartition, verrà scelta una partizione in base al partizionamento permanente delle modifiche quando nella partizione vengono prodotti almeno batch.size byte.

    • "key" (opzionale) Utilizzato per il partizionamento se partition è null.

    • "classification" (opzionale) Il formato di file utilizzato dai dati nel record. Supportiamo solo JSON, CSV e Avro.

      Con il formato Avro, possiamo fornire un AvroSchema personalizzato con cui serializzare, ma bisogna considerare che tale schema deve essere fornito anche nell'origine per la deserializzazione. In caso contrario, per la serializzazione viene utilizzato l'AvroSchema Apache per impostazione predefinita.

    Inoltre, è possibile eseguire il fine-tuning del sink Kafka in base alle esigenze aggiornando i parametri di configurazione del produttore di Kafka. Da notare che non esiste un elenco delle opzioni di connessione consentite, tutte le coppie chiave-valore vengono mantenute nel sink così come sono.

    Tuttavia, esiste un piccolo elenco di opzioni di rifiuto che non avranno effetto. Per ulteriori informazioni, consultare le configurazioni specifiche di Kafka.