Uso de métricas personalizadas con Amazon Managed Service para Apache Flink - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Uso de métricas personalizadas con Amazon Managed Service para Apache Flink

Managed Service para Apache Flink expone 19 métricas a CloudWatch, incluidas las métricas de uso y rendimiento de los recursos. Además, puede crear sus propias métricas para realizar un seguimiento de los datos específicos de la aplicación, como el procesamiento de eventos o el acceso a recursos externos.

Funcionamiento

Las métricas personalizadas de Managed Service para Apache Flink utilizan el sistema de métricas de Apache Flink. Las métricas de Apache Flink tienen los siguientes atributos:

  • Tipo: el tipo de métrica describe cómo mide e informa los datos. Los tipos de métricas de Apache Flink disponibles incluyen Recuento, Indicador, Histograma y Medidor. Para obtener más información sobre los tipos de métricas de Apache Flink, consulte Metric Types.

    nota

    AWSCloudWatch Metrics no admite el tipo de métrica Histograma de Apache Flink. CloudWatch solo puede mostrar métricas de Apache Flink de los tipos Recuento, Indicador y Medidor.

  • Ámbito: el ámbito de una métrica consta de su identificador y un conjunto de pares clave-valor que indican cómo se informará de la métrica a CloudWatch. El identificador de una métrica consta de los elementos siguientes:

    • El alcance del sistema, que indica el nivel en el que se informa de la métrica (por ejemplo, el operador).

    • Un ámbito de usuario, que define atributos como las variables de usuario o los nombres de los grupos de métricas. Estos atributos se definen mediante MetricGroup.addGroup(key, value) o MetricGroup.addGroup(name).

    Para obtener más información sobre esta métrica, consulte Scope.

Para obtener más información sobre las métricas de Apache Flink, consulte Metrics en la documentación de Apache Flink.

Para crear una métrica personalizada en Managed Service para Apache Flink, puede acceder al sistema de métricas de Apache Flink desde cualquier función de usuario que se amplíe mediante una llamada. RichFunction GetMetricGroup Este método devuelve un objeto MetricGroup que puede usar para crear y registrar métricas personalizadas. Managed Service para Apache Flink informa a CloudWatch de todas las métricas creadas con la clave de grupo KinesisAnalytics. Las métricas personalizadas que defina tienen las siguientes características:

  • Su métrica personalizada tiene un nombre de métrica y un nombre de grupo. Estos nombres deben constar de caracteres alfanuméricos según las reglas de nomenclatura de Prometheus.

  • Los atributos que defina en el ámbito de usuario (excepto el grupo de métricas KinesisAnalytics) se publican como dimensiones de CloudWatch.

  • Las métricas personalizadas se publican en el nivel Application de forma predeterminada.

  • Las dimensiones (Tarea/Operador/Paralelismo) se añaden a la métrica en función del nivel de supervisión de la aplicación. El nivel de supervisión de la aplicación se establece mediante el parámetro MonitoringConfiguration de la acción CreateApplication o el parámetro MonitoringConfigurationUpdate de la acción UpdateApplication.

Visualización de ejemplos para crear una clase de asignación

Los siguientes ejemplos de código muestran cómo crear una clase de asignación que cree e incremente una métrica personalizada, y cómo implementar la clase de mapeo en su aplicación agregándola a un objeto DataStream.

Métrica personalizada de recuento de registros

El siguiente ejemplo de código muestra cómo crear una clase de mapeo que cree una métrica que cuente los registros de un flujo de datos (la misma funcionalidad que la métrica numRecordsIn):

private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }

En el ejemplo anterior, la variable valueToExpose se incrementa para cada registro que procesa la aplicación.

Tras definir la clase de mapeo, se crea una secuencia en la aplicación que implementa el mapa:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

Para ver el código completo de esta aplicación, consulte Record Count Custom Metric Application.

Métrica personalizada de recuento de palabras

El siguiente ejemplo de código muestra cómo crear una clase de mapeo que cree una métrica que cuente palabras en un flujo de datos:

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }

En el ejemplo anterior, la variable counter se incrementa para cada palabra que procesa la aplicación.

Tras definir la clase de mapeo, se crea una secuencia en la aplicación que implementa el mapa:

// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

Para ver el código completo de esta aplicación, consulte Word Count Custom Metric Application.

Visualización de métricas personalizadas

Las métricas personalizadas de la aplicación aparecen en la consola de CloudWatch Metrics del panel AWS/KinesisAnalytics, en el grupo de métricas Aplicación.