

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 處理持續串流資料的串流任務
<a name="jobs-streaming"></a>

EMR Serverless 中的串流任務是一種任務模式，可讓您近乎即時地分析和處理串流資料。這些長時間執行的任務會輪詢串流資料，並在資料送達時持續處理結果。串流任務最適合需要即時資料處理的任務，例如近乎即時的分析、詐騙偵測和建議引擎。EMR Serverless 串流任務提供最佳化，例如內建任務彈性、即時監控、增強型日誌管理，以及與串流連接器的整合。

以下是具有串流任務的一些使用案例：
+ **近乎即時的分析** – Amazon EMR Serverless 中的串流任務可讓您近乎即時地處理串流資料，因此您可以對連續資料串流執行即時分析，例如日誌資料、感應器資料或點擊串流資料，以衍生洞見並根據最新資訊及時做出決策。
+ **詐騙偵測** – 當您分析資料串流，並在發生可疑模式或異常時，使用串流任務在金融交易、信用卡操作或線上活動中執行近乎即時的詐騙偵測。
+ **建議引擎** – 串流任務可以處理使用者活動資料和更新建議模型。這樣做會根據行為和偏好開啟個人化和即時建議的可能性。
+ **社交媒體分析** – 串流任務可以處理社交媒體資料，例如推文、評論和文章，讓組織可以近乎即時地監控趨勢、情緒分析和管理品牌評價。
+ **物聯網 (IoT) 分析** – 串流任務可以處理和分析來自 IoT 裝置、感應器和連線機器的高速資料串流，因此執行異常偵測、預測性維護和其他 IoT 分析使用案例。
+ **Clickstream 分析** – 串流任務可以處理和分析來自網站或行動應用程式的 Clickstream 資料。使用這類資料的企業可以執行分析，以進一步了解使用者行為、個人化使用者體驗，以及最佳化行銷活動。
+ **日誌監控和分析** – 串流任務也可以處理來自伺服器、應用程式和網路裝置的日誌資料。這可為您提供異常偵測、故障診斷，以及系統運作狀態和效能。

**主要優點**

EMR Serverless 中的串流任務會自動提供*任務彈性*，這是下列因素的組合：
+ **自動重試** – EMR Serverless 會自動重試失敗的任何任務，而無需您進行任何手動輸入。
+ **可用區域 (AZ) 彈性** – 如果原始 AZ 遇到問題，EMR Serverless 會自動將串流任務切換到運作狀態良好的 AZ。
+ **日誌管理：**
  + **日誌輪換** – 為了更有效率的磁碟儲存管理，EMR Serverless 會定期輪換長時間串流任務的日誌。這樣做可以防止可能耗用所有磁碟空間的日誌累積。
  + **日誌壓縮** – 可協助您以受管持久性有效率地管理和最佳化日誌檔案。壓縮也會改善您使用 受管 Spark 歷史記錄伺服器的偵錯體驗。

**支援的資料來源和資料接收器**

EMR Serverless 可與多個輸入資料來源和輸出資料接收器搭配使用：
+ 支援的輸入資料來源 – Amazon Kinesis Data Streams、Amazon Managed Streaming for Apache Kafka 和自我管理 Apache Kafka 叢集。根據預設，Amazon EMR 7.1.0 版和更新版本包含 [Amazon Kinesis Data Streams 連接器](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html)，因此您不需要建置或下載任何其他套件。
+ 支援的輸出資料接收器 – AWS Glue Data Catalog 資料表、Amazon S3、Amazon Redshift、MySQL、PostgreSQL Oracle、Oracle、Microsoft SQL、Apache Iceberg、Delta Lake 和 Apache Hudi。

## 考量和限制
<a name="jobs-spark-streaming-considerations"></a>

當您使用串流任務時，請記住下列考量和限制。
+ [Amazon EMR 7.1.0 版及更高版本](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html)支援串流任務。
+ EMR Serverless 預期串流任務會長時間執行，因此您無法設定執行逾時來限制任務的執行時間。
+ 串流任務僅與 Spark 引擎相容，該引擎以[結構化串流架構](https://spark.apache.org/streaming/)為基礎。
+ EMR Serverless 會無限期重試串流任務，您無法自訂最大嘗試次數。如果失敗的嘗試次數超過每小時時段設定的閾值，則會自動包含預防威脅以停止任務重試。預設閾值是在一小時內嘗試失敗 5 次。您可以將此閾值設定為 1 到 10 次嘗試。如需詳細資訊，請參閱 [任務彈性](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/SECTION-jobs-resiliency.xml.html)。
+ 串流任務具有檢查點來儲存執行時間狀態和進度，因此 EMR Serverless 可以從最新的檢查點繼續串流任務。如需詳細資訊，請參閱 Apache Spark 文件中的[使用檢查點從失敗中復原](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。

# 開始使用串流任務
<a name="jobs-spark-streaming-getting-started"></a>

請參閱下列指示，了解如何開始使用串流任務。

1. 遵循 [Amazon EMR Serverless 入門來建立應用程式。](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/getting-started.html)請注意，您的應用程式必須執行 [Amazon EMR 7.1.0 版](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html)或更新版本。

1. 一旦應用程式準備就緒，請將 `mode` 參數設定為 `STREAMING` 以提交串流任務，類似下列 AWS CLI 範例。

   ```
   aws emr-serverless start-job-run \
   --application-id <APPPLICATION_ID> \
   --execution-role-arn <JOB_EXECUTION_ROLE> \
   --mode 'STREAMING' \
   --job-driver '{
       "sparkSubmit": {
           "entryPoint": "s3://<streaming script>",
           "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
           "sparkSubmitParameters": "--conf spark.executor.cores=4
               --conf spark.executor.memory=16g 
               --conf spark.driver.cores=4
               --conf spark.driver.memory=16g 
               --conf spark.executor.instances=3"
       }
   }'
   ```

# 支援的串流連接器
<a name="jobs-spark-streaming-connectors"></a>

串流連接器有助於從串流來源讀取資料，也可以將資料寫入串流接收器。

以下是支援的串流連接器：

**Amazon Kinesis Data Streams 連接器**

適用於 Apache Spark 的 [Amazon Kinesis Data Streams 連接器](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html)可讓您建置串流應用程式和管道，以取用來自 的資料，並將資料寫入 Amazon Kinesis Data Streams。連接器支援增強的廣發耗用，專用讀取輸送量速率高達每個碎片 2MB/秒。根據預設，Amazon EMR Serverless 7.1.0 及更高版本包含 連接器，因此您不需要建置或下載任何其他套件。如需連接器的詳細資訊，請參閱 [ GitHub 上的 spark-sql-kinesis-connector 頁面](https://github.com/awslabs/spark-sql-kinesis-connector/)。

以下是如何使用 Kinesis Data Streams 連接器相依性啟動任務執行的範例。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kinesis-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --jars /usr/share/aws/kinesis/spark-sql-kinesis/lib/spark-streaming-sql-kinesis-connector.jar"
    }
}'
```

若要連線至 Kinesis Data Streams，請使用 VPC 存取設定 EMR Serverless 應用程式，並使用 VPC 端點來允許私有存取。 或使用 NAT Gateway 來取得公有存取。如需詳細資訊，請參閱[設定 VPC 存取](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。您還必須確保任務執行時間角色具有必要的讀取和寫入許可，以存取所需的資料串流。若要進一步了解如何設定任務執行期角色，請參閱 [Amazon EMR Serverless 的任務執行期角色](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/security-iam-runtime-role.html)。如需所有必要許可的完整清單，請參閱 [ GitHub 上的 spark-sql-kinesis-connector 頁面](https://github.com/awslabs/spark-sql-kinesis-connector/?tab=readme-ov-file#how-to-use-it)。

**Apache Kafka 連接器**

適用於 Spark 結構化串流的 Apache Kafka 連接器是 Spark 社群的開放原始碼連接器，可在 Maven 儲存庫中使用。此連接器有助於 Spark 結構化串流應用程式從自我管理的 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka 讀取和寫入資料。如需連接器的詳細資訊，請參閱 Apache Spark 文件中的[結構化串流 \$1 Kafka 整合指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

下列範例示範如何在任務執行請求中包含 Kafka 連接器。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>"
    }
}'
```

Apache Kafka 連接器版本取決於您的 EMR Serverless 發行版本和對應的 Spark 版本。若要尋找正確的 Kafka 版本，請參閱 [結構化串流 \$1 Kafka 整合指南](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)。

若要搭配 IAM 身分驗證使用 Amazon Managed Streaming for Apache Kafka，請包含另一個相依性，讓 Kafka 連接器能夠透過 IAM 連線至 Amazon MSK。如需詳細資訊，請參閱 [ GitHub 上的 aws-msk-iam-auth 儲存庫](https://github.com/aws/aws-msk-iam-auth)。您也必須確保任務執行時間角色具有必要的 IAM 許可。下列範例示範如何使用具有 IAM 身分驗證的連接器。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>,software.amazon.msk:aws-msk-iam-auth:<MSK_IAM_LIB_VERSION>"
    }
}'
```

若要使用 Kafka 連接器和來自 Amazon MSK 的 IAM 身分驗證程式庫，請設定具有 VPC 存取的 EMR Serverless 應用程式。您的子網路必須具有網際網路存取權，並使用 NAT Gateway 來存取 Maven 相依性。如需詳細資訊，請參閱[設定 VPC 存取](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)。子網路必須具有網路連線才能存取 Kafka 叢集。無論您的 Kafka 叢集是自我管理還是使用 Amazon Managed Streaming for Apache Kafka，都是如此。

# 串流任務日誌管理
<a name="jobs-spark-streaming-log-management"></a>

串流任務支援 Spark 應用程式日誌和事件日誌的日誌輪換，以及 Spark 事件日誌的日誌壓縮。這可協助您有效管理 資源。

**日誌輪換**

串流任務支援 Spark 應用程式日誌和事件日誌的日誌輪換。日誌輪換可防止長串流任務產生可能佔用所有可用磁碟空間的大型日誌檔案。日誌輪換可協助您儲存磁碟儲存體，並防止任務因磁碟空間不足而失敗。如需詳細資訊，請參閱[輪換日誌](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/rotating-logs.html)。

**日誌壓縮**

只要有可用的受管日誌，串流任務也支援 Spark 事件日誌的日誌壓縮。如需受管記錄的詳細資訊，請參閱[使用受管儲存記錄](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/logging.html#jobs-log-storage-managed-storage)。串流任務可能會長時間執行，而且事件資料量可能會隨著時間累積，並大幅增加日誌檔案大小。Spark 歷史記錄伺服器會讀取這些事件並將其載入至 Spark 應用程式 UI 的記憶體。此程序可能會導致高延遲和成本，特別是在 Amazon S3 中存放的事件日誌非常大時。

日誌壓縮會減少事件日誌大小，因此 Spark 歷史記錄伺服器不需要隨時載入超過 1 GB 的事件日誌。如需詳細資訊，請參閱 Apache Spark 文件中的[監控和檢測](https://spark.apache.org/docs/latest/monitoring.html)。