Migração do conector do Spark Kinesis para o SDK 2.x do Amazon EMR 7.0 - Amazon EMR

Migração do conector do Spark Kinesis para o SDK 2.x do Amazon EMR 7.0

O AWS SDK fornece um rico conjunto de APIs e bibliotecas para interagir com serviços de computação em nuvem da AWS, como gerenciar credenciais e conectar-se aos serviços do S3 e do Kinesis. O conector do Spark Kinesis é usado para consumir dados do Kinesis Data Streams, e os dados recebidos são transformados e processados no mecanismo de execução do Spark. Atualmente, esse conector é criado com base na versão 1.x do AWS SDK e do serviço Kinesis Client Library (KCL).

Como parte da migração do AWS SDK 2.x, o conector do Spark Kinesis também é atualizado para ser executado com o SDK 2.x. Na versão 7.0 do Amazon EMR, o Spark contém a atualização do SDK 2.x que ainda não está disponível na versão comunitária do Apache Spark. Se você usa o conector do Spark Kinesis de uma versão inferior à 7.0, é necessário migrar os códigos da sua aplicação para execução no SDK 2.x antes de poder migrar para o Amazon EMR 7.0.

Guias de migração

Esta seção descreve as etapas para migrar uma aplicação ao conector atualizado do Spark Kinesis. Estão inclusos guias de migração à Kinesis Client Library (KCL) 2.x, aos provedores de credenciais da AWS e aos clientes de serviços da AWS no AWS SDK 2.x. Para referência, a seção também inclui uma amostra do programa WordCount que usa o conector do Kinesis.

Migração da versão 1.x à 2.x do serviço KCL

  • Nível e dimensões das métricas em KinesisInputDStream

    Ao instanciar um KinesisInputDStream, você pode controlar o nível e as dimensões das métricas do fluxo. O seguinte exemplo demonstra como personalizar esses parâmetros com a KCL 1.x:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    Na KCL 2.x, essas configurações têm nomes de pacotes diferentes. Para migrar à versão 2.x:

    1. Altere as instruções de importação de com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration e com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel para software.amazon.kinesis.metrics.MetricsLevel e software.amazon.kinesis.metrics.MetricsUtil, respectivamente.

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. Substitua a linha metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet por metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)

    A seguir está uma versão atualizada de KinesisInputDStream com níveis e dimensões de métricas personalizados.

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • Função de manipulador de mensagens em KinesisInputDStream

    Ao instanciar um KinesisInputDStream, você também pode fornecer uma “função de manipulador de mensagens” que usa um registro do Kinesis e retorna um objeto genérico T, caso queira usar outros dados incluídos em um registro, como a chave de partição.

    Na KCL 1.x, a assinatura da função de manipulador de mensagens é: Record => T, com o registro sendo com.amazonaws.services.kinesis.model.Record. Na KCL 2.x, a assinatura do manipulador é alterada para: KinesisClientRecord => T, com KinesisClientRecord sendo software.amazon.kinesis.retrieval.KinesisClientRecord.

    A seguir está um exemplo de fornecimento de um manipulador de mensagens na KCL 1.x.

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para migrar o manipulador de mensagens:

    1. Altere a instrução de importação de com.amazonaws.services.kinesis.model.Record para software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Atualize a assinatura do método do manipulador de mensagens.

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    A seguir está um exemplo atualizado de fornecimento do manipulador de mensagens na KCL 2.x.

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Para obter mais informações sobre como migrar da KCL 1.x para a KCL 2.x, consulte Migração de consumidores da KCL 1.x para a KCL 2.x .

Migração de provedores de credenciais da AWS do AWS SDK 1.x para a versão 2.x

Os provedores de credenciais são usados para obter credenciais da AWS para fins de interações com a AWS. Há várias mudanças de interface e classe relacionadas aos provedores de credenciais na SDK 2.x, que podem ser encontradas aqui. O conector do Spark Kinesis definiu uma interface (org.apache.spark.streaming.kinesis.SparkAWSCredentials) e classes de implementação que retornam a versão 1.x dos provedores de credenciais da AWS. Esses provedores de credenciais são necessários ao inicializar clientes Kinesis. Por exemplo, se você estiver usando o método SparkAWSCredentials.provider nas aplicações, precisará atualizar os códigos para consumir a versão 2.x dos provedores de credenciais da AWS.

A seguir está um exemplo de uso dos provedores de credenciais no AWS SDK 1.x.

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Para migrar ao SDK 2.x:
  1. Altere a instrução de importação de com.amazonaws.auth.AWSCredentialsProvider para software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Atualize os códigos restantes que usam essa classe.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

Migração de clientes de serviços da AWS do AWS SDK 1.x para a versão 2.x

Clientes de serviços da AWS têm nomes de pacotes diferentes na versão 2.x (ou seja, software.amazon.awssdk), enquanto o SDK 1.x usa com.amazonaws. Para obter mais informações sobre as alterações de clientes, consulte aqui. Se você estiver usando esses clientes de serviços nos códigos, precisará migrá-los adequadamente.

A seguir está um exemplo de criação de um cliente no SDK 1.x.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Para migrar à versão 2.x:
  1. Altere as instruções de importação dos clientes de serviços. Veja os clientes DynamoDB como exemplo. Você precisaria mudar com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient ou com.amazonaws.services.dynamodbv2.document.DynamoDB para software.amazon.awssdk.services.dynamodb.DynamoDbClient.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Atualização dos códigos que inicializam os clientes

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    Para obter mais informações sobre como migrar do AWS SDK 1.x para a versão 2.x, consulte O que é diferente entre o AWS SDK for Java 1.x e o 2.x.

Exemplos de códigos para aplicações de streaming

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

Considerações ao usar o conector atualizado do Spark Kinesis

  • Se suas aplicações usam a Kinesis-producer-library com uma versão do JDK inferior à 11, você pode se deparar com exceções como java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter. Isso acontece porque o EMR 7.0 vem com o JDK 17 por padrão e os módulos J2EE foram removidos das bibliotecas padrão desde o Java 11+. Isso pode ser corrigido adicionando a dependência a seguir no arquivo pom. Substitua a versão da biblioteca pela dependência que preferir.

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • O arquivo jar do conector do Spark Kinesis pode ser encontrado neste caminho após a criação de um cluster do EMR: /usr/lib/spark/connector/lib/