Anteriormente, o Amazon Managed Service for Apache Flink (Amazon MSF) era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
Falha no ponto de verificação do aplicativo Apache Beam
Se seu aplicativo Beam estiver configurado com shutdownSourcesAfterIdlems
Sintomas
Acesse os logs do CloudWatch do aplicativo Managed Service for Apache Flink e verifique se a seguinte mensagem de log foi registrada. A mensagem de log a seguir indica que o ponto de verificação falhou ao ser acionado, pois algumas tarefas foram concluídas.
{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for jobyour job IDsince some tasks of jobyour job IDhas been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN":your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }
Isso também pode ser encontrado no painel do Flink, onde algumas tarefas entraram no estado “CONCLUÍDO” e o ponto de verificação não é mais possível.
Causa
ShutdownSourcesAfterIdlems é uma variável de configuração do Beam que desliga fontes que ficaram inativas pelo tempo configurado de milissegundos. Depois que uma fonte é desligada, o ponto de verificação não é mais possível. Isso pode levar à falha do ponto de verificação
Uma das causas para as tarefas entrarem no estado “FINALIZADO” é quando ShutdownSourcesAfterIdlems é definido como 0ms, o que significa que as tarefas que estão ociosas serão encerradas imediatamente.
Solução
Para evitar que as tarefas entrem imediatamente no estado “FINALIZADO”, defina ShutdownSourcesAfterIdlems como Long.MAX_VALUE. Isso pode ser feito de duas maneiras:
-
Opção 1: Se a configuração do Beam estiver definida na página de configuração do aplicativo Managed Service for Apache Flink, você poderá adicionar um novo par de valores-chave para definir ShutdPwnSourcesAfterIdlems da seguinte forma:
-
Opção 2: Se a configuração do Beam estiver definida no arquivo JAR, você poderá definir ShutdownSourcesAfterIdlems da seguinte forma:
FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline