本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Kafka 連線
您可以使用 Kafka 連線來讀取和寫入 Kafka 資料串流,方法是使用儲存在 Data Catalog 資料表中的資訊,或提供資訊以直接存取資料串流。連線支援 Kafka 叢集或 Amazon Managed Streaming for Apache Kafka 叢集。您可以將 Kafka 的資訊讀取至 Spark DataFrame 中,然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kafka。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。
如果您使用 getCatalogSource 或 create_data_frame_from_catalog 來取用來自 Kinesis 串流來源的記錄,或者使用 getCatalogSink 或 write_dynamic_frame_from_catalog 將記錄寫入 Kafka,並且則該任務具有 Data Catalog 資料庫和資料表名稱資訊,並可以使用其來取得一些從 Kafka 串流來源讀取的基本參數。如果使用 getSource、getCatalogSink、getSourceWithFormat、getSinkWithFormat、createDataFrameFromOptions、create_data_frame_from_options 或 write_dynamic_frame_from_catalog,您必須使用此處描述的連線選項來指定這些基本參數。
您可以使用 GlueContext 類別中指定方法的下列引數來指定 Kafka 的連線選項。
-
Scala
-
connectionOptions:與getSource、createDataFrameFromOptions、getSink搭配使用 -
additionalOptions:與getCatalogSource、getCatalogSink搭配使用。 -
options:與getSourceWithFormat、getSinkWithFormat搭配使用。
-
-
Python
-
connection_options:與create_data_frame_from_options、write_dynamic_frame_from_options搭配使用。 -
additional_options:與create_data_frame_from_catalog、write_dynamic_frame_from_catalog搭配使用。 -
options:與getSource、getSink搭配使用。
-
如需有關串流 ETL 任務的注意事項和限制,請參閱 串流 ETL 注意事項和限制。
主題
設定 Kafka
透過網際網路連線到可用的 Kafka 串流沒有 AWS 先決條件。
您可以建立 AWS Glue Kafka 連線來管理連線憑證。如需更多詳細資訊,請參閱 為 Apache Kafka 資料串流建立 AWS Glue 連線。在您的 AWS Glue 任務組態中,提供 connectionName 作為其他網路連線,然後在您的方法呼叫中,提供 connectionName 給 connectionName 參數。
在某些情況下,您需要設定其他先決條件:
-
如果搭配 IAM 身分驗證使用 Amazon Managed Streaming for Apache Kafka,您會需要適當的 IAM 組態。
-
如果搭配 Amazon VPC 使用 Amazon Managed Streaming for Apache Kafka,您會需要適當的 Amazon VPC 組態。您需要建立可提供 Amazon VPC 連線資訊的 AWS Glue 連線。您需要任務組態,才能將 AWS Glue 連線納入為其他網路連線。
如需有關串流 ETL 任務先決條件的詳細資訊,請參閱 在 AWS Glue 中串流 ETL 任務。
範例:從 Kafka 串流讀取
搭配 forEachBatch 使用。
Kafka 串流來源範例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
範例:寫入 Kafka 串流
寫入 Kafka 的範例:
使用 getSink 方法的範例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
使用 write_dynamic_frame.from_options 方法的範例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 連線選項參考
在讀取時,將下列連線選項與 "connectionType": "kafka" 搭配使用:
-
"bootstrap.servers"(必要) 自舉伺服器 URL 的清單,例如b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094。此選項必須在 API 呼叫中指定,或在 Data Catalog 的資料表中繼資料中定義。 -
"security.protocol"(必要) 用來與代理程式通訊的協定。可能的值為"SSL"或"PLAINTEXT"。 -
"topicName"(必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定"topicName"、"assign"或"subscribePattern"其中一個。 -
"assign":(必要) JSON 字串,指定要消耗的特定TopicPartitions。您必須指定"topicName"、"assign"或"subscribePattern"其中一個。範例:'{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern":(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定"topicName"、"assign"或"subscribePattern"其中一個。範例:'topic.*'
-
"classification"(必要) 記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。 -
"delimiter"(選用) 當classification為 CSV 時使用的值分隔符號。預設值為 ","。 -
"startingOffsets":(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為"earliest"或"latest"。預設值為"latest"。 -
"startingTimestamp":(選用,僅支援 AWS Glue 4.0 版或更新版本) 要從中讀取資料的 Kafka 主題中記錄的時間戳記。可能的值是yyyy-mm-ddTHH:MM:SSZ模式中 UTC 格式的時間戳記字串 (其中Z代表以 +/- 表示的 UTC 時區偏移。例如:"2023-04-04T08:00:00-04:00")。注意:AWS Glue 串流指令碼的連線選項清單中只能有 'startingOffsets' 或 'startingTimestamp',若同時包括這兩個屬性,會導致任務失敗。
-
"endingOffsets":(選用) 批次查詢結束時的終點。可能值為"latest"或指定每個TopicPartition結束偏移的 JSON 字串。對於 JSON 字串,格式為
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}。值-1作為偏移代表"latest"。 -
"pollTimeoutMs":(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為600000。 -
"numRetries":(選用) 擷取 Kafka 位移失敗之前,要重試的次數。預設值為3。 -
"retryIntervalMs":(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為10。 -
"maxOffsetsPerTrigger":(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨topicPartitions或不同磁碟區而分割。預設值為 null,這表示消費者讀取所有偏移,直到已知的最新偏移。 -
"minPartitions":(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null,這表示 Spark 分割區的數量等於 Kafka 分割區的數量。 -
"includeHeaders":(選用) 是否包含 Kafka 標頭。當選項設定為「true」時,資料輸出將包含一個名為「glue_streaming_kafka_headers」的額外欄,其類型為Array[Struct(key: String, value: String)]。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。 -
"schema":(當 InferSchema 設定為 false 時為必要) 用於處理承載的架構。如果分類為avro,提供的架構必須採用 Avro 架構格式。如果分類不是avro,提供的架構必須採用 DDL 架構格式。以下是架構範例。
-
"inferSchema":(選用) 預設值為 'false'。如果設為 'true',將在執行時間時從foreachbatch承載偵測架構。 -
"avroSchema":(已棄用) 使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用schema參數。 -
"addRecordTimestamp"︰(選用) 當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"emitConsumerLagMetrics": (選用) 當該選項設定為 'true' 時,在介於主題收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間,將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
在寫入時,將下列連線選項與 "connectionType": "kafka" 搭配使用:
-
"connectionName"(必要) 用於連線至 Kafka 叢集的 AWS Glue 連線名稱 (類似 Kafka 來源)。 -
"topic"(必要) 如果主題資料欄存在,則在將指定的資料列寫入 Kafka 時會使用其值作為主題,除非已設定主題組態選項。也就是說,topic組態選項會覆寫主題資料欄。 -
"partition"(選用) 如果指定有效的分區編號,則會在傳送記錄時使用該partition。如果未指定分區,但
key存在 ,則會使用金鑰的雜湊來選擇分區。如果既不存在
key也不存在partition,則當至少向分區產生 batch.size 個位元組時,將根據這些變更的黏性分區來選擇分區。 -
"key"(選用) 如果partition為 null,則用於分區。 -
"classification"(選用) 記錄中資料使用的檔案格式。我們僅支援 JSON、CSV 和 Avro。使用 Avro 格式,我們可以提供要序列化的自訂 avroSchema,但請注意,這也需要在來源上提供,以進行還原序列化。否則,預設會使用 Apache AvroSchema 進行序列化。
此外,您可以視需要更新 Kafka 生產者組態參數
但是,有一個不會生效的小型拒絕選項清單。如需詳細資訊,請參閱 Kafka 特定的組態