Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Prácticas recomendadas de Managed Service para Apache Flink
Esta sección contiene información y recomendaciones para desarrollar un servicio gestionado estable y eficaz para las aplicaciones de Apache Flink.
Temas
Minimice el tamaño del Uber JAR
La aplicación Java/Scala debe estar empaquetada en un JAR súper (super/gordo) e incluir todas las dependencias adicionales requeridas que no estén incluidas en el motor de ejecución. Sin embargo, el tamaño del Uber JAR afecta a los tiempos de inicio y reinicio de la aplicación y puede provocar que el JAR supere el límite de 512 MB.
Para optimizar el tiempo de despliegue, tu Uber JAR no debe incluir lo siguiente:
-
Cualquier dependencia proporcionada por el tiempo de ejecución, como se ilustra en el siguiente ejemplo. Deben tener su
providedalcance en el archivo POM ocompileOnlyen la configuración de Gradle. -
Cualquier dependencia que se utilice únicamente para realizar pruebas, por ejemplo, JUnit o Mockito. Deben tener su
testalcance en el archivo POM otestImplementationen la configuración de Gradle. -
Cualquier dependencia que no utilice realmente tu aplicación.
-
Cualquier dato estático o metadato que requiera la aplicación. La aplicación debe cargar los datos estáticos en tiempo de ejecución, por ejemplo, desde un almacén de datos o desde Amazon S3.
-
Consulte este archivo de ejemplo de POM
para obtener detalles sobre los ajustes de configuración anteriores.
Dependencias proporcionadas
El tiempo de ejecución de Managed Service para Apache Flink proporciona un número de dependencias. Estas dependencias no deben incluirse en el archivo JAR grueso y deben provided estar incluidas en el archivo POM o excluirse explícitamente de la configuración. maven-shade-plugin Todas estas dependencias incluidas en el JAR gordo se ignoran en tiempo de ejecución, pero aumentan el tamaño del JAR, lo que supone una sobrecarga durante la implementación.
Dependencias que proporciona el motor de ejecución en las versiones 1.18, 1.19 y 1.20:
-
org.apache.flink:flink-core -
org.apache.flink:flink-java -
org.apache.flink:flink-streaming-java -
org.apache.flink:flink-scala_2.12 -
org.apache.flink:flink-table-runtime -
org.apache.flink:flink-table-planner-loader -
org.apache.flink:flink-json -
org.apache.flink:flink-connector-base -
org.apache.flink:flink-connector-files -
org.apache.flink:flink-clients -
org.apache.flink:flink-runtime-web -
org.apache.flink:flink-metrics-code -
org.apache.flink:flink-table-api-java -
org.apache.flink:flink-table-api-bridge-base -
org.apache.flink:flink-table-api-java-bridge -
org.apache.logging.log4j:log4j-slf4j-impl -
org.apache.logging.log4j:log4j-api -
org.apache.logging.log4j:log4j-core -
org.apache.logging.log4j:log4j-1.2-api
Además, el motor de ejecución proporciona la biblioteca que se utiliza para obtener las propiedades del tiempo de ejecución de las aplicaciones en Managed Service para Apache Flink. com.amazonaws:aws-kinesisanalytics-runtime:1.2.0
Todas las dependencias proporcionadas por el motor de ejecución deben seguir las siguientes recomendaciones para no incluirlas en el Uber JAR:
-
En Maven (
pom.xml) y SBT (build.sbt), usa scope.provided -
En Gradle (
build.gradle), usa la configuración.compileOnly
Cualquier dependencia proporcionada que se incluya accidentalmente en el Uber JAR se ignorará en tiempo de ejecución debido a que Apache Flink carga primero la clase principal. Para obtener más información, consulte Específico de S3
Connectors
La mayoría de los conectores, excepto el conector FileSystem, que no se incluyen en el tiempo de ejecución deben incluirse en el archivo POM con el alcance predeterminado (). compile
Otras recomendaciones
Por regla general, el uber JAR de Apache Flink suministrado a Managed Service para Apache Flink debe contener el código mínimo necesario para ejecutar la aplicación. No se deben incluir en este archivo jar las dependencias que incluyan las clases fuente, los conjuntos de datos de prueba o el estado de arranque. Si es necesario incorporar recursos estáticos en tiempo de ejecución, separe esta preocupación en un recurso como Amazon S3. Algunos ejemplos de esto son los bootstraps estatales o un modelo de inferencia.
Tómate un tiempo para considerar tu árbol de dependencias profundo y eliminar las dependencias que no estén relacionadas con el tiempo de ejecución.
Si bien Managed Service para Apache Flink admite tamaños de jar de 512 MB, esto debería considerarse una excepción a la regla. Actualmente, Apache Flink admite tarros de aproximadamente 104 MB a través de su configuración predeterminada, y ese debería ser el tamaño objetivo máximo necesario para un tarro.
Tolerancia a fallos: puntos de control y puntos de almacenamiento
Utilice puntos de control y puntos de almacenamiento para implementar la tolerancia a errores en su aplicación de Managed Service para Apache Flink. Tenga en cuenta las siguientes consideraciones al desarrollar y mantener la aplicación:
Se recomienda que deje habilitada la verificación de puntos de control de la aplicación. Los puntos de control permiten que la aplicación tolere los errores durante el mantenimiento programado, así como en caso de que se produzcan fallos inesperados debidos a problemas de servicio, fallos de dependencia de la aplicación y otros problemas. Para obtener más información sobre mantenimiento, consulte Administración de las tareas de mantenimiento de Managed Service para Apache Flink.
Establezca ApplicationSnapshotConfiguration: :SnapshotsEnabled en
falsedurante el desarrollo de la aplicación o la solución de problemas. Se crea una instantánea cada vez que se detiene una aplicación, lo que puede provocar problemas si la aplicación se encuentra en mal estado o no funciona correctamente. EstablecerSnapshotsEnabledentruecuando la aplicación esté en producción y esté estable.nota
Se recomienda que la aplicación cree una instantánea varias veces al día para que se reinicie correctamente con los datos de estado correctos. La frecuencia correcta de las instantáneas depende de la lógica de negocios de la aplicación. Tomar instantáneas frecuentes permite recuperar datos más recientes, pero aumenta el costo y requiere más recursos del sistema.
Para obtener información sobre la supervisión del tiempo de inactividad de las aplicaciones, consulte Métricas y dimensiones en Managed Service para Apache Flink.
Para obtener más información acerca de la implementación de tolerancias ante fallos, consulte Implementación de tolerancia a errores.
Versiones de conector no compatibles
La versión 1.15 de Managed Service para Apache Flink impedirá automáticamente que las aplicaciones se inicien o se actualicen si utilizan versiones de Kinesis Connector no compatibles (incluidas en los JAR de las aplicaciones). Al actualizar a la versión 1.15 de Managed Service para Apache Flink, asegúrese de utilizar el Kinesis Connector más reciente. Se trata de cualquier versión igual o posterior a la 1.15.2. Las versiones restantes no serán compatibles con Managed Service para Apache Flink, ya que pueden provocar problemas de coherencia o fallos, ya que la característica Parada con punto de guardado impide realizar operaciones de parada o actualización sin problemas. Para obtener más información sobre la compatibilidad de conectores en las versiones de Amazon Managed Service para Apache Flink, consulte Conectores Apache Flink.
Rendimiento y paralelismo
Su aplicación puede escalarse para cumplir con cualquier nivel de rendimiento ajustando el paralelismo de la aplicación y evitando los problemas de rendimiento. Tenga en cuenta las siguientes consideraciones al desarrollar y mantener la aplicación:
Compruebe que todas las fuentes y receptores de las aplicaciones estén suficientemente aprovisionadas y que no estén siendo restringidas. Si las fuentes y los receptores son otros servicios AWS, supervise esos servicios mediante CloudWatch.
En el caso de aplicaciones con un paralelismo muy alto, compruebe si los niveles altos de paralelismo se aplican a todos los operadores de la aplicación. De forma predeterminada, Apache Flink aplica el mismo paralelismo de aplicación a todos los operadores del gráfico de la aplicación. Esto puede provocar problemas de aprovisionamiento en las fuentes o los receptores, o provocar cuellos de botella en el procesamiento de los datos de los operadores. Se puede cambiar el paralelismo de cada operador en el código con setParallelism.
Comprenda el significado de la configuración de paralelismo para los operadores de su aplicación. Si cambia el paralelismo de un operador, es posible que no pueda restaurar la aplicación a partir de una instantánea creada cuando el operador tenía un paralelismo que no es compatible con la configuración actual. Para obtener más información sobre cómo configurar el paralelismo del operador, consulte Set maximum parallelism for operators explicitly
.
Para obtener más información acerca de la implementación del escalado, consulte Implementación del escalado de aplicaciones.
Establecimiento del paralelismo por operador
De forma predeterminada, todos los operadores tienen el paralelismo establecido en el nivel de la aplicación. Se puede anular el paralelismo de un solo operador mediante la API DataStream mediante .setParallelism(x). Se puede establecer el paralelismo de un operador en cualquier paralelismo igual o inferior al paralelismo de la aplicación.
Si es posible, defina el paralelismo del operador como una función del paralelismo de la aplicación. De esta forma, el paralelismo del operador variará con el paralelismo de la aplicación. Si se utiliza el escalado automático, por ejemplo, todos los operadores variarán su paralelismo en la misma proporción:
int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);
En algunos casos, es posible que desee establecer el paralelismo del operador en una constante. Por ejemplo, establecer el paralelismo de una fuente de Kinesis Stream con la cantidad de particiones. En estos casos, considere la posibilidad de incluir el paralelismo del operador como parámetro de configuración de la aplicación para cambiarlo sin cambiar el código, por ejemplo, para volver a fragmentar el flujo de origen.
Registro
Se puede supervisar el rendimiento y las condiciones de error de la aplicación mediante CloudWatch Logs. Tenga en cuenta las siguientes consideraciones al configurar los registros de la aplicación:
Habilite el registro de CloudWatch para la aplicación de modo que se pueda depurar cualquier problema de tiempo de ejecución.
No cree una entrada de registro para cada registro que se esté procesando en la aplicación. Esto provoca graves cuellos de botella durante el procesamiento y puede provocar retrasos en el procesamiento de los datos.
Cree alarmas de CloudWatch para que le notifiquen cuando la aplicación no se ejecute correctamente. Para obtener más información, consulte Uso de alarmas de CloudWatch con Amazon Managed Service para Apache Flink
Para obtener más información acerca de la implementación de los registros, consulte .
Codificación
Se puede hacer que su aplicación funcione y sea estable mediante las prácticas de programación recomendadas. Tenga en cuenta las siguientes consideraciones al escribir el código de aplicación:
No utilice
system.exit()en el código de la aplicación, ni en el métodomainde la aplicación ni en las funciones definidas por el usuario. Si quiere cerrar la aplicación desde el código, lance una excepción derivada deExceptionoRuntimeExceptionque contenga un mensaje sobre el problema de la aplicación.Tenga en cuenta lo siguiente sobre cómo gestiona el servicio esta excepción:
Si la excepción proviene del método
mainde la solicitud, el servicio la incluirá en unaProgramInvocationExceptioncuando la solicitud pase al estadoRUNNINGy el administrador del trabajo no podrá enviar el trabajo.Si la excepción proviene de una función definida por el usuario, el administrador de tareas fallará en la tarea y la reiniciará, y los detalles de la excepción se escribirán en el registro de excepciones.
Considere la posibilidad de sombrear el archivo JAR de la aplicación y las dependencias incluidas en él. Se recomienda sombrear cuando haya posibles conflictos en los nombres de los paquetes entre la aplicación y el tiempo de ejecución de Apache Flink. Si se produce un conflicto, es posible que los registros de la aplicación contengan alguna excepción de tipo
java.util.concurrent.ExecutionException. Para obtener más información sobre cómo sombrear el archivo JAR de la aplicación, consulte Apache Maven Shade Plugin.
Administración de credenciales
No debe incluir credenciales a largo plazo en las aplicaciones de producción (ni en ninguna otra). Es probable que las credenciales a largo plazo se registren en un sistema de control de versiones y se pierdan fácilmente. En su lugar, se puede asociar un rol a la aplicación Managed Service para Apache Flink y conceder permisos a ese rol. La aplicación Flink en ejecución puede seleccionar credenciales temporales del entorno con los privilegios correspondientes. En caso de que sea necesaria la autenticación para un servicio que no esté integrado de forma nativa con IAM, por ejemplo, una base de datos que requiera un nombre de usuario y una contraseña para la autenticación, debería considerar la posibilidad de almacenar los secretos en AWSSecrets Manager
Muchos servicios nativos de AWS admiten la autenticación:
Flujo de datos de Kinesis: ProcessTaxiStream.java
Amazon MSK: https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication
Amazon Elasticsearch Service: AmazonElasticsearchSink.java
Amazon S3: funciona de forma inmediata en Managed Service para Apache Flink
Lectura de fuentes con pocas particiones
Al leer desde Apache Kafka o un flujo de datos de Kinesis, es posible que no coincida el paralelismo del flujo (es decir, el número de particiones de Kafka y el número de particiones de Kinesis) y el paralelismo de la aplicación. Con un diseño ingenuo, el paralelismo de una aplicación no puede ampliarse más allá del paralelismo de un flujo: cada subtarea de un operador fuente solo puede leer de 1 o más particiones. Esto significa que, en el caso de una transmisión con solo 2 fragmentos y una aplicación con un paralelismo de 8, solo se consumen realmente dos subtareas de la transmisión y 6 subtareas permanecen inactivas. Esto puede limitar considerablemente el rendimiento de la aplicación, especialmente si la deserialización es cara y la lleva a cabo la fuente (que es la opción predeterminada).
Para mitigar este efecto, puede escalar la transmisión. Pero eso no siempre es deseable o posible. Alternativamente, puede reestructurar la fuente para que no realice ninguna serialización y simplemente transmita la byte[]. A continuación, puede volver a equilibrar
Intervalo de actualización del cuaderno de Studio
Si cambia el intervalo de actualización de resultados de párrafos, configúrelo en un valor de al menos 1000 milisegundos.
Rendimiento óptimo del cuaderno de Studio
Probamos con la siguiente afirmación y obtuvimos el mejor rendimiento cuando events-per-second multiplicado por number-of-keys dio menos de 25 000 000. Esto fue para events-per-second por debajo de 150 000.
SELECT key, sum(value) FROM key-values GROUP BY key
Cómo afectan las estrategias de marcas de agua y las particiones inactivas a las ventanas temporales
Al leer los eventos de Apache Kafka y flujo de datos de Kinesis, la fuente puede establecer la hora del evento en función de los atributos de la transmisión. En el caso de Kinesis, la hora del evento es igual a la hora aproximada de llegada de los eventos. Sin embargo, establecer la hora del evento en el origen de los eventos no es suficiente para que una aplicación de Flink utilice la hora del evento. El origen también debe generar marcas de agua que propaguen la información sobre la hora del evento desde el origen a todos los demás operadores. La documentación de Flink
De forma predeterminada, la marca de tiempo de un evento leído en Kinesis se establece en la hora de llegada aproximada determinada por Kinesis. Un requisito previo adicional para que la hora del evento funcione en la aplicación es una estrategia de marca de agua.
WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));
Luego, la estrategia de marca de agua se aplica a DataStream con el método assignTimestampsAndWatermarks. Hay algunas estrategias integradas útiles:
-
forMonotonousTimestamps()solo utilizará la hora del evento (hora aproximada de llegada) y emitirá periódicamente el valor máximo como marca de agua (para cada subtarea específica) -
forBoundedOutOfOrderness(Duration.ofSeconds(...))similar a la estrategia anterior, pero utilizará el tiempo y la duración del evento para generar marcas de agua.
De la documentación de Flink
Cada subtarea paralela de una función fuente suele generar sus marcas de agua de forma independiente. Estas marcas de agua definen la hora del evento en esa fuente paralela en particular.
A medida que las marcas de agua recorren el programa de streaming, adelantan la hora del evento en los operadores a los que llegan. Cada vez que un operador adelanta la hora de su evento, genera una nueva marca de agua para sus operadores subsiguientes aguas abajo.
Algunos operadores consumen varios flujos de entrada; una unión, por ejemplo, o los operadores que siguen una función KeyBy(...) o partition. El tiempo de evento actual de un operador de este tipo es el mínimo de los tiempos de evento de sus flujos de entrada. A medida que sus flujos de entrada actualizan los tiempos de sus eventos, también lo hace el operador.
Esto significa que, si una subtarea de origen consume contenido de una partición inactiva, los operadores intermedios no reciben nuevas marcas de agua de esa subtarea y, por lo tanto, el procesamiento se detiene para todos los operadores intermedios que utilizan ventanas temporales. Para evitarlo, los clientes pueden añadir la opción withIdleness a la estrategia de marcas de agua. Con esta opción, el operador excluye las marcas de agua de las subtareas previas inactivas al calcular la hora del evento del operador. Por lo tanto, las subtareas inactivas ya no bloquean el avance de la hora del evento en los operadores intermedios.
Según el asignador de particiones que utilice, es posible que a algunos trabajadores no se les asigne ninguna partición de Kinesis. En ese caso, estos trabajadores manifestarán el comportamiento de fuente inactiva aunque todos las particiones de Kinesis entreguen datos de eventos de forma continua. Se puede mitigar este riesgo si utiliza uniformShardAssigner con el operador de origen. Esto garantiza que todas las subtareas de origen tengan particiones que procesar siempre que el número de trabajadores sea menor o igual al número de particiones activas.
Sin embargo, la opción de inactividad con las estrategias de marca de agua integradas no adelantará la hora del evento si ninguna subtarea lee ningún evento, es decir, si no hay ningún evento en la transmisión. Esto resulta especialmente visible en los casos de prueba en los que se lee un conjunto finito de eventos de la secuencia. Como el tiempo del evento no avanza después de haber leído el último evento, la última ventana (que contiene el último evento) no se cerrará.
Resumen
La configuración
withIdlenessno generará nuevas marcas de agua en caso de que un fragmento esté inactivo. Excluirá la última marca de agua enviada por las subtareas inactivas del cálculo de la marca de agua mínima en los operadores aguas abajo.Con las estrategias de marcas de agua integradas, la última ventana abierta no se cerrará (a menos que se envíen nuevos eventos que hagan avanzar la marca de agua, pero que creen una nueva ventana que luego permanecerá abierta)
Incluso si la transmisión de Kinesis establece la hora, pueden producirse eventos que lleguen tarde si una partición se consume más rápido que otras (por ejemplo, durante la inicialización de la aplicación o cuando se usa,
TRIM_HORIZONcuando todas las particiones existentes se consumen en paralelo sin tener en cuenta su relación padre/hijo)La configuración
withIdlenessde la estrategia de marca de agua parece interrumpir la configuración específica de origen de Kinesis para las particiones inactivas(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
Ejemplo
La siguiente aplicación lee una transmisión y crea ventanas de sesión en función de la hora del evento.
Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });
En el siguiente ejemplo, se escriben 8 eventos en una secuencia de 16 particiones (los 2 primeros y el último evento ocurren en la misma partición).
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022
Esta entrada debería dar como resultado 5 ventanas de sesión: evento 1,2,3; evento 4,5; evento 6; evento 7; evento 8. Sin embargo, el programa solo muestra las 4 primeras ventanas.
11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7
La salida solo muestra 4 ventanas (falta la última ventana que contiene el evento 8). Esto se debe a la hora del evento y a la estrategia de marca de agua. La última ventana no se puede cerrar porque, con las estrategias de marca de agua preintegradas, el tiempo nunca pasa más allá de la hora del último evento que se lee desde la transmisión. Sin embargo, para que la ventana se cierre, el tiempo debe avanzar más de 10 segundos después del último evento. En este caso, la última marca de agua es 2022-03-23T 10:21:27.170Z, pero para que la ventana de la sesión se cierre, es necesaria una marca de agua 10 segundos y 1 ms después.
Si se elimina la opción withIdleness de la estrategia de marca de agua, no se cerrará nunca ninguna ventana de sesión, ya que la “marca de agua global” del operador de la ventana no podrá avanzar.
Cuando se inicia la aplicación Flink (o si los datos están sesgados), es posible que algunos fragmentos se consuman más rápido que otros. Esto puede provocar que algunas marcas de agua se emitan demasiado pronto desde una subtarea (es posible que la subtarea emita la marca de agua en función del contenido de una partición sin haber consumido contenido de las demás particiones a las que está suscrita). Las formas de mitigarlo son distintas estrategias de marca de agua que añadan un margen de seguridad (forBoundedOutOfOrderness(Duration.ofSeconds(30)) o permitan explícitamente que los eventos (allowedLateness(Time.minutes(5)) lleguen tarde.
Establecimiento de un UUID para todos los operadores
Cuando Managed Service para Apache Flink inicia un trabajo de Flink para una aplicación con una instantánea, es posible que el trabajo de Flink no se inicie debido a ciertos problemas. Uno de ellos es la disparidad de ID de los operadores. Flink espera identificadores de operador explícitos y consistentes para los operadores de gráficos de trabajos de Flink. Si no se establece de forma explícita, Flink genera un ID para los operadores. Esto se debe a que Flink usa estos ID de operador para identificar de forma única a los operadores en un gráfico de trabajos y los usa para almacenar el estado de cada operador en un punto de guardado.
El problema de disparidad de ID de los operadores se produce cuando Flink no encuentra una correlación de 1:1 entre los ID de los operadores de un gráfico de trabajos y los ID de los operadores definidos en un punto de guardado. Esto ocurre cuando no se establecen ID de operador explícitos y consistentes, y Flink genera ID de operador que pueden no ser consistentes con la creación de cada gráfico de trabajo. La probabilidad de que las aplicaciones presenten este problema es alta durante las operaciones de mantenimiento. Para evitarlo, se recomienda a los clientes configurar el UUID para todos los operadores en código flink. Para obtener más información, consulte el tema Set a UUID for all operators en Production readiness.
Agregación de ServiceResourceTransformer al plugin Maven Shade
Flink utiliza las interfaces de proveedor de servicios (SPI)
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>