本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
進階 AWS Glue 串流概念
在當代以資料驅動的應用程式中,資料的重要性隨著時間推移而逐漸式微,其價值也從提供預測轉變為做出反應。因此,客戶希望能即時處理資料,以便更快地做出決策。處理即時資料饋送 (例如來自 IoT 感應器) 時,資料可能不會依序送達,或在擷取時由於網路延遲和其他來源相關的故障而導致處理延遲。作為 AWS Glue 平台的一部分,AWS Glue 串流以這些功能為基礎,以提供可擴展的無伺服器串流 ETL,並由 Apache Spark 結構化串流提供支援,讓使用者能夠進行即時資料處理。
在本主題中,我們將探討 AWS Glue 串流的進階串流概念和功能。
處理串流時的時間考量
處理串流時有四種時間概念:
-
Event-time:事件發生的時間。在大多數情況下,此欄位會內嵌在來源的 event-data 本身。
-
Event-time-window:兩個事件時間之間的時間範圍。如上圖所示,W1 是從 17:00 到 17:10 的事件時間範圍。每個事件時間範圍都是由多個事件組成的群組。
-
Trigger-time:觸發時間可控制資料處理和更新結果的頻率。這是微批次處理開始的時間。
-
Ingestion-time:將串流資料擷取至串流服務的時間。如果事件時間未嵌入至事件本身,在某些情況下,可將此時間用於衡量事件時段。
衡量事件時段
衡量事件時段是一種技術,您可以依事件時段將多個事件分組及彙整。我們將透過以下範例探討衡量事件時段的好處,以及運用此技術的時機。
根據業務使用案例,Spark 支援三種時段類型。
-
輪轉時段:您彙整的一系列固定長度的不重疊事件時段。
-
滑動時段:從「長度固定」這點來看與輪轉時段類似,但只要滑動的時段小於事件時段本身,則時段可以重疊或滑動。
-
工作階段時段:從輸入資料事件開始,只要在間隙或閒置時接收到輸入資料,此時段就會持續延長。視資料輸入而定,工作階段時段的時段長度可以是靜態或動態。
輪轉時段
輪轉時段是您彙整的一系列固定長度的不重疊事件時段。讓我們以真實的範例來說明這一點。
ABC 汽車公司想要為一款新品牌的跑車進行行銷活動。他們想挑一個跑車迷人數最多的城市。為了達到這個目標,他們在網站上展示了一段介紹該車款的 15 秒簡短廣告。會記錄並串流所有「點選」和對應的「城市」 Amazon Kinesis Data Streams。我們想算出在 10 分鐘的時段中的點擊次數,並依城市進行分組,以了解哪個城市的需求最高。以下為彙整的輸出結果。
| window_start_time | window_end_time | 城市 | total_clicks |
|---|---|---|---|
| 2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 達拉斯 | 75 |
| 2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 芝加哥 | 10 |
| 2023-07-10 17:20:00 | 2023-07-10 17:30:00 | 達拉斯 | 20 |
| 2023-07-10 17:20:00 | 2023-07-10 17:30:00 | 芝加哥 | 50 |
如上所述,這些事件時段與觸發時間間隔不同。舉例來說,即使觸發時間是每分鐘一次,輸出結果也只會顯示 10 分鐘的不重疊彙總時段。為了取得最佳結果,觸發間隔最好能與事件時段一致。
在上表中,達拉斯在 17:00-17:10 這個時段共獲得 75 次點擊,芝加哥則有 10 次點擊。此外,任何城市在 17:10 - 17:20 這個時段都沒有資料,因此系統略過了這個時段。
現在,您可以透過下游分析應用程式對此資料進行進一步分析,以確定最適合進行行銷活動的城市。
在 AWS Glue 使用輪轉時段
-
建立 a Amazon Kinesis Data Streams DataFrame 並從中讀取。範例:
parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price') -
處理輪轉時段的資料。在下面的範例中,資料會根據 10 分鐘輪轉時段的輸入欄位「event_time」分組,並將輸出寫入到 Amazon S3 資料湖中。
grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()
滑動時段
從「長度固定」這點來看,滑動時段與輪轉時段類似,但只要滑動的時段小於事件時段本身,則時段可以重疊或滑動。由於可滑動的性質,輸入資料可以綁定至多個時段。
為了更清楚地了解這一點,讓我們舉個例子,來看一家想要找出疑似信用卡詐騙的銀行。串流應用程式可以監控連續串流的信用卡交易。這些交易可以每 10 分鐘的時段進行彙總,每 5 分鐘,時段就會向前滑動,將最久之前的 5 分鐘的資料消除,新增最近 5 分鐘的新資料。在每一個時段,可依國家/地區將交易分組,檢查是否存在可疑模式,例如在美國進行交易後,緊接著立即在澳洲進行另一筆交易。為了簡單起見,當總交易金額大於 $100 美元,我們就將該交易歸類為詐騙。一旦偵測到這類模式,系統就會發出疑似詐騙訊號,並可能將該信用卡凍結。
信用卡處理系統會針對每個信用卡識別碼和國家/地區,將一連串交易事件傳送給 Kinesis。 AWS Glue 任務會執行分析並產生下列彙總輸出。
| window_start_time | window_end_time | card_last_four | 國家/地區 | total_amount |
|---|---|---|---|---|
| 2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | 美國 | 85 |
| 2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | 澳洲 | 10 |
| 2023-07-10 17:05:45 | 2023-07-10 17:15:45 | 6544 | 美國 | 50 |
| 2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | 美國 | 50 |
| 2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | 澳洲 | 150 |
根據以上彙總資料,您可以看到 10 分鐘的時段每 5 分鐘滑動一次,並依交易金額進行加總。系統在 17:10-17:20 這個時段偵測到異常,該時段出現極端值,也就是一筆澳洲 150 美元的交易。AWS Glue 可偵測到此異常,並使用 boto3 將帶有違規密鑰的警報事件推送到 SNS 主題。此外,Lambda 函式可以訂閱此主題並採取動作。
處理滑動時段的資料
group-by 子句和範圍函數可用於實作滑動時段,如下圖所示。
grouped_df = parsed_df \ .groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \ .agg(sum("tx_amount").alias("total_amount"))
工作階段時段
和前述兩種時段皆有固定的長度不同,工作階段時段的時段長度視資料輸入而定,可以是靜態或動態。工作階段時段從輸入資料事件開始,只要在間隙或閒置時接收到輸入資料,此時段就會持續延長。
讓我們舉個例子。ABC 飯店想要找出一星期中最繁忙的時間,以便為客人提供更好的優惠。只要客人一登記入住,工作階段時段便隨即開始,而 Spark 會在該事件時段維持彙總狀態。每次訪客簽入時,都會產生事件並傳送至其中 Amazon Kinesis Data Streams。該飯店決定如果 15 分鐘內沒有人辦理入住手續,事件時段就會結束。當有新的客人登記入住時,下一個事件時段就會再次開始。輸出的資料如下所示。
| window_start_time | window_end_time | 城市 | total_checkins |
|---|---|---|---|
| 2023-07-10 17:02:00 | 2023-07-10 17:30:00 | 達拉斯 | 50 |
| 2023-07-10 17:02:00 | 2023-07-10 17:30:00 | 芝加哥 | 25 |
| 2023-07-10 17:40:00 | 2023-07-10 18:20:00 | 達拉斯 | 75 |
| 2023-07-10 18:50:45 | 2023-07-10 19:15:45 | 達拉斯 | 20 |
第一次登記入住的事件時間發生在 17:02,彙總事件時段便從 17:02 開始。只要系統在 15 分鐘內接收到事件,此彙總就會持續進行。在上面的例子中,最後收到的事件是在 17:15,接下來的 15 分鐘都沒有事件。因此,Spark 在 17:30 (17:15 + 15 分鐘) 結束了該事件時段,並設定該時段為 17:02 - 17:30。在 17:47 收到新的登記入住資料事件時,便又開始了新的事件時段。
處理工作階段時段的資料
group-by 子句和範圍函數可用於實作滑動時段。
grouped_df = parsed_df \ .groupBy(session_window(col("event_time"), "10 minute"), "city") \ .agg(count("check_in").alias("total_checkins"))
輸出模式
輸出模式是將無邊界表格的結果寫入外部連接器的一種模式,共有三種模式可用。在以下範例中,您要在每個微批次串流和處理資料行時,計算某個字的出現次數。
-
完整模式:即使單字出現次數尚未在目前的事件時段更新,整個結果表格仍會在每次微批次處理後寫入連接器。
-
附加模式:此為預設模式。在此模式中,只有自上次觸發後新增至結果表格的新單字和/或新列才會寫入連接器。此模式適用於地圖、flatMap、篩選條件等查詢的無狀態串流。
-
更新模式:只有自上次觸發後更新或新增至結果表格的文字和/或列才會寫入連接器。
注意
工作階段時段不支援「更新」輸出模式。
處理延遲資料和浮水印
在處理即時資料時,資料可能會因為網路延遲和上游故障問題而延遲到達,因此我們需要一種機制來對遺漏的事件時段再次進行彙總。但是,要做到這一點,需要維持狀態。同時,較舊的資料必須清理掉,以限制狀態的大小。Spark 2.1 版新增了對浮水印功能的支援,該功能可維持狀態,並可讓用戶指定延遲資料的閾值。
參考上面的股票代碼範例,讓我們假設允許的延遲資料閾值為不超過 10 分鐘。為了簡化情況,我們假設使用輪轉時段,股票代碼為 AMZ,交易為買入。
在上圖中,我們在計算 10 分鐘輪轉時段的總量。在 17:00、17:10 和 17:20 皆有觸發事件。在時間軸箭頭上方,顯示的是輸入資料串流,下方則是無邊界結果表。
在第一個 10 分鐘輪轉時段中,我們根據 event_time 進行彙總,計算出的 total_volume 為 30。在第二個事件時段,Spark 在 event_time = 17:02 時,取得了第一個資料事件。由於這是 Spark 迄今為止可查看的 event_time 上限,因此浮水印閾值設為往回追溯 10 分鐘 (也就是 watermark_event_time = 16:52)。任何 event_time 在 16:52 之後的資料事件都會被視為有時限的彙總,而任何在此時間之前的資料事件則會被捨棄。這可讓 Spark 可以多維持 10 分鐘的中繼狀態,以容納延遲的資料。在時鐘時間大約 17:08 時,Spark 收到 event_time = 16:54 的事件,此時間正好落在閾值內。因此,Spark 重新計算了「16:50 - 17:00」這個事件時段,並將總量從 30 更新為 60。
但是,在 17:20 這個觸發時間,當 Spark 收到 event_time = 17:15 的事件時,其設定的 watermark_event_time = 17:05。因此,event_time = 17:03 的延遲資料事件便被認定為「延遲太久」,而被系統忽略。
Watermark Boundary = Max(Event Time) - Watermark Threshold
在 AWS Glue 使用浮水印
在超過浮水印邊界之前,Spark 不會將資料發出或寫入至外部連接器。若要在 AWS Glue 實作浮水印,請參閱以下範例。
grouped_df = parsed_df \ .withWatermark("event_time", "10 minutes") \ .groupBy(window("event_time", "5 minutes"), "ticker") \ .agg(sum("volume").alias("total_volume"))