从 KCL 2.x 迁移至 KCL 3.x
本主题提供了将消费端从 KCL 2.x 迁移至 KCL 3.x 的分步说明。KCL 3.x 支持 KCL 2.x 消费端进行就地迁移。在滚动迁移工作程序时,可继续使用 Kinesis 数据流中的数据。
重要
KCL 3.x 的接口和方法与 KCL 2.x 保持一致。因此,在迁移期间无需更新记录处理代码。但必须设置正确的配置,并检查迁移所需的步骤。我们强烈建议遵循以下迁移步骤,以获得顺畅的迁移体验。
步骤 1:先决条件
开始使用 KCL 3.x 之前,请确保已具备以下条件:
-
Java Development Kit (JDK) 8 或更高版本
-
适用于 Java 的 AWS SDK 2.x
-
用于依赖项管理的 Maven 或 Gradle
重要
请勿在 KCL 3.x 中使用 2.27.19 到 2.27.23 版的 适用于 Java 的 AWS SDK。这些版本出现的问题会导致使用 KCL 的 DynamoDB 时出现相关异常错误。我们建议使用 2.28.0 或更高版本的 适用于 Java 的 AWS SDK 来避免这一问题。
步骤 2:添加依赖项
如果您使用的是 Maven,请将以下依赖项添加到您的 pom.xml 文件中。确保将 3.x.x 替换为最新的 KCL 版本。
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
如果使用 Gradle,请在 build.gradle 文件中添加以下信息。确保将 3.x.x 替换为最新的 KCL 版本。
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
可以在 Maven Central 存储库
步骤 3:设置迁移相关配置
要从 KCL 2.x 迁移至 KCL 3.x,就必须设置以下配置参数:
-
CoordinatorConfig.clientVersionConfig:该配置决定了应用程序将运行在哪个 KCL 版本兼容模式下。从 KCL 2.x 迁移至 3.x 时,需要将该配置设置为
CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X。要设置此配置,请在创建调度器对象时添加以下行:
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
下面是如何设置 CoordinatorConfig.clientVersionConfig 从 KCL 2.x 迁移至 3.x 的示例。您可以按照自己的具体需求,根据需要调整其他配置:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
重要的是,由于 KCL 2.x 和 3.x 使用的负载均衡算法不同,消费端应用程序中的所有工作程序在给定时间均使用相同的负载均衡算法。如果使用不同的负载平衡算法运行工作程序,可能会导致负载分配不够理想,因为这两种算法是独立运行的。
借助于这种 KCL 2.x 兼容性设置,KCL 3.x 应用程序可以在兼容 KCL 2.x 的模式下运行,并使用 KCL 2.x 的负载平衡算法,直至消费端应用程序中的所有工作程序都升级到 KCL 3.x。迁移完成后,KCL 会自动切换到完整版 KCL 3.x 功能模式,并开始为所有正在运行的工作程序使用新的 KCL 3.x 负载平衡算法。
重要
如果未使用 ConfigsBuilder,而是创建 LeaseManagementConfig 对象来设置配置,则必须在 KCL 3.x 或更高版本中再添加一个称为 applicationName 的参数。有关详细信息,请参阅 LeaseManagementConfig 构造函数编译错误。我们建议使用 ConfigsBuilder 来设置 KCL 配置。ConfigsBuilder 提供了一种更加灵活、更易于维护的方式来配置 KCL 应用程序。
第 4 步:遵循 shutdownRequested() 方法实现的最佳实践
KCL 3.x 中推出了一项名为“优雅移交租约”的功能,在租约重新分配过程中将租约移交给其他工作程序时,该功能可最大程度减少对数据的再处理。其实现方法是在租约移交之前,对租约表中最后处理的序列号进行检查点操作。为确保优雅移交租约功能的正常运行,必须保证在 RecordProcessor 类的 shutdownRequested 方法中调用 checkpointer 对象。如果在 shutdownRequested 方法中未调用 checkpointer 对象,可以按照以下示例所示进行实现。
重要
-
以下实现示例是优雅移交租约的最低要求。需要时可以进行扩展,以添加与检查点相关的其他逻辑。如果当前正在执行任何异步处理,请确保在调用检查点操作之前已对所有传送到下游的记录进行了处理。
-
虽然优雅移交租约可以显著减小租约转移期间进行数据再处理的可能性,但不能完全消除这种可能性。为保持数据的完整性和一致性,请将下游消费端应用程序设计为具有幂等性。这意味着消费端应用程序应该能够处理潜在的记录重复处理,而不会对整个系统带来不利影响。
/** * Invoked when either Scheduler has been requested to gracefully shutdown * or lease ownership is being transferred gracefully so the current owner * gets one last chance to checkpoint. * * Checkpoints and logs the data a final time. * * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint * before the shutdown is completed. */ public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { // Ensure that all delivered records are processed // and has been successfully flushed to the downstream before calling // checkpoint // If you are performing any asynchronous processing or flushing to // downstream, you must wait for its completion before invoking // the below checkpoint method. log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } }
步骤 5:检查 KCL 3.x 收集工作程序指标的先决条件
KCL 3.x 收集 CPU 利用率指标(例如工作程序的 CPU 利用率),以期均匀地平衡各工作程序之间的负载。消费端应用程序的工作程序可运行在 Amazon EC2、Amazon EKS、Amazon EKS 或 AWS Fargate 上。只有在满足以下先决条件时,KCL 3.x 才能从工作程序收集 CPU 利用率指标:
Amazon Elastic Compute Cloud(Amazon EC2)
-
操作系统必须是 Linux 操作系统。
-
必须在 EC2 实例中启用 IMDSv2。
Amazon EC2 上的 Amazon Elastic Container Service (Amazon ECS)
-
操作系统必须是 Linux 操作系统。
-
必须启用 ECS 任务元数据端点版本 4。
-
Amazon ECS 容器代理版本必须为 1.39.0 或更高版本。
上的 Amazon ECSAWS Fargate
-
必须启用 Fargate 任务元数据端点版本 4。如果使用的是 Fargate 平台版本 1.4.0 或更高版本,则默认启用此功能。
-
Fargate(平台版本 1.4.0 或更高版本)。
Amazon EC2 上的 Amazon Elastic Kubernetes Service (Amazon EKS)
-
操作系统必须是 Linux 操作系统。
上的 Amazon EKSAWS Fargate
-
Fargate(平台版本 1.3.0 或更高版本)。
重要
如果 KCL 3.x 因为未满足先决条件而无法从工作程序收集 CPU 利用率指标,它将根据每个租约的吞吐量级别重新平衡负载。这种后备再平衡机制可确保所有工作程序都能从分配给各工作程序的租约中获得类似级别的总吞吐量。有关更多信息,请参阅 KCL 如何向工作程序分配租约并平衡负载。
第 6 步:更新 KCL 3.x 的 IAM 权限
必须向与 KCL 3.x 消费端应用程序有关的 IAM 角色或策略添加以下权限。这涉及对 KCL 应用程序使用的现有 IAM 策略进行更新。有关更多信息,请参阅 KCL 消费端应用程序所必需的 IAM 权限。
重要
您的现有 KCL 应用程序可能没有在 IAM 策略中添加以下 IAM 操作与资源,因为 KCL 2.x 不需要这些操作与资源。在运行 KCL 3.x 应用程序之前,请确保已添加这些操作与资源:
-
操作:
UpdateTable-
资源 (ARN):
arn:aws:dynamodb:region:account:table/KCLApplicationName
-
-
操作:
Query-
资源 (ARN):
arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*
-
-
操作:
CreateTable、DescribeTable、Scan、GetItem、PutItem、UpdateItem、DeleteItem-
资源 (ARN):
arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats,arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState
将 ARN 中的“region”、“account”和“kclApplicationName”分别替换为您自己的 AWS 区域、AWS 账户 编号和 KCL 应用程序名称。如果使用配置来自定义 KCL 创建的元数据表的名称,请使用这些指定的表名称而不是 KCL 应用程序名称。
-
第 7 步:将 KCL 3.x 代码部署到工作程序
在设置了迁移所需的配置并完成之前所有的迁移清单之后,就可以构建代码并将其部署到工作程序中。
注意
如果发现 LeaseManagementConfig 构造函数出现编译错误,请参阅 LeaseManagementConfig 构造函数编译错误 获取故障排查信息。
步骤 8:完成迁移
在部署 KCL 3.x 代码期间,KCL 将继续使用来自 KCL 2.x 的租约分配算法。成功将 KCL 3.x 代码部署到所有工作程序后,KCL 会自动检测部署情况,并根据工作程序的资源利用率切换到新的租约分配算法。有关新租约分配算法的更多详细信息,请参阅KCL 如何向工作程序分配租约并平衡负载。
在部署期间,可使用发送至 CloudWatch 的指标来监控迁移过程。您可以监控 Migration 操作下的指标。所有指标都是每个 KCL 应用程序的指标,并设置为 SUMMARY 指标级别。如果 CurrentState:3xWorker 指标的 Sum 统计数据与 KCL 应用程序的工作程序总数一致,则表示已成功完成向 KCL 3.x 的迁移。
重要
在所有工作程序做好运行新租约分配算法的准备之后,KCL 至少需要 10 分钟才能切换到新算法。
| 指标 | 描述 |
|---|---|
CurrentState:3xWorker |
成功迁移至 KCL 3.x 并运行新租约分配算法的 KCL 工作程序数量。如果此指标的
|
CurrentState:2xCompatibleWorker |
迁移过程中在 KCL 2.x 兼容模式下运行的 KCL 工作程序数量。该指标若为非零值,表示迁移仍在进行中。
|
Fault |
迁移过程中遇到的异常数量。这些异常大多数是瞬时错误,KCL 3.x 会自动重试以完成迁移。如果发现永久性的
|
GsiStatusReady |
租约表上创建全局二级索引 (GSI) 的状态。该指标表示租约表上的 GSI 是否已创建,这是运行 KCL 3.x 的一个先决条件。其值为 0 或 1,其中 1 表示创建成功。在回滚状态下,不会发出该指标。再次向前滚动后,可以继续监控该指标。
|
workerMetricsReady |
所有工作程序指标的发送状态。该指标表示是否所有工作程序都发出诸如 CPU 利用率之类的指标。其值为 0 或 1,其中 1 表示所有工作程序均已成功发出指标,并准备好使用新的租约分配算法。在回滚状态下,不会发出该指标。再次向前滚动后,可以继续监控该指标。
|
KCL 在迁移期间提供回滚至 2.x 兼容模式的能力。成功迁移至 KCL 3.x 后,如果不再需要回滚,我们建议移除 CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X 的 CoordinatorConfig.clientVersionConfig 设置。移除该配置之后,就不会从 KCL 应用程序发出与迁移相关的指标。
注意
我们建议在迁移期间和完成迁移后,监控应用程序的性能和稳定性一段时间。如果发现任何问题,可使用 KCL 迁移工具