从 KPL 0.x 迁移至 KPL 1.x - Amazon Kinesis Data Streams

从 KPL 0.x 迁移至 KPL 1.x

本主题提供了将消费端从 KPL 0.x 迁移至 KPL 1.x 的分步说明。KPL 1.x 引入了对 适用于 Java 的 AWS SDK 2.x 的支持,同时保留了与先前版本的接口兼容性。无需更新核心数据处理逻辑即可迁移至 KPL 1.x。

  1. 确保有以下先决条件:

    • Java Development Kit (JDK) 8 或更高版本

    • 适用于 Java 的 AWS SDK 2.x

    • 用于依赖项管理的 Maven 或 Gradle

  2. 添加依赖项

    如果您使用的是 Maven,请将以下依赖项添加到您的 pom.xml 文件中。务必将 GroupID 从 com.amazonaws 更新到 software.amazon.kinesis,并将版本 1.x.x 更新到最新的 KPL 版本。

    <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-producer</artifactId> <version>1.x.x</version> <!-- Use the latest version --> </dependency>

    如果使用 Gradle,请在 build.gradle 文件中添加以下信息。请务必将 1.x.x 替换为最新 KPL 版本。

    implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'

    可以在 Maven Central 存储库中查看最新版本的 KPL。

  3. 更新 KPL 的导入语句

    KPL 1.x 使用 适用于 Java 的 AWS SDK 2.x,更新后的软件包名称以 software.amazon.kinesis 开头,而之前的 KPL 中的软件包名称则以 com.amazonaws.services.kinesis 开头。

    com.amazonaws.services.kinesis 的导入替换为 software.amazon.kinesis。下表列出了必须替换的导入。

    导入替换
    进行如下替换: 借助:

    import com.amazonaws.services.kinesis.producer.Attempt;

    import software.amazon.kinesis.producer.Attempt;

    import com.amazonaws.services.kinesis.producer.BinaryToHexConverter;

    import software.amazon.kinesis.producer.BinaryToHexConverter;

    import com.amazonaws.services.kinesis.producer.CertificateExtractor;

    import software.amazon.kinesis.producer.CertificateExtractor;

    import com.amazonaws.services.kinesis.producer.Daemon;

    import software.amazon.kinesis.producer.Daemon;

    import com.amazonaws.services.kinesis.producer.DaemonException;

    import software.amazon.kinesis.producer.DaemonException;

    import com.amazonaws.services.kinesis.producer.FileAgeManager;

    import software.amazon.kinesis.producer.FileAgeManager;

    import com.amazonaws.services.kinesis.producer.FutureTimedOutException;

    import software.amazon.kinesis.producer.FutureTimedOutException;

    import com.amazonaws.services.kinesis.producer.GlueSchemaRegistrySerializerInstance;

    import software.amazon.kinesis.producer.GlueSchemaRegistrySerializerInstance;

    import com.amazonaws.services.kinesis.producer.HashedFileCopier;

    import software.amazon.kinesis.producer.HashedFileCopier;

    import com.amazonaws.services.kinesis.producer.IKinesisProducer;

    import software.amazon.kinesis.producer.IKinesisProducer;

    import com.amazonaws.services.kinesis.producer.IrrecoverableError;

    import software.amazon.kinesis.producer.IrrecoverableError;

    import com.amazonaws.services.kinesis.producer.KinesisProducer;

    import software.amazon.kinesis.producer.KinesisProducer;

    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

    import software.amazon.kinesis.producer.KinesisProducerConfiguration;

    import com.amazonaws.services.kinesis.producer.LogInputStreamReader;

    import software.amazon.kinesis.producer.LogInputStreamReader;

    import com.amazonaws.services.kinesis.producer.Metric;

    import software.amazon.kinesis.producer.Metric;

    import com.amazonaws.services.kinesis.producer.ProcessFailureBehavior;

    import software.amazon.kinesis.producer.ProcessFailureBehavior;

    import com.amazonaws.services.kinesis.producer.UnexpectedMessageException;

    import software.amazon.kinesis.producer.UnexpectedMessageException;

    import com.amazonaws.services.kinesis.producer.UserRecord;

    import software.amazon.kinesis.producer.UserRecord;

    import com.amazonaws.services.kinesis.producer.UserRecordFailedException;

    import software.amazon.kinesis.producer.UserRecordFailedException;

    import com.amazonaws.services.kinesis.producer.UserRecordResult;

    import software.amazon.kinesis.producer.UserRecordResult;

    import com.amazonaws.services.kinesis.producer.protobuf.Messages;

    import software.amazon.kinesis.producer.protobuf.Messages;

    import com.amazonaws.services.kinesis.producer.protobuf.Config;

    import software.amazon.kinesis.producer.protobuf.Config;

  4. 更新 AWS 凭证提供程序类的导入语句

    迁移至 KPL 1.x 时,必须将 KPL 应用程序代码的导入中基于 适用于 Java 的 AWS SDK 1.x 的包和类更新为基于 适用于 Java 的 AWS SDK 2.x 的相应包和类。KPL 应用程序中通常导入的是凭证提供程序类。有关凭证提供程序更改的完整列表,请参阅 适用于 Java 的 AWS SDK 2.x 迁移指南文档中的凭证提供程序更改。下面是可能需要在 KPL 应用程序中进行的常见导入更改。

    KPL 0.x 中的导入

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

    KPL 1.x 中的导入

    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    如果导入其他任何基于 适用于 Java 的 AWS SDK 1.x 的凭证提供程序,则必须将其更新为 适用于 Java 的 AWS SDK 2.x 的凭证提供程序。如果未从 适用于 Java 的 AWS SDK 1.x 中导入任何类/包,可忽略此步骤。

  5. 更新 KPL 配置中的凭证提供程序配置

    KPL 1.x 中的凭证提供程序配置需要 适用于 Java 的 AWS SDK 2.x 凭证提供程序。如果通过覆盖默认凭证提供程序来传递 KinesisProducerConfiguration 中的 适用于 Java 的 AWS SDK 1.x 凭证提供程序,则必须使用 适用于 Java 的 AWS SDK 2.x 凭证提供程序进行更新。有关凭证提供程序更改的完整列表,请参阅 适用于 Java 的 AWS SDK 2.x 迁移指南文档中的凭证提供程序更改。如果未覆盖 KPL 配置中的默认凭证提供程序,可忽略此步骤。

    例如,如果要使用以下代码覆盖 KPL 的默认凭证提供程序:

    KinesisProducerConfiguration config = new KinesisProducerConfiguration(); // SDK v1 default credentials provider config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());

    必须使用以下代码进行更新才能使用 适用于 Java 的 AWS SDK 2.x 凭证提供程序:

    KinesisProducerConfiguration config = new KinesisProducerConfiguration(); // New SDK v2 default credentials provider config.setCredentialsProvider(DefaultCredentialsProvider.create());