Crie e execute um aplicativo Managed Service for Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink (Amazon MSF) era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

Crie e execute um aplicativo Managed Service for Apache Flink

Nesta etapa, será criado um aplicativo Managed Service for Apache Flink com fluxos de dados do Kinesis como fonte e coletor.

Crie recursos dependentes

Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Dois fluxos de dados do Kinesis para entrada e saída.

  • Um bucket do Amazon S3 para armazenar o código do aplicativo

    nota

    Este tutorial pressupõe que você está implantando seu aplicativo na região us-east-1 Leste dos EUA (Norte da Virgínia). Se você usa outra região, adapte todas as etapas corretamente.

Criar dois fluxos de dados do Amazon Kinesis

Antes de criar um aplicativo do Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis (ExampleInputStream e ExampleOutputStream). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

É possível criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para obter instruções sobre o console, consulte Criar e atualizar fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams. Para criar os fluxos usando o AWS CLI, use os comandos a seguir, ajustando-se à região que você usa para seu aplicativo.

Como criar os fluxos de dados (AWS CLI)
  1. Para criar o primeiro fluxo (ExampleInputStream), use o seguinte comando create-stream AWS CLI do Amazon Kinesis.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. Execute o mesmo comando, alterando o nome do fluxo para , a fim de criar o segundo fluxo que o aplicativo usará para gravar a saída ExampleOutputStream:

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

Crie um bucket do Amazon S3 para o código do aplicativo

Você pode criar um bucket do Amazon S3 usando o console. Para saber como criar um bucket do Amazon S3 usando o console, consulte Criação de um bucket no Guia do usuário do Amazon S3. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login, por exemplo.

nota

Crie o bucket na região que utilizada neste tutorial (us-east-1).

Outros recursos

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria automaticamente os seguintes recursos do Amazon CloudWatch, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/<my-application>

  • Um fluxo de logs chamado kinesis-analytics-log-stream

Configurar seu ambiente de desenvolvimento local

Para desenvolvimento e depuração, você pode executar o aplicativo Apache Flink em sua máquina diretamente do IDE de sua escolha. Todas as dependências do Apache Flink são tratadas como dependências Java regulares usando o Apache Maven.

nota

Na máquina de desenvolvimento, você deve ter o Java JDK 11, o Maven e o Git instalados. Recomenda-se o uso de um ambiente de desenvolvimento como o Eclipse Java Neon ou o IntelliJ Idea. Para verificar se você atende a todos os pré-requisitos, consulte Atenda os pré-requisitos para concluir os exercícios. Você não precisa instalar um cluster Apache Flink na máquina.

Autentique a sessão AWS

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no fluxo de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

  1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulte Configurar a AWS Command Line Interface (AWS CLI).

  2. Verifique se o seu AWS CLI está configurado corretamente e se seus usuários têm permissões para gravar no fluxo de dados do Kinesis publicando o seguinte registro de teste:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Se o seu IDE tiver um plug-in para integrar-se a AWS, você poderá usá-lo para passar as credenciais para o aplicativo em execução no IDE. Para obter mais informações, consulte Toolkit do AWS para IntelliJ IDEA e Toolkit do AWS para Eclipse.

Baixar e examinar o código Java de fluxo do Apache Flink

O código de aplicativo Java deste exemplo está disponível no GitHub. Para fazer download do código do aplicativo, faça o seguinte:

  1. Duplique o repositório remoto usando o seguinte comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navegue até o diretório amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Analise os componentes do aplicativo

O aplicativo é totalmente implementado na classe com.amazonaws.services.msf.BasicStreamingJob. O método main() define o fluxo de dados para processar os dados de transmissão e executá-los.

nota

Para uma experiência de desenvolvedor otimizada, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service for Apache Flink e localmente, para desenvolvimento em seu IDE.

  • Para ler a configuração de runtime para que ela funcione ao ser executada no Amazon Managed Service for Apache Flink e em seu IDE, o aplicativo detecta automaticamente se está sendo executado localmente de forma autônoma no IDE. Nesse caso, o aplicativo carrega a configuração do runtime de forma diferente:

    1. Quando o aplicativo detectar que está sendo executado no modo autônomo em seu IDE, forme o arquivo application_properties.json incluído na pasta de recursos do projeto. Os conteúdos do arquivo são os seguintes.

    2. Quando o aplicativo é executado no Amazon Managed Service for Apache Flink, o comportamento padrão carrega a configuração do aplicativo a partir das propriedades de runtime que você definirá no aplicativo Amazon Managed Service for Apache Flink. Consulte Crie e configure o aplicativo do Managed Service for Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • O método main() define o fluxo de dados do aplicativo e o executa.

    • Inicializa os ambientes de transmissão padrão. Neste exemplo, mostramos como criar a StreamExecutionEnvironment a ser usada com a API DataSteam e a StreamTableEnvironment a ser usada com SQL e a API Table. Os dois objetos de ambiente são duas referências separadas ao mesmo ambiente de runtime, para usar APIs diferentes.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • Carregue os parâmetros de configuração do aplicativo. Isso os carregará automaticamente a partir do local correto, dependendo de onde o aplicativo está sendo executado:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • O aplicativo define uma fonte usando o conector Kinesis Consumer para ler dados do fluxo de entrada. A configuração do fluxo de entrada é definido em PropertyGroupId=InputStream0. O nome e a região do fluxo estão nas propriedades nomeadas stream.name e aws.region, respectivamente. Para simplificar, essa fonte lê os registros como string.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • Em seguida, o aplicativo define um coletor usando o conector Kinesis Streams Sink para enviar dados para o fluxo de saída. O nome e a região do fluxo de saída são definidos em PropertyGroupId=OutputStream0, semelhante ao fluxo de entrada. O coletor é conectado diretamente ao DataStream interno que está recebendo dados da fonte. Em um aplicativo real, há alguma transformação entre fonte e coletor.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • Por fim, execute o fluxo de dados que acabou de definir. Essa deve ser a última instrução do método main() depois de definir todos os operadores que o fluxo de dados exige:

      env.execute("Flink streaming Java API skeleton");

Usar o arquivo pom.xml

O arquivo pom.xml define todas as dependências exigidas pelo aplicativo e configura o plug-in Maven Shade para criar o fat-jar que contém todas as dependências exigidas pelo Flink.

  • Algumas dependências têm escopo provided. Essas dependências estão disponíveis automaticamente quando o aplicativo é executado no Amazon Managed Service for Apache Flink. Eles são necessários para compilar o aplicativo ou para executá-lo localmente em seu IDE. Para obter mais informações, consulte Execute o aplicativo localmente. Verifique se você está usando a mesma versão do Flink que o runtime que você usará no Amazon Managed Service for Apache Flink.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Você deve adicionar dependências extra do Apache Flink ao pom com o escopo padrão, como o conector Kinesis usado por esse aplicativo. Para obter mais informações, consulte Use os conectores do Apache Flink. Você também pode adicionar quaisquer dependências Java adicionais exigidas pelo aplicativo.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • O plug-in Maven Java Compiler garante que o código seja compilado em Java 11, a versão do JDK atualmente suportada pelo Apache Flink.

  • O plug-in Maven Shade empacota o fat-jar, excluindo algumas bibliotecas que são fornecidas pelo runtime. Também especifica dois transformadores: ServicesResourceTransformer e ManifestResourceTransformer. O último configura a classe que contém o método main para iniciar o aplicativo. Se você renomear a classe principal, não se esqueça de atualizar esse transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Gravação de registros de amostra no fluxo de entrada

Nesta seção, você enviará amostras de registros para o fluxo a serem processados pelo aplicativo. Você tem duas opções para gerar dados de amostra, usando um script Python ou o Kinesis Data Generator.

Gerar dados de amostra usando um script Python

É possível usar um script Python para enviar registros de amostra para o fluxo.

nota

Para executar esse script do Python, você deve usar o Python 3.x e ter a biblioteca AWS SDK para Python (Boto) instalada.

Para começar a enviar dados de teste para o fluxo de entrada do Kinesis:

  1. Baixe o script stock.py Python do gerador de dados desde o repositório GitHub do gerador de dados.

  2. Execute o script stock.py:

    $ python stock.py

Mantenha o script em execução enquanto você conclui o restante do tutorial. Agora você pode executar o aplicativo Apache Flink.

Gerar dados de amostra usando o Kinesis Data Generator

Como alternativa ao script Python, você pode usar o Kinesis Data Generator, também disponível em uma versão hospedada, para enviar dados de amostra aleatórios para o fluxo. O Kinesis Data Generator é executado no seu navegador e você não precisa instalar nada na máquina.

Para configurar e executar o Kinesis Data Generator:

  1. Siga as instruções na documentação do Kinesis Data Generator para configurar o acesso à ferramenta. Você executará um modelo CloudFormation que configura um usuário e uma senha.

  2. Acesse o Kinesis Data Generator por meio do URL gerado pelo modelo CloudFormation. Encontre o URL na guia Saída após a conclusão do modelo do CloudFormation.

  3. Configure o gerador de dados:

    • Região: selecione a região que utilizada neste tutorial: us-east-1

    • Fluxo/fluxo de entrega: selecione o fluxo de entrada que o aplicativo usará: ExampleInputStream

    • Registros por segundo: 100

    • Modelo de registro: copie e cole o modelo a seguir:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Teste o modelo: selecione Modelo de teste e verifique se o registro gerado é semelhante ao seguinte:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie o gerador de dados: escolha Selecionar dados a enviar.

O Kinesis Data Generator agora está enviando dados para o ExampleInputStream.

Execute o aplicativo localmente

É possível executar e depurar seu aplicativo Flink localmente em seu IDE.

nota

Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Criar dois fluxos de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar em ambos os fluxos. Consulte Autentique a sessão AWS.

A configuração do ambiente de desenvolvimento local requer Java 11 JDK, Apache Maven e IDE para desenvolvimento em Java. Verifique se você atende aos pré-requisitos exigidos. Consulte Atenda os pré-requisitos para concluir os exercícios.

Importe o projeto Java para o IDE

Para começar a trabalhar no aplicativo em seu IDE, você deve importá-lo como um projeto Java.

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do subdiretório ./java/GettingStarted para o seu IDE.

Insira o código como um projeto Java existente usando o Maven.

nota

O processo exato para importar um novo projeto Java varia de acordo com o IDE utilizado.

Verifique a configuração do aplicativo local

Ao ser executado localmente, o aplicativo usa a configuração no arquivo application_properties.json na pasta de recursos do projeto sob ./src/main/resources. Você pode editar esse arquivo para usar diferentes nomes ou regiões de fluxos do Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Defina a configuração de execução do IDE

Você pode executar e depurar o aplicativo Flink diretamente do seu IDE executando a classe principal com.amazonaws.services.msf.BasicStreamingJob, da mesma forma que executaria qualquer aplicativo Java. Antes de executar o aplicativo, defina a configuração Run (Executar). A configuração depende do IDE utilizado. Por exemplo, consulte as Configurações de execução/depuração na documentação do IntelliJ IDEA. Em particular, você deve configurar o seguinte:

  1. Adicione as dependências provided ao classpath. Isso é necessário para garantir que as dependências com escopo provided sejam passadas para o aplicativo quando executado localmente. Sem essa configuração, o aplicativo exibe um erro class not found imediatamente.

  2. Passe as credenciais AWS para acessar os fluxos do Kinesis para o aplicativo. A maneira mais rápida é usar o AWS Toolkit for IntelliJ IDEA. Usando esse plug-in do IDE na configuração Executar, você pode selecionar um perfil AWS específico. A autenticação AWS é feita usando esse perfil. Não é necessário passar as credenciais AWS diretamente.

  3. Verifique se o IDE executa o aplicativo usando o JDK 11.

Execute o aplicativo no IDE

Depois de definir a configuração Executar para o BasicStreamingJob, você pode executar ou depurar como um aplicativo Java comum.

nota

Não é possível executar o fat-jar gerado pelo Maven diretamente com java -jar ... na linha de comando. Esse jar não contém as dependências principais do Flink necessárias para executar o aplicativo independentemente.

Quando o aplicativo é iniciado com êxito, ele registra em log algumas informações sobre o minicluster autônomo e a inicialização dos conectores. Isso é acompanhado por vários registros de INFO e alguns de WARN que o Flink normalmente emite quando o aplicativo é iniciado.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

Depois que a inicialização for concluída, o aplicativo não emitirá mais nenhuma entrada de log. Enquanto os dados estão fluindo, nenhum log é emitido.

Para verificar se o aplicativo está processando dados corretamente, você pode inspecionar os fluxos de entrada e saída do Kinesis, conforme descrito na seção a seguir.

nota

Não emitir logs sobre o fluxo de dados é o comportamento normal de um aplicativo Flink. A emissão de logs em cada registro pode ser conveniente para a depuração, mas pode adicionar uma sobrecarga considerável durante a execução na produção.

Observe os dados de entrada e saída nos fluxos do Kinesis

Você pode observar os registros enviados ao fluxo de entrada pelo Python (geração de amostra) ou pelo Kinesis Data Generator (link) usando o Visualizador de dados no console do Amazon Kinesis.

Para observar registros
  1. Abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Verifique se a região é a mesma executada este tutorial, que é us-east-1 Leste dos EUA (Norte da Virgínia) por padrão. Altere a região se ela não corresponde.

  3. Escolha Fluxos de dados.

  4. Selecione o fluxo que deseja observar, ExampleInputStream ou ExampleOutputStream.

  5. Selecione a guia Visualizador de dados.

  6. Escolha qualquer Fragmento, mantenha Último como Posição inicial e, em seguida, escolha Obter registros. Talvez seja gerado o erro “Nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no fluxo são exibidos.

  7. Selecione o valor na coluna Dados para inspecionar o conteúdo do registro no formato JSON.

Interrompa seu aplicativo em execução localmente

Interrompa o aplicativo em execução no IDE. O IDE geralmente fornece uma opção de “interromper”. A localização e o método exatos dependem do IDE utilizado.

Compile e empacote o código do aplicativo

Nesta seção, será usado o Apache Maven para compilar o código Java e empacotá-lo em um arquivo JAR. Você pode compilar e empacotar seu código usando a ferramenta de linha de comando do Maven ou o IDE.

Para compilar e empacotar usando a linha de comando do Maven:

Navegue até o diretório que contém o projeto Java GettingStarted e execute o seguinte comando:

$ mvn package

Para compilar e empacotar usando o IDE:

Execute mvn package a partir da integração com o IDE Maven.

Em ambos os casos, o seguinte arquivo JAR é criado: target/amazon-msf-java-stream-app-1.0.jar.

nota

A execução de um “projeto de compilação” a partir do seu IDE pode não criar o arquivo JAR.

Faça upload do arquivo JAR do código do aplicativo

Nesta seção, você faz o upload do arquivo JAR criado na seção anterior no bucket do Amazon Simple Storage Service (Amazon S3) criado no início deste tutorial. Se você não concluiu essa etapa, consulte o (link).

Para fazer upload do arquivo JAR do código do aplicativo
  1. Abra o console do Amazon S3, em https://console.aws.amazon.com/s3/.

  2. Selecione o bucket que você criou anteriormente para o código do aplicativo.

  3. Escolha Carregar.

  4. Escolha Adicionar arquivos.

  5. Navegue até o arquivo JAR gerado na etapa anterior: target/amazon-msf-java-stream-app-1.0.jar.

  6. Escolha Fazer upload sem alterar nenhuma outra configuração.

Atenção

Selecione o arquivo JAR correto em <repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

O diretório target também contém outros arquivos JAR que você não precisa carregar.

Crie e configure o aplicativo do Managed Service for Apache Flink

É possível criar e executar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI. Para este tutorial, você usará o console.

nota

Ao criar o aplicativo usando o console, os recursos do AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs também são criados. Ao criar o aplicativo usando a AWS CLI, esses recursos devem ser criados separadamente.

Criar o aplicativo

Para criar o aplicativo
  1. Faça login no Console de gerenciamento da AWS e abra o console do Amazon MSF em https://console.aws.amazon.com/flink.

  2. Verifique se a região correta está selecionada: us-east-1 Leste dos EUA (Norte da Virgínia)

  3. Abra o menu à direita e escolha aplicativos Apache Flink e, em seguida, Criar aplicativo de transmissão. Como alternativa, escolha Criar aplicativo de transmissão no contêiner Get Started da página inicial.

  4. Na página Criar aplicativo de transmissão:

    • Selecione um método para configurar o aplicativo de processamento de fluxo: escolha Criar do zero.

    • Configuração do Apache Flink, versão do aplicativo Flink: escolha Apache Flink 1.20.

  5. Configure seu aplicativo

    • Nome do aplicativo: insira MyApplication.

    • Descrição: insira My java test app.

    • Acesso aos recursos do aplicativo: escolha Criar/atualizar o perfil do IAM kinesis-analytics-MyApplication-us-east-1 com as políticas necessárias.

  6. Configure seu Modelo para as configurações do aplicativo

    • Modelos: selecione Desenvolvimento.

  7. Escolha Criar aplicativo de transmissão na parte inferior da página.

nota

Ao criar um aplicativo Managed Service for Apache Flink usando o console, você tem a opção de ter um perfil do IAM e uma política criada para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • perfil: kinesisanalytics-MyApplication-us-east-1

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Kinesis Data Analytics. O nome dos recursos que são criados automaticamente recebe o prefixo kinesis-analytics- para compatibilidade com versões anteriores.

Editar a política do IAM

Edite a política do IAM para adicionar permissões de acesso aos fluxos de dados do Kinesis.

Para editar a política
  1. Abra o console do IAM em https://console.aws.amazon.com/iam/.

  2. Selecione Políticas. Selecione a política kinesis-analytics-service-MyApplication-us-east-1 que o console criou na seção anterior.

  3. Escolha a guia Edit (Editar) e escolha a guia JSON.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua os exemplos de IDs de conta (012345678901) pelo ID da conta.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Escolha Avançar na parte inferior da página e Salvar alterações.

Configure o aplicativo

Edite a configuração do aplicativo para definir o artefato do código do aplicativo.

Para editar a configuração
  1. Na página MyApplication, selecione Configure (Configurar).

  2. Na seção Localização do código do aplicativo:

    • Para o bucket do Amazon S3, selecione o bucket criado anteriormente para o código do aplicativo. Escolha Procurar, selecione o bucket correto e, em seguida, selecione Escolher. Não clique no nome do bucket.

    • Em Caminho do objeto do Amazon S3, insira amazon-msf-java-stream-app-1.0.jar.

  3. Em Permissões de acesso, selecione Criar/atualizar o perfil do IAM kinesis-analytics-MyApplication-us-east-1 com as políticas necessárias.

  4. Na seção Propriedades do runtime, adicione as propriedades a seguir.

  5. Selecione Adicionar novo item e adicione cada um dos seguintes parâmetros:

    ID do grupo Chave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. Não modifique nenhuma das outras seções.

  7. Escolha Salvar alterações.

nota

Ao optar por habilitar o registro em log do Amazon CloudWatch, o Managed Service for Apache Flink cria um grupo de logs e um fluxo de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Execute o aplicativo

Agora o aplicativo está configurado e pronto para execução.

Executar o aplicativo
  1. No console do Amazon Managed Service for Apache Flink, escolha Meu aplicativo e escolha Executar.

  2. Na próxima página, que é a página de configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente e, em seguida, escolha Executar.

    O Status nos Detalhes do aplicativo muda de Ready para Starting e depois para Running quando o aplicativo é iniciado.

Quando o aplicativo está no status Running, aí é possível abrir o painel do Flink.

Para abrir o painel do
  1. Selecione Abrir painel do Apache Flink. O painel é aberto em uma nova página.

  2. Na lista Trabalhos em execução, escolha o único trabalho exibido.

    nota

    Se você definiu as propriedades do Runtime ou editou as políticas do IAM incorretamente, o status do aplicativo pode se transformar em Running, mas o painel do Flink mostra que o trabalho está sendo reiniciado continuamente. Esse é um cenário de falha comum se o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar recursos externos.

    Quando isso acontecer, verifique a guia Exceções no painel do Flink para verificar a causa do problema.

Observe as métricas do aplicativo em execução

Na página MyApplication, na seção de métricas do Amazon CloudWatch, você pode ver algumas das métricas fundamentais do aplicativo em execução.

Para visualizar as métricas
  1. Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.

  2. Quando o aplicativo está em execução e íntegro, é possível ver a métrica de tempo de atividade aumentando continuamente.

  3. A métrica fullrestarts deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Para investigar o problema, revise a guia Exceções no painel do Flink.

  4. A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo íntegro.

    nota

    Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no painel do CloudWatch.

Observe os dados de saída nos fluxos do Kinesis

Verifique se você ainda está publicando dados na entrada, usando o script Python ou o Kinesis Data Generator.

Agora você pode observar a saída do aplicativo em execução no Managed Service for Apache Flink usando o Visualizador de dados em https://console.aws.amazon.com/kinesis/, da mesma forma que você já fez anteriormente.

Para visualizar a saída
  1. Abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Verifique se a região é a mesma utilizada para executar este tutorial. Por padrão, é us-east-1 Leste dos EUA (Norte da Virgínia). Altere a região se necessário.

  3. Escolha Fluxos de dados.

  4. Selecione o fluxo que deseja observar. Para este tutorial, use ExampleOutputStream.

  5. Selecione a guia Visualizador de dados.

  6. Selecione qualquer Fragmento, mantenha Último como Posição inicial e, em seguida, escolha Obter registros. Talvez você veja o erro “Nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no fluxo são exibidos.

  7. Selecione o valor na coluna Dados para inspecionar o conteúdo do registro no formato JSON.

Interromper o aplicativo

Para interromper o aplicativo, acesse a página do console do aplicativo do Managed Service for Apache Flink, MyApplication.

Como interromper o aplicativo
  1. Na lista suspensa Ação, escolha Interromper.

  2. O Status nos Detalhes do aplicativo muda de Running para Stopping e depois para Ready quando o aplicativo é completamente interrompido.

    nota

    Não se esqueça também de parar de enviar dados para o fluxo de entrada a partir do script Python ou do Kinesis Data Generator.

Próxima etapa

Limpar recursos da AWS