Crie e execute um aplicativo Managed Service for Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink (Amazon MSF) era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie e execute um aplicativo Managed Service for Apache Flink

Neste exercício, será criado um aplicativo Managed Service for Apache Flink com fluxos de dados do Kinesis como fonte e coletor.

Crie recursos dependentes

Antes de criar um Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Um bucket do Amazon S3 para armazenar o código do aplicativo e para gravar a saída do aplicativo.

    nota

    Este tutorial pressupõe que você está implantando seu aplicativo na região us-east-1. Se você usa outra região, deve adaptar todas as etapas corretamente.

Crie um bucket do Amazon S3

Você pode criar um bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esse recurso, consulte os tópicos a seguir:

  • Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login.

    nota

    Certifique-se de criar o bucket na região que você usa para este tutorial. O padrão do tutorial é us-east-1.

Outros recursos da

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/<my-application>.

  • Um fluxo de logs chamado kinesis-analytics-log-stream.

Configurar seu ambiente de desenvolvimento local

Para desenvolvimento e depuração, você pode executar o aplicativo Apache Flink em sua máquina diretamente do IDE de sua escolha. Todas as dependências do Apache Flink são tratadas como dependências normais do Java usando o Maven.

nota

Na máquina de desenvolvimento, você deve ter o Java JDK 11, o Maven e o Git instalados. Recomenda-se o uso de um ambiente de desenvolvimento como o Eclipse Java Neon ou o IntelliJ Idea. Para verificar se você atende a todos os pré-requisitos, consulte Atenda os pré-requisitos para concluir os exercícios. Você não precisa instalar um cluster Apache Flink na máquina.

Autentique a sessão AWS

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

  1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulteConfigure o AWS Command Line Interface (AWS CLI).

  2. Se o seu IDE tiver um plug-in com o qual se integrar AWS, você poderá usá-lo para passar as credenciais para o aplicativo em execução no IDE. Para obter mais informações, consulte AWS Toolkit for IntelliJ IDEA e Toolkit do AWS para compilar o aplicativo ou executar o Eclipse.

Baixar e examinar o código Java de fluxo do Apache Flink

O código do aplicativo para este exemplo está disponível em GitHub.

Para baixar o código de aplicativo Java
  1. Duplique o repositório remoto usando o seguinte comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navegue até o diretório ./java/GettingStartedTable.

Analise os componentes do aplicativo

O aplicativo é totalmente implementado na classe com.amazonaws.services.msf.BasicTableJob. O método main() define fontes, transformações e coletores. A execução é iniciada por uma instrução de execução no final desse método.

nota

Para uma experiência de desenvolvedor ideal, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service for Apache Flink e localmente, para desenvolvimento em seu IDE.

  • Para ler a configuração de runtime para que ela funcione ao ser executada no Amazon Managed Service for Apache Flink e em seu IDE, o aplicativo detecta automaticamente se está sendo executado localmente de forma autônoma no IDE. Nesse caso, o aplicativo carrega a configuração do runtime de forma diferente:

    1. Quando o aplicativo detectar que está sendo executado no modo autônomo em seu IDE, forme o arquivo application_properties.json incluído na pasta de recursos do projeto. Os conteúdos do arquivo são os seguintes.

    2. Quando o aplicativo é executado no Amazon Managed Service for Apache Flink, o comportamento padrão carrega a configuração do aplicativo a partir das propriedades de runtime que você definirá no aplicativo Amazon Managed Service for Apache Flink. Consulte Crie e configure o aplicativo do Managed Service for Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • O método main() define o fluxo de dados do aplicativo e o executa.

    • Inicializa os ambientes de transmissão padrão. Neste exemplo, mostramos como criar tanto o StreamExecutionEnvironment para usar com a DataStream API quanto o StreamTableEnvironment para usar com SQL e a API de tabela. Os dois objetos de ambiente são duas referências separadas ao mesmo ambiente de tempo de execução, para uso diferente APIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    • Carregue os parâmetros de configuração do aplicativo. Isso os carregará automaticamente a partir do local correto, dependendo de onde o aplicativo está sendo executado:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • O conector FileSystem coletor que o aplicativo usa para gravar resultados nos arquivos de saída do Amazon S3 quando o Flink conclui um ponto de verificação. Você deve ativar os pontos de verificação para gravar arquivos no destino. Quando o aplicativo está sendo executado no Amazon Managed Service for Apache Flink, a configuração do aplicativo controla o ponto de verificação e o ativa por padrão. Por outro lado, ao executar localmente, os pontos de verificação são desabilitados por padrão. O aplicativo detecta que ele é executado localmente e configura o ponto de verificação a cada 5000 ms.

      if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
    • Esse aplicativo não recebe dados de uma fonte externa real. Ele gera dados aleatórios para serem processados por meio do DataGen conector. Esse conector está disponível para DataStream API, SQL e API de tabela. Para demonstrar a integração entre eles APIs, o aplicativo usa a versão da DataStram API porque ela oferece mais flexibilidade. Cada registro é gerado por uma função geradora chamada StockPriceGeneratorFunction nesse caso, na qual você pode adotar uma lógica personalizada.

      DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class));
    • Na DataStream API, os registros podem ter classes personalizadas. As classes devem seguir regras específicas para que o Flink possa usá-las como registro. Para obter mais informações, consulte Tipos de dados com suporte. Neste exemplo, a classe StockPrice é do tipo POJO.

    • A fonte é então anexada ao ambiente de execução, gerando um DataStream de StockPrice. Esse aplicativo não usa semântica de event-time e não gera uma marca d'água. Execute a DataGenerator fonte com um paralelismo de 1, independente do paralelismo do resto do aplicativo.

      DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1);
    • O que segue no fluxo de processamento de dados é definido usando a API Table e o SQL. Para fazer isso, convertemos o DataStream of StockPrices em uma tabela. O esquema da tabela é automaticamente inferido da classe StockPrice.

      Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    • O trecho de código a seguir mostra como definir uma visualização e uma consulta usando a API Table programática:

      Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    • É definida uma tabela de coletor para gravar os resultados em um bucket do Amazon S3 como arquivos JSON. Para ilustrar a diferença com a definição programática de uma visualização, com a API Table, a tabela de coletor é definida usando SQL.

      tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")");
    • A última etapa é um executeInsert() que insere a visualização filtrada dos preços das ações na tabela do coletor. Esse método inicia a execução do fluxo de dados que definimos até agora.

      filteredStockPricesTable.executeInsert("s3_sink");

Usar o arquivo pom.xml

O arquivo pom.xml define todas as dependências exigidas pelo aplicativo e configura o plug-in Maven Shade para criar o fat-jar que contém todas as dependências exigidas pelo Flink.

  • Algumas dependências têm escopo provided. Essas dependências estão disponíveis automaticamente quando o aplicativo é executado no Amazon Managed Service for Apache Flink. Eles são necessários para compilar o aplicativo ou para executá-lo localmente em seu IDE. Para obter mais informações, consulte (atualizar para API Table) Execute o aplicativo localmente. Verifique se você está usando a mesma versão do Flink que o runtime que você usará no Amazon Managed Service for Apache Flink. Para usar a API Table e o SQL, você deve incluir o flink-table-planner-loader e flink-table-runtime-dependencies, ambos com escopo provided.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Você deve adicionar dependências adicionais do Apache Flink ao pom com o escopo padrão. Por exemplo, o DataGen conector, o conector FileSystem SQL e o formato JSON.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
  • Para gravar no Amazon S3 quando executado localmente, o S3 Hadoop File System também está incluído no escopo provided.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • O plug-in Maven Java Compiler garante que o código seja compilado em Java 11, a versão do JDK atualmente suportada pelo Apache Flink.

  • O plug-in Maven Shade empacota o fat-jar, excluindo algumas bibliotecas que são fornecidas pelo runtime. Também especifica dois transformadores: ServicesResourceTransformer e ManifestResourceTransformer. O último configura a classe que contém o método main para iniciar o aplicativo. Se você renomear a classe principal, não se esqueça de atualizar esse transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Execute o aplicativo localmente

É possível executar e depurar seu aplicativo Flink localmente em seu IDE.

nota

Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Criar dois fluxos de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar em ambos os fluxos. Consulte Autentique a sessão AWS.

A configuração do ambiente de desenvolvimento local requer Java 11 JDK, Apache Maven e um IDE para desenvolvimento em Java. Verifique se você atende aos pré-requisitos exigidos. Consulte Atenda os pré-requisitos para concluir os exercícios.

Importe o projeto Java para o IDE

Para começar a trabalhar no aplicativo em seu IDE, você deve importá-lo como um projeto Java.

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do subdiretório ./jave/GettingStartedTable para o seu IDE.

Insira o código como um projeto Java existente usando o Maven.

nota

O processo exato para importar um novo projeto Java varia de acordo com o IDE utilizado.

Modifique a configuração do aplicativo local

Ao ser executado localmente, o aplicativo usa a configuração no arquivo application_properties.json na pasta de recursos do projeto sob ./src/main/resources. Para este tutorial de aplicativo, os parâmetros de configuração são o nome do bucket e o caminho em que os dados serão gravados.

Edite a configuração e modifique o nome do bucket do Amazon S3 para corresponder ao bucket criado no início deste tutorial.

[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
nota

A propriedade de configuração name deve conter somente o nome do bucket, por exemplo my-bucket-name. Não inclua nenhum prefixo, como s3:// ou barra final.

Se você modificar o caminho, omita as barras iniciais ou finais.

Defina a configuração de execução do IDE

Você pode executar e depurar o aplicativo Flink diretamente do seu IDE executando a classe principal com.amazonaws.services.msf.BasicTableJob, da mesma forma que executaria qualquer aplicativo Java. Antes de executar o aplicativo, defina a configuração Run (Executar). A configuração depende do IDE utilizado. Por exemplo, consulte as Configurações de execução/depuração na documentação do IntelliJ IDEA. Em particular, você deve configurar o seguinte:

  1. Adicione as dependências provided ao classpath. Isso é necessário para garantir que as dependências com escopo provided sejam passadas para o aplicativo quando executado localmente. Sem essa configuração, o aplicativo exibe um erro class not found imediatamente.

  2. Passe as AWS credenciais para acessar os streams do Kinesis para o aplicativo. A maneira mais rápida é usar o AWS Toolkit for IntelliJ IDEA. Usando esse plug-in do IDE na configuração Executar, você pode selecionar um AWS perfil específico. AWS a autenticação acontece usando esse perfil. Não é necessário passar as credenciais AWS diretamente.

  3. Verifique se o IDE executa o aplicativo usando o JDK 11.

Execute o aplicativo no IDE

Depois de definir a configuração Executar para o BasicTableJob, você pode executar ou depurar como um aplicativo Java comum.

nota

Não é possível executar o fat-jar gerado pelo Maven diretamente com java -jar ... na linha de comando. Esse jar não contém as dependências principais do Flink necessárias para executar o aplicativo independentemente.

Quando o aplicativo é iniciado com êxito, ele registra em log algumas informações sobre o minicluster autônomo e a inicialização dos conectores. Isso é acompanhado por vários registros de INFO e alguns de WARN que o Flink normalmente emite quando o aplicativo é iniciado.

21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...

Depois que a inicialização for concluída, o aplicativo não emitirá mais nenhuma entrada de log. Enquanto os dados estão fluindo, nenhum log é emitido.

Para verificar se o aplicativo está processando dados corretamente, inspecione o conteúdo do bucket de saída, conforme descrito na seção a seguir.

nota

Não emitir logs sobre o fluxo de dados é o comportamento normal de um aplicativo Flink. A emissão de logs em cada registro pode ser conveniente para a depuração, mas pode adicionar uma sobrecarga considerável durante a execução na produção.

Observe o aplicativo gravando dados em um bucket do S3

Esse aplicativo de exemplo gera dados aleatórios internamente e grava esses dados no bucket S3 de destino que foi configurado. A menos que você tenha modificado o caminho de configuração padrão, os dados serão gravados no caminho output seguido pelos dados e pelo particionamento por hora, no formato ./output/<yyyy-MM-dd>/<HH>.

O conector do FileSystem coletor cria novos arquivos no ponto de verificação do Flink. Ao ser executado localmente, o aplicativo executa um ponto de verificação a cada 5 segundos (5000 milissegundos), conforme especificado no código.

if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
Para navegar no bucket do S3 e observar o arquivo gravado pelo aplicativo
  1. Escolha o bucket que você criou anteriormente.

  2. Navegue até o caminho output e, em seguida, até as pastas de data e hora que correspondem à hora atual no fuso horário UTC.

  3. Atualize periodicamente para observar novos arquivos aparecendo a cada 5 segundos.

  4. Selecione e baixe um arquivo para observar o conteúdo.

    nota

    Por padrão, os arquivos não têm extensões. O conteúdo é formatado como JSON. Você pode abrir os arquivos com qualquer editor de texto para inspecionar o conteúdo.

Interrompa seu aplicativo em execução localmente

Interrompa o aplicativo em execução no IDE. O IDE geralmente fornece uma opção de “interromper”. A localização e o método exatos dependem do IDE.

Compile e empacote o código do aplicativo

Nesta seção, será usado o Apache Maven para compilar o código Java e empacotá-lo em um arquivo JAR. Você pode compilar e empacotar seu código usando a ferramenta de linha de comando do Maven ou o IDE.

Para compilar e empacotar usando a linha de comando do Maven

Vá para o diretório que contém o GettingStarted projeto Jave e execute o seguinte comando:

$ mvn package

Para compilar e empacotar usando seu IDE

Execute mvn package a partir da integração com o IDE Maven.

Em ambos os casos, o arquivo JAR target/amazon-msf-java-table-app-1.0.jar é criado.

nota

A execução de um projeto de compilação a partir do IDE pode não criar o arquivo JAR.

Faça upload do arquivo JAR do código do aplicativo

Nesta seção, você faz o upload do arquivo JAR criado na seção anterior no bucket do Amazon S3 criado no início deste tutorial. Se você já fez isso, conclua Crie um bucket do Amazon S3.

Para fazer upload do código do aplicativo
  1. Abra o console do Amazon S3 em https://console.aws.amazon.com/s3/.

  2. Selecione o bucket que você criou anteriormente para o código do aplicativo.

  3. Escolha o campo Upload.

  4. Escolha Adicionar arquivos.

  5. Navegue até o arquivo JAR gerado na seção anterior: target/amazon-msf-java-table-app-1.0.jar.

  6. Escolha Fazer upload sem alterar nenhuma outra configuração.

    Atenção

    Selecione o arquivo JAR correto em <repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar.

    O diretório de destino também contém outros arquivos JAR que você não precisa carregar.

Crie e configure o aplicativo do Managed Service for Apache Flink

É possível criar e configurar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI. Para este tutorial, você usará o console.

nota

Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Ao criar o aplicativo usando a AWS CLI, esses recursos devem ser criados separadamente.

Criar a aplicação

  1. Faça login no e abra Console de gerenciamento da AWS o console Amazon MSF em https://console.aws.amazon.com /flink.

  2. Verifique se a região correta está selecionada: Leste dos EUA (Norte da Virgínia) us-east-1.

  3. No menu à direita, escolha aplicativos Apache Flink e, em seguida, escolha Criar aplicativo de transmissão. Como alternativa, escolha Criar aplicativo de transmissão na seção Introdução da página inicial.

  4. Na página Criar aplicativo de transmissão, faça o seguinte:

    • Em Escolha um método para configurar o aplicativo de processamento de fluxo, escolha Criar do zero.

    • Para Configuração do Apache Flink, versão do aplicativo Flink, escolha Apache Flink 1.19.

    • Na seção Configuração do aplicativo, conclua as seguintes etapas:

      • Em Nome do aplicativo, insira MyApplication.

      • Em Descrição, insira My Java Table API test app.

      • Para Acesso aos recursos do aplicativo, escolha Create/update IAM role kinesis-analytics-MyApplication-us -east-1 com as políticas necessárias.

    • Em Modelo para configurações do aplicativo, faça o seguinte:

      • Em Modelos, selecione Desenvolvimento.

  5. Escolha Criar aplicativo de transmissão.

nota

Ao criar um aplicativo Managed Service for Apache Flink usando o console, você tem a opção de ter um perfil do IAM e uma política criada para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Função: kinesisanalytics-MyApplication-us-east-1

Editar a política do IAM

Edite a política do IAM para adicionar permissões para acessar o bucket do Amazon S3.

Editar a política do IAM para adicionar permissões do bucket do S3
  1. Abra o console do IAM em https://console.aws.amazon.com/iam/.

  2. Selecione Políticas. Selecione a política kinesis-analytics-service-MyApplication-us-east-1 que o console criou na seção anterior.

  3. Escolha a guia Edit (Editar) e escolha a guia JSON.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua o ID da conta de exemplo (012345678901) pelo ID da sua conta e <bucket-name> pelo nome do bucket do S3 que você criou.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteOutputBucket", "Effect": "Allow", "Action": "s3:*", "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket2" ] } ] }
  5. Escolha Próximo e, em seguida, escolha Salvar alterações.

Configure o aplicativo

Edite o aplicativo para definir o artefato do código do aplicativo.

Configurar o aplicativo
  1. Na MyApplicationpágina, escolha Configurar.

  2. Na seção Localização do código do aplicativo, selecione Configurar.

    • Para o bucket do Amazon S3, selecione o bucket criado anteriormente para o código do aplicativo. Escolha Procurar, selecione o bucket correto e, em seguida, selecione Escolher. Não clique no nome do bucket.

    • Em Caminho do objeto do Amazon S3, insira amazon-msf-java-table-app-1.0.jar.

  3. Em Permissões de acesso, selecione Criar/atualizar o perfil do IAM kinesis-analytics-MyApplication-us-east-1.

  4. Na seção Propriedades do runtime, adicione as propriedades a seguir.

  5. Selecione Adicionar novo item e adicione cada um dos seguintes parâmetros:

    ID do grupo Chave Valor
    bucket name your-bucket-name
    bucket path output
  6. Não modifique outras configurações.

  7. Escolha Salvar alterações.

nota

Quando você opta por ativar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Execute o aplicativo

Agora o aplicativo está configurado e pronto para execução.

Executar o aplicativo
  1. Volte para a página do console no Amazon Managed Service para Apache Flink e escolha. MyApplication

  2. Escolha Executar para iniciar o aplicativo.

  3. Na Configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente.

  4. Escolha Executar.

  5. O Status nos Detalhes do aplicativo muda de Ready para Starting e depois para Running quando o aplicativo é iniciado.

Quando o aplicativo está no status Running, você pode abrir o painel do Flink.

Para abrir o painel e visualizar o trabalho
  1. Selecione Abrir painel do Apache Flink. O painel é aberto em uma nova página.

  2. Na lista Trabalhos em execução, escolha o único trabalho que você pode ver.

    nota

    Se você definiu as propriedades do runtime ou editou as políticas do IAM incorretamente, o status do aplicativo pode mudar para Running, mas o painel do Flink mostra o trabalho sendo reiniciado continuamente. Esse é um cenário de falha comum quando o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar os recursos externos.

    Quando isso acontecer, verifique a guia Exceções no painel do Flink para investigar a causa do problema.

Observe as métricas do aplicativo em execução

Na MyApplicationpágina, na seção de CloudWatch métricas da Amazon, você pode ver algumas das métricas fundamentais do aplicativo em execução.

Para visualizar as métricas
  1. Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.

  2. Quando o aplicativo está em execução e íntegro, é possível ver a métrica de tempo de atividade aumentando continuamente.

  3. A métrica fullrestarts deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Consulte a guia Exceções no painel do Flink para investigar o problema.

  4. A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo íntegro.

    nota

    Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.

Observe o aplicativo gravando dados no bucket de destino

Agora você consegue observar o aplicativo em execução no Amazon Managed Service for Apache Flink gravando arquivos no Amazon S3.

Para observar os arquivos, siga o mesmo processo usado para verificar os arquivos que estavam sendo gravados quando o aplicativo estava sendo executado localmente. Consulte Observe o aplicativo gravando dados em um bucket do S3 .

Lembre-se de que o aplicativo grava novos arquivos no ponto de verificação do Flink. Quando executados no Amazon Managed Service for Apache Flink, os pontos de verificação são habilitados por padrão e executados a cada 60 segundos. O aplicativo cria novos arquivos aproximadamente a cada 1 minuto.

Interromper o aplicativo

Para interromper o aplicativo, acesse a página do console do aplicativo do Managed Service for Apache Flink, MyApplication.

Como interromper o aplicativo
  1. Na lista suspensa Ação, escolha Interromper.

  2. O Status nos Detalhes do aplicativo muda de Running para Stopping e depois para Ready quando o aplicativo é completamente interrompido.

    nota

    Não se esqueça também de parar de enviar dados para o fluxo de entrada a partir do script Python ou do Kinesis Data Generator.