本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 DynamoDB 與 Amazon Managed Streaming for Apache Kafka 整合
Amazon Managed Streaming for Apache Kafka (Amazon MSK) 可透過全受管、高可用性的 Apache Kafka 服務,輕鬆即時擷取和處理串流資料。
Apache Kafka
由於這些功能,Apache Kafka 通常用於建置即時串流資料管道。資料管道能夠可靠地處理資料,並將資料從一個系統移動到另一個系統,當透過使用多個支援不同使用案例的資料庫以實現專用資料庫策略時,這可能會是重要的功能。
Amazon DynamoDB 是這些資料管道中的常見目標,可支援使用鍵值或文件資料模型的應用程式,並希望具有幾毫秒一致效能的無限可擴展性。
運作方式
Amazon MSK 和 DynamoDB 之間的整合使用 Lambda 函式來取用來自 Amazon MSK 的記錄,並將其寫入 DynamoDB。
Lambda 會在內部輪詢 Amazon MSK 中的新訊息,然後同步調用目標 Lambda 函式。Lambda 函式的事件承載包含來自 Amazon MSK 的訊息批次。對於 Amazon MSK 和 DynamoDB 之間的整合,Lambda 函式會將這些訊息寫入 DynamoDB。
設定 Amazon MSK 與 DynamoDB 之間的整合
注意
您可以在下列 GitHub 儲存庫
下列步驟說明如何設定 Amazon MSK 和 Amazon DynamoDB 之間的範例整合。此範例代表由物聯網 (IoT) 裝置產生並擷取至 Amazon MSK 的資料。當資料導入 Amazon MSK 時,它可以與分析服務或與 Apache Kafka 相容的第三方工具整合,從而實現各種分析使用案例。整合 DynamoDB 也提供個別裝置記錄的索引鍵值查詢。
此範例將示範 Python 指令碼如何將 IoT 感應器資料寫入 Amazon MSK。然後,Lambda 函式會將具有分割區索引鍵「deviceid」的項目寫入 DynamoDB。
提供的 CloudFormation 範本將建立下列資源:Amazon S3 儲存貯體、Amazon VPC、Amazon MSK 叢集,以及用於測試資料操作的 AWS CloudShell。
若要產生測試資料,請建立 Amazon MSK 主題,然後建立 DynamoDB 資料表。您可以從管理主控台使用 Session Manager 登入 CloudShell 作業系統並執行 Python 指令碼。
執行 CloudFormation 範本後,您可以執行下列操作來完成建置此架構。
-
執行 CloudFormation 範本
S3bucket.yaml以建立 S3 儲存貯體。對於任何後續指令碼或操作,請在相同區域中執行它們。輸入ForMSKTestS3做為 CloudFormation 堆疊名稱。
完成後,記下輸出下方的 S3 儲存貯體名稱輸出。您在步驟 3 中將需要此名稱。
-
將下載的 ZIP 檔案
fromMSK.zip上傳至您剛建立的 S3 儲存貯體。
-
執行 CloudFormation 範本
VPC.yaml來建立 VPC、Amazon MSK 叢集和 Lambda 函式。在參數輸入畫面上,輸入您在步驟 1 中被要求提供 S3 儲存貯體時建立的 S3 儲存貯體名稱。將 CloudFormation 堆疊名稱設定為ForMSKTestVPC。
-
準備在 CloudShell 中執行 Python 指令碼的環境。您可以在 AWS 管理主控台 上使用 CloudShell。如需使用 CloudShell 的詳細資訊,請參閱 AWS CloudShell 入門。啟動 CloudShell 後,建立屬於您剛建立的 VPC 的 CloudShell,以連線至 Amazon MSK 叢集。在私有子網路中建立 CloudShell。填寫下列欄位:
-
名稱 - 可以設定為任何名稱。例如 MSK-VPC
-
VPC - 選取 MSKTest
-
子網路 - 選取 MSKTest 私有子網路 (AZ1)
-
SecurityGroup - 選取 ForMSKSecurityGroup
屬於私有子網路的 CloudShell 啟動後,請執行下列命令:
pip install boto3 kafka-python aws-msk-iam-sasl-signer-python -
-
從 S3 儲存貯體下載 Python 指令碼。
aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip -
檢查管理主控台,並在 Python 指令碼中設定代理程式 URL 和區域值的環境變數。在管理主控台中檢查 Amazon MSK 叢集代理程式端點。
-
在 CloudShell 上設定環境變數。如果您使用的是美國西部 (奧勒岡):
export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098" -
執行下列 Python 指令碼。
建立 Amazon MSK 主題:
python ./createTopic.py建立 DynamoDB 資料表:
python ./createTable.py將測試資料寫入 Amazon MSK 主題:
python ./kafkaDataGen.py -
檢查所建立 Amazon MSK、Lambda 和 DynamoDB 資源的 CloudWatch 指標,並使用 DynamoDB Data Explorer 驗證存放在
device_status資料表中的資料,以確保所有程序都正確執行。如果每個程序執行均無錯誤,您可以檢查從 CloudShell 寫入 Amazon MSK 的測試資料是否也會寫入 DynamoDB。
-
當您完成此範例時,請刪除本教學課程中建立的資源。刪除兩個 CloudFormation 堆疊:
ForMSKTestS3和ForMSKTestVPC。如果堆疊刪除成功完成,則會刪除所有資源。
後續步驟
注意
如果您在遵循此範例時建立了資源,請記得將其刪除,以避免任何非預期的費用。
整合已識別連結 Amazon MSK 和 DynamoDB 的架構,以啟用串流資料以支援 OLTP 工作負載。從這裡,可以將 DynamoDB 與 OpenSearch Service 連結來實現更複雜的搜尋。請考慮與 EventBridge 整合以因應更複雜的事件驅動需求,以及與 Amazon Managed Service for Apache Flink 等擴充功能整合以因應更高輸送量和更低延遲的需求。