使用适用于 Rust 的 SDK 的 Amazon MSK 示例 - 适用于 Rust 的 AWS SDK

使用适用于 Rust 的 SDK 的 Amazon MSK 示例

以下代码示例演示了如何通过将适用于 Rust 的 AWS SDK 与 Amazon MSK 结合使用,来执行操作和实现常见场景。

每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。

无服务器示例

以下代码示例演示如何实现一个 Lambda 函数,该函数接收通过接收来自 Amazon MSK 集群的记录而触发的事件。该函数检索 MSK 有效负载,并记录下记录内容。

适用于 Rust 的 SDK
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Rust 将 Amazon MSK 事件与 Lambda 结合使用。

use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }