AWS Glue-Streaming-Verbindungen - AWS Glue

AWS Glue-Streaming-Verbindungen

In den folgenden Abschnitten finden Sie Informationen zur Verwendung von Verbindungen in AWS Glue-Streaming.

Arbeiten mit Kafka-Verbindungen

Sie können eine Kafka-Verbindung verwenden, um in Kafka-Datenströmen zu lesen oder zu schreiben, indem Sie Informationen nutzen, die in einer Datenkatalogtabelle gespeichert sind, oder indem Sie Informationen bereitstellen, um direkt auf den Datenstrom zuzugreifen. Die Verbindung unterstützt einen Kafka-Cluster oder einem Cluster von Amazon Managed Streaming for Apache Kafka. Sie können Informationen aus Kafka in einen Spark DataFrame einlesen und dann in einen AWS Glue DynamicFrame konvertieren. Sie können DynamicFrames in einem JSON-Format in Kafka schreiben. Wenn Sie direkt auf den Datenstrom zugreifen, verwenden Sie diese Optionen, um Informationen zum Zugriff auf den Datenstrom bereitzustellen.

Wenn Sie getCatalogSource oder create_data_frame_from_catalog verwenden, um Datensätze aus einer Kafka-Streaming-Quelle zu verbrauchen, oder getCatalogSink oder write_dynamic_frame_from_catalog, um Datensätze in Kafka zu schreiben, enthält der Auftrag die Informationen zur Datenkatalog-Datenbank und Tabellennamen und kann diese verwenden, um einige grundlegende Parameter für das Lesen aus der Kafka-Streaming-Quelle zu erhalten. Wenn Sie getSource, getCatalogSink, getSourceWithFormat, getSinkWithFormat, createDataFrameFromOptions oder create_data_frame_from_options oder write_dynamic_frame_from_catalog verwenden, müssen Sie diese grundlegenden Parameter mithilfe der hier beschriebenen Verbindungsoptionen angeben.

Sie können die Verbindungsoptionen für Kafka mit den folgenden Argumenten für die angegebenen Methoden in der GlueContext-Klasse angeben.

  • Scala

    • connectionOptions: mit getSource, createDataFrameFromOptions, getSink verwenden

    • additionalOptions: mit getCatalogSource, getCatalogSink verwenden

    • options: mit getSourceWithFormat, getSinkWithFormat verwenden

  • Python

    • connection_options: mit create_data_frame_from_options, write_dynamic_frame_from_options verwenden

    • additional_options: mit create_data_frame_from_catalog, write_dynamic_frame_from_catalog verwenden

    • options: mit getSource, getSink verwenden

Hinweise und Einschränkungen zum Streaming von ETL-Aufträgen finden Sie unter Hinweise zu und Einschränkungen für Streaming-ETL.

Themen

    Kafka konfigurieren

    Für die Verbindung mit über das Internet verfügbaren Kafka-Streams sind keine AWS-Voraussetzungen erforderlich.

    Sie können eine AWS-Glue-Kafka-Verbindung erstellen, um Ihre Verbindungsanmeldeinformationen zu verwalten. Weitere Informationen finden Sie unter Erstellen einer AWS Glue-Verbindung für einen Apache-Kafka-Datenstrom. Geben Sie in Ihrer AWS-Glue-Auftragskonfiguration connectionName als zusätzliche Netzwerkverbindung an und geben Sie dann in Ihrem Methodenaufruf connectionName für den connectionName-Parameter an.

    In bestimmten Fällen müssen Sie zusätzliche Voraussetzungen konfigurieren:

    • Wenn Sie Amazon Managed Streaming für Apache Kafka mit IAM-Authentifizierung verwenden, benötigen Sie eine entsprechende IAM-Konfiguration.

    • Wenn Sie Amazon Managed Streaming für Apache Kafka innerhalb einer Amazon VPC verwenden, benötigen Sie eine entsprechende Amazon-VPC-Konfiguration. Sie müssen eine AWS-Glue-Verbindung erstellen, die Amazon VPC-Verbindungsinformationen bereitstellt. Sie benötigen in Ihrer Auftragskonfiguration die AWS-Glue-Verbindung als zusätzliche Netzwerkverbindung.

    Weitere Informationen zu den Voraussetzungen für Streaming-ETL-Aufträgen finden Sie unter Streaming-ETL-Aufträge in AWS Glue.

    Beispiel: Aus Kafka-Streams lesen

    Verwendet in Verbindung mit forEachBatch.

    Beispiel für die Kafka-Streaming-Quelle:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Beispiel: In Kafka-Streams schreiben

    Beispiele für das Schreiben in Kafka:

    Beispiel mit der getSink-Methode:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    Beispiel mit der write_dynamic_frame.from_options-Methode:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Referenz zur Kafka-Verbindungsoption

    Verwenden Sie beim Lesen die folgenden Verbindungsoptionen mit "connectionType": "kafka":

    • "bootstrap.servers" (Erforderlich) Eine Liste von Bootstrap-Server-URLs, z. B. b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. Diese Option muss im API-Aufruf angegeben oder in den Tabellenmetadaten im Data Catalog definiert werden.

    • "security.protocol" (Erforderlich) Das Protokoll, das für die Kommunikation mit Brokern verwendet wird. Die möglichen Werte sind "SSL" oder "PLAINTEXT".

    • "topicName" (Erforderlich) Eine durch Kommas getrennte Liste von Themen, die abonniert werden sollen. Sie müssen nur eines der folgenden "topicName", "assign" oder "subscribePattern" angeben.

    • "assign": (Erforderlich) Eine JSON-Zeichenfolge, welche den spezifischen TopicPartitions zum Konsumieren angeben. Sie müssen nur eines der folgenden "topicName", "assign" oder "subscribePattern" angeben.

      Beispiel: '{"topicA":[0,1],"topicB":[2,4]}'

    • "subscribePattern": (Erforderlich) Eine Java-Regex-Zeichenfolge, die die Themenliste identifiziert, die abonniert werden soll. Sie müssen nur eines der folgenden "topicName", "assign" oder "subscribePattern" angeben.

      Beispiel: 'topic.*'

    • "classification" (Erforderlich) Das von den Daten im Datensatz verwendete Dateiformat. Erforderlich, sofern nicht in Data Catalog angegeben.

    • "delimiter" (Optional) Das verwendete Werttrennzeichen, wenn classification CSV ist. Der Standardwert ist „,„.

    • "startingOffsets": (Optional) Die Ausgangsposition im Kafka-Thema, aus dem Daten gelesen werden sollen. Die möglichen Werte sind "earliest" oder "latest". Der Standardwert ist "latest".

    • "startingTimestamp": (Optional, nur für AWS-Glue-Version 4.0 oder höher unterstützt) Der Zeitstempel des Datensatzes im Kafka-Thema, aus dem Daten gelesen werden sollen. Der mögliche Wert ist eine Zeitstempelzeichenfolge im UTC-Format im Muster yyyy-mm-ddTHH:MM:SSZ (wobei Z einen UTC-Zeitzonenversatz mit einem +/- darstellt. Beispiel: „2023-04-04T08:00:00-04:00“).

      Hinweis: In der Liste der Verbindungsoptionen des AWS-Glue-Streaming-Skripts darf nur einer von „startingOffsets“ oder „startingTimestamp“ vorhanden sein. Die Einbeziehung dieser beiden Eigenschaften führt zum Fehlschlagen des Auftrags.

    • "endingOffsets": (Optional) Der Endpunkt, wenn eine Batchabfrage beendet wird. Die möglichen Werte sind entweder "latest" oder eine JSON-Zeichenfolge, die einen Offset für das Ende jeder TopicPartition angibt.

      Für die JSON-Zeichenfolge lautet das Format {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. Der Wert -1 als Offset steht für "latest".

    • "pollTimeoutMs": (Optional) Das Timeout in Millisekunden, um Daten von Kafka in Spark-Auftragsausführungen abzufragen. Der Standardwert ist 600000.

    • "numRetries": (Optional) Die Anzahl, wie oft erneute Versuche durchgeführt werden sollen, bevor Kafka-Offsets nicht abgerufen werden. Der Standardwert ist 3.

    • "retryIntervalMs": (Optional) Die Wartezeit in Millisekunden, bevor Sie erneut versuchen, Kafka-Offsets abzurufen. Der Standardwert ist 10.

    • "maxOffsetsPerTrigger": (Optional) Die Ratengrenze für die maximale Anzahl von Offsets, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtzahl der Offsets wird proportional auf topicPartitionsvon verschiedenen Volumes aufgeteilt. Der Standardwert ist null, was bedeutet, dass der Verbraucher alle Offsets bis zum bekannten letzten Offset liest.

    • "minPartitions": (Optional) Die gewünschte Mindestanzahl an Partitionen, die von Kafka gelesen werden sollen. Der Standardwert ist null, was bedeutet, dass die Anzahl der Spark-Partitionen gleich der Anzahl der Kafka-Partitionen ist.

    • "includeHeaders": (Optional) Gibt an, ob die Kafka-Header eingeschlossen werden sollen. Wenn die Option auf „true“ gesetzt ist, enthält die Datenausgabe eine zusätzliche Spalte mit dem Namen „glue_streaming_kafka_headers“ mit dem Typ Array[Struct(key: String, value: String)]. Der Standardwert ist „false“. Diese Option ist nur in AWS Glue Version 3.0 oder höher verfügbar.

    • "schema": (Erforderlich, wenn inferSchema auf „false“ festgelegt ist) Das Schema, das zur Verarbeitung der Nutzlast verwendet werden soll. Wenn die Klassifizierung avro ist, muss das bereitgestellte Schema im Avro-Schemaformat vorliegen. Wenn die Klassifizierung nicht avro ist, muss das bereitgestellte Schema im DDL-Schemaformat vorliegen.

      Im Folgenden finden Sie Beispiele für Schemata.

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (Optional) Der Standardwert ist „false“. Wenn auf „true“ gesetzt, wird das Schema zur Laufzeit von der Nutzlast in foreachbatch erkannt.

    • "avroSchema": (Veraltet) Parameter, der verwendet wird, um ein Schema von Avro-Daten anzugeben, wenn das Avro-Format verwendet wird. Dieser Parameter ist jetzt veraltet. Verwenden Sie den Parameter schema.

    • "addRecordTimestamp": (Optional) Wenn diese Option auf „true“ gesetzt ist, enthält die Datenausgabe eine zusätzliche Spalte mit dem Namen „__src_timestamp“, die den Zeitpunkt angibt, zu dem der entsprechende Datensatz beim Thema eingegangen ist. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt.

    • "emitConsumerLagMetrics": (Optional) Wenn diese Option auf „true“ gesetzt ist, werden für jeden Batch die Metriken für die Dauer zwischen dem ältesten vom Thema empfangenen Datensatz und dem Zeitpunkt, zu dem er in AWS Glue bei CloudWatch eintrifft, ausgegeben. Der Name der Metrik lautet „glue.driver.streaming.maxConsumerLagInMs“. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt.

    Verwenden Sie beim Schreiben die folgenden Verbindungsoptionen mit "connectionType": "kafka":

    • "connectionName" (Erforderlich) Name der AWS-Glue-Verbindung, die für die Verbindung mit dem Kafka-Cluster verwendet wird (ähnlich der Kafka-Quelle).

    • "topic" (Erforderlich) Wenn eine Themenspalte vorhanden ist, wird ihr Wert als Thema verwendet, wenn die angegebene Zeile in Kafka geschrieben wird, sofern die Themenkonfigurationsoption nicht festgelegt ist. Das heißt, die topic-Konfigurationsoption überschreibt die Themenspalte.

    • "partition" (Optional) Wenn eine gültige Partitionsnummer angegeben ist, wird diese partition beim Senden des Datensatzes verwendet.

      Wenn keine Partition angegeben ist, aber ein key vorhanden ist, wird eine Partition anhand eines Hashs des Schlüssels ausgewählt.

      Falls key weder noch vorhanden partition ist, wird eine Partition auf der Grundlage von Sticky-Partitionierung ausgewählt, wenn für die Partition mindestens batch.size Bytes erzeugt werden.

    • "key" (Optional) Wird für die Partitionierung verwendet, wenn partition NULL ist.

    • "classification" (Optional) Das von den Daten im Datensatz verwendete Dateiformat. Wir unterstützen nur JSON, CSV und Avro.

      Mit dem Avro-Format können wir ein benutzerdefiniertes AvroSchema für die Serialisierung bereitstellen. Beachten Sie jedoch, dass dieses auch in der Quelle für die Deserialisierung bereitgestellt werden muss. Andernfalls wird standardmäßig das Apache AvroSchema für die Serialisierung verwendet.

    Darüber hinaus können Sie die Kafka-Senke nach Bedarf optimieren, indem Sie die Konfigurationsparameter des Kafka-Producers aktualisieren. Beachten Sie, dass es keine Zulassungsliste für Verbindungsoptionen gibt, da alle Schlüssel-Wert-Paare unverändert auf der Senke gespeichert werden.

    Es gibt jedoch eine kleine Sperrliste von Optionen, die nicht wirksam werden. Weitere Informationen finden Sie unter Kafka-spezifische Konfigurationen.

    Arbeiten mit Kinesis-Verbindungen

    Sie können eine Kinesis-Verbindung verwenden, um in Amazon Kinesis Data Streams zu lesen oder zu schreiben, indem Sie Informationen nutzen, die in einer Datenkatalogtabelle gespeichert sind, oder indem Sie Informationen bereitstellen, um direkt auf den Datenstrom zuzugreifen. Sie können Informationen aus Kinesis in einem Spark DataFrame auslesen und sie dann in einen AWS Glue DynamicFrame konvertieren. Sie können DynamicFrames in einem JSON-Format in Kinesis schreiben. Wenn Sie direkt auf den Datenstrom zugreifen, verwenden Sie diese Optionen, um Informationen zum Zugriff auf den Datenstrom bereitzustellen.

    Wenn Sie getCatalogSource oder create_data_frame_from_catalog verwenden, um Einträge aus einer Kinesis-Streamingquelle zu verbrauchen, enthält der Auftrag die Informationen zu Data-Catalog-Datenbank und Tabellennamen und kann diese verwenden, um einige grundlegende Parameter für das Lesen aus der Kinesis-Streaming-Quelle zu erhalten. Wenn Sie getSource, getSourceWithFormat, createDataFrameFromOptions oder create_data_frame_from_options verwenden, müssen Sie diese grundlegenden Parameter mithilfe der hier beschriebenen Verbindungsoptionen angeben.

    Sie können die Verbindungsoptionen für Kinesis mit den folgenden Argumenten für die angegebenen Methoden in der GlueContext-Klasse angeben.

    • Scala

      • connectionOptions: mit getSource, createDataFrameFromOptions, getSink verwenden

      • additionalOptions: mit getCatalogSource, getCatalogSink verwenden

      • options: mit getSourceWithFormat, getSinkWithFormat verwenden

    • Python

      • connection_options: mit create_data_frame_from_options, write_dynamic_frame_from_options verwenden

      • additional_options: mit create_data_frame_from_catalog, write_dynamic_frame_from_catalog verwenden

      • options: mit getSource, getSink verwenden

    Hinweise und Einschränkungen zu Streaming-ETL-Aufträgen finden Sie unter Hinweise zu und Einschränkungen für Streaming-ETL.

    Kinesis konfigurieren

    Um in einem AWS-Glue-Spark-Auftrag eine Verbindung zu einem Kinesis-Datenstrom herzustellen, müssen einige Voraussetzungen erfüllt sein:

    • Beim Lesen muss der AWS-Glue-Auftrag über IAM-Berechtigungen auf Lesezugriffsebene für den Kinesis-Datenstrom verfügen.

    • Beim Schreiben muss der AWS-Glue-Auftrag über IAM-Berechtigungen auf Schreibzugriffsebene für den Kinesis-Datenstrom verfügen.

    In bestimmten Fällen müssen Sie zusätzliche Voraussetzungen konfigurieren:

    • Wenn Ihr AWS-Glue-Auftrag mit zusätzlichen Netzwerkverbindungen konfiguriert ist (typischerweise zum Herstellen einer Verbindung mit anderen Datensätzen) und eine dieser Verbindungen Netzwerkoptionen für Amazon VPC bereitstellt, wird Ihr Auftrag dadurch angewiesen, über Amazon VPC zu kommunizieren. In diesem Fall müssen Sie auch Ihren Kinesis-Datenstrom für die Kommunikation über Amazon VPC konfigurieren. Sie können dies tun, indem Sie einen Schnittstellen-VPC-Endpunkt zwischen Ihrer Amazon VPC und dem Kinesis-Datenstrom erstellen. Weitere Informationen finden Sie unter Verwenden von Kinesis Data Streams mit Schnittstellen-VPC-Endpunkten.

    • Wenn Sie Amazon Kinesis Data Streams in einem anderen Konto angeben, müssen Sie die Rollen und Richtlinien einrichten, um den kontoübergreifenden Zugriff zu ermöglichen. Weitere Informationen finden Sie unter Beispiel: Aus einem Kinesis Stream in einem anderen Konto lesen.

    Weitere Informationen zu den Voraussetzungen für Streaming-ETL-Aufträgen finden Sie unter Streaming-ETL-Aufträge in AWS Glue.

    Aus Kinesis lesen

    Beispiel: Lesen aus Kinesis-Streams

    Verwendet in Verbindung mit forEachBatch.

    Beispiel für Amazon-Kinesis-Streaming-Quelle:

    kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

    In Kinesis schreiben

    Beispiel: In Kinesis-Streams schreiben

    Verwendet in Verbindung mit forEachBatch. Ihr DynamicFrame wird im JSON-Format in den Stream geschrieben. Wenn der Auftrag nach mehreren Versuchen nicht schreiben kann, schlägt er fehl. DynamicFrame-Datensätze werden standardmäßig einzeln an den Kinesis-Stream gesendet. Sie können dieses Verhalten mithilfe von aggregationEnabled und zugehörigen Parametern konfigurieren.

    Beispiel für das Schreiben in Amazon Kinesis von einem Streaming-Auftrag aus:

    Python
    glueContext.write_dynamic_frame.from_options( frame=frameToWrite connection_type="kinesis", connection_options={ "partitionKey": "part1", "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", } )
    Scala
    glueContext.getSinkWithFormat( connectionType="kinesis", options=JsonOptions("""{ "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", "partitionKey": "part1" }"""), ) .writeDynamicFrame(frameToWrite)

    Kinesis-Verbindungsparameter

    Bezeichnet Verbindungoptionen zu Amazon Kinesis Data Streams.

    Verwenden Sie die folgenden Verbindungsoptionen für Kinesis-Streamingdatenquellen:

    • "streamARN" (Erforderlich) Wird zum Lesen/Schreiben verwendet. Der ARN des Kinesis-Datenstroms.

    • "classification" (Zum Lesen erforderlich) Wird zum Lesen verwendet. Das von den Daten im Datensatz verwendete Dateiformat. Erforderlich, sofern nicht in Data Catalog angegeben.

    • "streamName" – (Optional) Wird zum Lesen verwendet. Der Name eines Kinesis-Datenstroms, aus dem gelesen wird. Wird mit endpointUrl verwendet.

    • "endpointUrl" – (Optional) Wird zum Lesen verwendet. Standard: „https://kinesis.us-east-1.amazonaws.com“. Der AWS-Endpunkt des Kinesis-Streams. Sie müssen dies nicht ändern, es sei denn, Sie stellen eine Verbindung zu einer bestimmten Region her.

    • "partitionKey" – (Optional) Wird zum Schreiben verwendet. Der Kinesis-Partitionsschlüssel, der bei der Erstellung von Datensätzen verwendet wird.

    • "delimiter" (Optional) Wird zum Lesen verwendet. Das verwendete Werttrennzeichen, wenn classification CSV ist. Der Standardwert ist „,„.

    • "startingPosition": (Optional) Wird zum Lesen verwendet. Die Ausgangsposition im Kinesis Data Stream, von dem Daten gelesen werden sollen. Die möglichen Werte sind "latest", "trim_horizon", "earliest" oder eine Zeitstempelzeichenfolge im UTC-Format im Muster yyyy-mm-ddTHH:MM:SSZ (wobei Z einen UTC-Zeitzonenversatz mit einem +/- darstellt. Zum Beispiel „2023-04-04T08:00:00-04:00“). Der Standardwert ist "latest". Hinweis: Die Zeitstempelzeichenfolge im UTC-Format für "startingPosition" wird nur für AWS Glue Version 4.0 oder höher unterstützt.

    • "failOnDataLoss": (Optional) Lassen Sie den Auftrag fehlschlagen, wenn ein aktiver Shard fehlt oder abgelaufen ist. Der Standardwert ist "false".

    • "awsSTSRoleARN": (Optional) Wird zum Schreiben/Lesen verwendet. Der Amazon-Ressourcenname (ARN) der zu übernehmenden Rolle unter Verwendung von AWS Security Token Service (AWS STS). Diese Rolle muss über Berechtigungen zum Beschreiben oder Lesen von Datensatzoperationen für den Kinesis-Datenstrom verfügen. Sie müssen diesen Parameter verwenden, wenn Sie auf einen Datenstrom in einem anderen Konto zugreifen. Verwendet in Verbindung mit "awsSTSSessionName".

    • "awsSTSSessionName": (Optional) Wird zum Schreiben/Lesen verwendet. Ein Bezeichner für die Sitzung, die die Rolle unter Verwendung von AWS STS übernimmt. Sie müssen diesen Parameter verwenden, wenn Sie auf einen Datenstrom in einem anderen Konto zugreifen. Verwendet in Verbindung mit "awsSTSRoleARN".

    • "awsSTSEndpoint": (Optional) Der zu verwendende AWS STS-Endpunkt, wenn mit einer angenommenen Rolle eine Verbindung zu Kinesis hergestellt wird. Dies ermöglicht die Verwendung des regionalen AWS STS-Endpunkts in einer VPC, was mit dem standardmäßigen globalen Endpunkt nicht möglich ist.

    • "maxFetchTimeInMs": (Optional) Wird zum Lesen verwendet. Die maximale Zeit, die der Auftrags-Executor zum Lesen der Datensätze für den aktuellen Batch aus dem Kinesis-Datenstrom benötigt, angegeben in Millisekunden (ms). Innerhalb dieser Zeit können mehrere GetRecords-API-Aufrufe getätigt werden. Der Standardwert ist 1000.

    • "maxFetchRecordsPerShard": (Optional) Wird zum Lesen verwendet. Die maximale Anzahl von Datensätzen, die pro Shard im Kinesis-Datenstrom pro Microbatch abgerufen werden sollen. Hinweis: Der Client kann dieses Limit überschreiten, wenn der Streaming-Auftrag bereits zusätzliche Datensätze von Kinesis gelesen hat (im selben get-records-Aufruf). Wenn maxFetchRecordsPerShard streng sein muss, muss es ein Vielfaches von maxRecordPerRead sein. Der Standardwert ist 100000.

    • "maxRecordPerRead": (Optional) Wird zum Lesen verwendet. Die maximale Anzahl von Datensätzen, die bei jeder getRecords-Operation aus dem Kinesis-Datenstrom abgerufen werden sollen. Der Standardwert ist 10000.

    • "addIdleTimeBetweenReads": (Optional) Wird zum Lesen verwendet. Fügt eine Zeitverzögerung zwischen zwei aufeinanderfolgenden getRecords-Operationen ein. Der Standardwert ist "False". Diese Option ist nur für Glue 2.0 und höher konfigurierbar.

    • "idleTimeBetweenReadsInMs": (Optional) Wird zum Lesen verwendet. Die minimale Zeitverzögerung zwischen zwei aufeinanderfolgenden getRecords-Operationen, angegeben in Millisekunden (ms). Der Standardwert ist 1000. Diese Option ist nur für Glue 2.0 und höher konfigurierbar.

    • "describeShardInterval": (Optional) Wird zum Lesen verwendet. Das minimale Zeitintervall zwischen zwei ListShards-API-Aufrufen für die Erwägung eines erneuten Shardings durch Ihr Skript. Weitere Informationen finden Sie unter Strategies for Resharding (Strategien für das Resharding) im Entwicklerhandbuch für Amazon Kinesis Data Streams. Der Standardwert ist 1s.

    • "numRetries": (Optional) Wird zum Lesen verwendet. Die maximale Anzahl erneuter Versuche für API-Aufrufe von Kinesis Data Streams. Der Standardwert ist 3.

    • "retryIntervalMs": (Optional) Wird zum Lesen verwendet. Die Abkühlzeit (angegeben in ms) vor dem erneuten Versuch des API-Aufrufs von Kinesis Data Streams. Der Standardwert ist 1000.

    • "maxRetryIntervalMs": (Optional) Wird zum Lesen verwendet. Die maximale Abkühlzeit (angegeben in ms) zwischen zwei wiederholten Versuchen eines API-Aufrufs von Kinesis Data Streams. Der Standardwert ist 10000.

    • "avoidEmptyBatches": (Optional) Wird zum Lesen verwendet. Vermeidet das Erstellen eines leeren Mikrobatchauftrags, indem vor dem Start des Batches im Kinesis Data Stream nach ungelesenen Daten gesucht wird. Der Standardwert ist "False".

    • "schema": (Erforderlich, wenn inferSchema auf „false“ festgelegt ist) Wird zum Lesen verwendet. Das zu verwendende Schema für die Verarbeitung der Nutzlast. Wenn die Klassifizierung avro ist, muss das bereitgestellte Schema im Avro-Schemaformat vorliegen. Wenn die Klassifizierung nicht avro ist, muss das bereitgestellte Schema im DDL-Schemaformat vorliegen.

      Im Folgenden finden Sie Beispiele für Schemata.

      Example in DDL schema format
      `column1` INT, `column2` STRING , `column3` FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (Optional) Wird zum Lesen verwendet. Der Standardwert von "false". Wenn auf „true“ gesetzt, wird das Schema zur Laufzeit von der Nutzlast in foreachbatch erkannt.

    • "avroSchema": (Veraltet) Wird zum Lesen verwendet. Der Parameter, der verwendet wird, um ein Schema von Avro-Daten anzugeben, wenn das Avro-Format verwendet wird. Dieser Parameter ist jetzt veraltet. Verwenden Sie den Parameter schema.

    • "addRecordTimestamp": (Optional) Wird zum Lesen verwendet. Wenn diese Option auf 'true' gesetzt ist, enthält die Datenausgabe eine zusätzliche Spalte mit dem Namen „__src_timestamp“, die die Uhrzeit angibt, zu der der entsprechende Datensatz mit dem Stream empfangen wurde. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt.

    • "emitConsumerLagMetrics": (Optional) Wird zum Lesen verwendet. Wenn die Option auf „true“ gesetzt ist, werden für jeden Batch die Metriken für die Dauer zwischen dem ältesten vom Stream empfangenen Datensatz und dem Zeitpunkt, zu dem er in AWS Glue bei CloudWatch eintrifft, ausgegeben. Der Name der Metrik lautet „glue.driver.streaming.maxConsumerLagInMs“. Der Standardwert von "false". Diese Option wird in AWS Glue Version 4.0 oder höher unterstützt.

    • "fanoutConsumerARN": (Optional) Wird zum Lesen verwendet. Der ARN eines Kinesis-Stream-Verbrauchers für den in streamARN angegebenen Stream. Wird verwendet, um den erweiterten Fan-Out-Modus für Ihre Kinesis-Verbindung zu aktivieren. Weitere Informationen zur Nutzung eines Kinesis-Streams mit erweitertem Fan-Out finden Sie unter Verwendung von erweitertem Fan-Out in Kinesis-Streaming-Aufträgen.

    • "recordMaxBufferedTime" – (Optional) Wird zum Schreiben verwendet. Standard: 1.000 (ms). Maximale Dauer, für die ein Datensatz gepuffert wird, während er darauf wartet, geschrieben zu werden.

    • "aggregationEnabled" – (Optional) Wird zum Schreiben verwendet. Standard: true Gibt an, ob Datensätze aggregiert werden sollen, bevor sie an Kinesis gesendet werden.

    • "aggregationMaxSize" – (Optional) Wird zum Schreiben verwendet. Standard: 51.200 (Byte). Wenn ein Datensatz dieses Limit übersteigt, wird der Aggregator umgangen. Hinweis Kinesis erzwingt ein Limit von 50 KB für die Größe eines Datensatzes. Wenn Sie diesen Wert auf mehr als 50 KB festlegen, werden übergroße Datensätze von Kinesis zurückgewiesen.

    • "aggregationMaxCount" – (Optional) Wird zum Schreiben verwendet. Standard: 4.294.967.295. Maximale Anzahl von Elementen, die in einen aggregierten Datensatz gepackt werden sollen.

    • "producerRateLimit" – (Optional) Wird zum Schreiben verwendet. Standard: 150 (%). Beschränkt den Durchsatz pro Shard, der von einem einzelnen Produzenten (z. B. Ihrem Auftrag) gesendet wird, als prozentualen Wert des Backend-Limits.

    • "collectionMaxCount" – (Optional) Wird zum Schreiben verwendet. Standard: 500. Maximale Anzahl von Elementen, die in eine PutRecords-Anfrage gepackt werden sollen.

    • "collectionMaxSize" – (Optional) Wird zum Schreiben verwendet. Standard: 5.242.880 (Byte). Maximale Menge an Daten, die mit einer PutRecords-Anfrage gesendet werden kann.