

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Glue 串流概念
<a name="glue-streaming-concepts"></a>

 下列各節提供有關 AWS Glue 串流概念的資訊。

**Topics**
+ [AWS Glue 串流任務的剖析](#glue-streaming-anatomy)

## AWS Glue 串流任務的剖析
<a name="glue-streaming-anatomy"></a>

 AWS Glue 串流任務在 Spark 串流範例上運作，並利用 Spark 架構的結構化串流。串流任務會以特定時間間隔持續輪詢串流資料來源，以擷取記錄作為微批次。下列各節會檢查 AWS Glue 串流任務的不同部分。

![螢幕擷取畫面顯示 Amazon CloudWatch 監控日誌， AWS Glue 針對上述範例，並查看所需的執行器數量 （橘色線），並擴展執行器 （藍色線） 以符合該日誌，而不需要手動調整。](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/glue-streaming-anatomy.png)


### forEachBatch
<a name="glue-streaming-anatomy-batch"></a>

 `forEachBatch` 方法是 AWS Glue 串流任務執行的進入點。 AWS Glue 串流任務使用 `forEachBatch`方法輪詢運作方式類似迭代器的資料，該迭代器在串流任務生命週期內保持作用中狀態，並定期輪詢串流來源以取得新資料，並以微批次處理最新資料。

```
glueContext.forEachBatch(
    frame=dataFrame_AmazonKinesis_node1696872487972,    
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
```

 設定 `forEachBatch` 的 `frame` 屬性以指定串流來源。在此範例中，您在建立任務時在空白畫布建立的來源節點會填入任務的預設 DataFrame。將 `batch_function` 屬性設定為您決定針對每個微批次作業呼叫的 `function`。您必須定義函數來處理傳入資料的批次轉換。

### 來源
<a name="glue-streaming-anatomy-source"></a>

 在 `processBatch` 函數的第一個步驟，程式會驗證您定義作為 `forEachBatch` 框架屬性的 DataFrame 的記錄計數。程式會將擷取時間戳記附加至非空白的 DataFrame。`data_frame.count()>0` 子句會判斷最新的微批次是否並非空白，並且已準備好進行進一步處理。

```
def processBatch(data_frame, batchId):
    if data_frame.count() >0:       
        AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )
```

### 映射
<a name="glue-streaming-anatomy-mapping"></a>

 該程式的下一部分是套用映射。Spark DataFrame 上的 `Mapping.apply` 方法可讓您定義關於資料元素的轉換規則。一般來說，您可以重新命名和變更資料類型，或在來源資料欄上套用自訂函數，並將這些函數映射至目標欄。

```
    #Script generated for node ChangeSchema        
    ChangeSchema_node16986872679326 = ApplyMapping.apply(
        frame =  AmazonKinesis_node1696872487972,
        mappings = [
            ("eventtime", "string", "eventtime", "string"),
            ("manufacturer", "string", "manufacturer", "string"),
            ("minutevolume", "long", "minutevolume", "int"),
            ("o2stats", "long", "OxygenSaturation", "int"),
            ("pressurecontrol", "long", "pressurecontrol", "int"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("ingest_year", "string", "ingest_year", "string"),
            ("ingest_month", "string", "ingest_month", "string"),
            ("ingest_day", "string", "ingest_day", "string"),
            ("ingest_hour", "string", "ingest_hour", "string"),
        ],
        transformation_ctx="ChangeSchema_node16986872679326",
    )
        )
```

### 接收
<a name="glue-streaming-anatomy-sink"></a>

 在這個部分，來自串流來源的傳入資料集會儲存在目標位置。在此範例中，我們會將資料寫入 Amazon S3 位置。`AmazonS3_node_path` 屬性詳細資料會根據您在建立任務期間所使用的設定從畫布預先填入。您可以根據您的使用案例來設定 `updateBehavior`，並決定不更新資料目錄表格，或在後續執行時建立資料目錄並更新資料目錄結構描述，或是建立目錄表格而不在後續執行時更新結構描述定義。

 `partitionKeys` 屬性定義了存儲分割區選項。預設行為是根據在來源區段中提供的 `ingestion_time_columns` 來分割資料。`compression` 屬性可讓您設定要在目標寫入期間套用的壓縮演算法。您可以選擇將壓縮技術設定為 Snappy、LZO 或 GZIP。`enableUpdateCatalog` 屬性可控制是否需要更新 AWS Glue 目錄表格。此屬性可用的選項為 `True` 或 `False`。

```
    #Script generated for node Amazon S3        
    AmazonS3_node1696872743449 = glueContext.getSink(
        path =  AmazonS3_node1696872743449_path,
        connection_type = "s3",
        updateBehavior = "UPDATE_IN_DATABASE",
        partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
        compression = "snappy",
        enableUpdateCatalog = True,
        transformation_ctx = "AmazonS3_node1696872743449",
    )
```

### AWS Glue 目錄接收器
<a name="glue-streaming-anatomy-catalog-sink"></a>

 任務的本節控制 AWS Glue 目錄資料表更新行為。根據您的 AWS Glue 目錄資料庫名稱和與您設計 AWS Glue 任務相關聯的資料表名稱來設定 `catalogDatabase`和 `catalogTableName` 屬性。您可以透過 `setFormat` 屬性定義目標資料的檔案格式。在這個例子中，我們會以 Parquet 格式儲存資料。

 設定並執行參考本教學課程的 AWS Glue 串流任務後， 產生的串流資料 Amazon Kinesis Data Streams 將儲存在具有快照壓縮的 Amazon S3 位置。成功執行串流任務後，您將可透過 Amazon Athena查詢資料。

```
       
    AmazonS3_node1696872743449 = setCatalogInfo(
        catalogDatabase =  "demo", catalogTableName = "demo_stream_transform_result"
    )
    AmazonS3_node1696872743449.setFormat("glueparquet")
    AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
    )
```