Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Fallo en el punto de control de la aplicación Apache Beam
Si la aplicación Beam está configurada con ShutdownSourceSafterIdLems
Síntoma
Vaya a los registros de CloudWatch de la aplicación Managed Service para Apache Flink y compruebe si se ha registrado el siguiente mensaje de registro. El siguiente mensaje de registro indica que el punto de control no se pudo activar porque algunas tareas estaban terminadas.
{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for jobyour job IDsince some tasks of jobyour job IDhas been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN":your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }
También se puede encontrar en el panel de control de Flink, donde algunas tareas han pasado al estado “FINALIZADO” y ya no es posible establecer puntos de control.
Causa
shutdownSourceAfterIdleMs es una variable de configuración de Beam que cierra las fuentes que han estado inactivas durante el tiempo configurado de milisegundos. Una vez que se ha cerrado una fuente, establecer puntos de control ya no es posible. Esto podría provocar una falla en el punto de control
Una de las causas por las que las tareas pasan al estado “FINALIZADO” es cuando shutdownSourcesAfterIdleMs está establecido en 0 ms, lo que significa que las tareas que estén inactivas se cerrarán inmediatamente.
Solución
Para evitar que las tareas pasen inmediatamente al estado “FINALIZADO”, defina shutdownSourceAfterIdleMs en Long.MAX_VALUE. Se puede hacer esto de dos formas:
-
Opción 1: si su configuración de beam está establecida en la página de configuración de la aplicación Managed Service para Apache Flink, puede añadir un nuevo par clave-valor para configurar shutdpwnSourcesAfteridleMs de la siguiente manera:
-
Opción 2: si su configuración de beam está establecida en el archivo JAR, puede configurar shutdownSourcesAfterIdleMs de la siguiente manera:
FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline