How to create custom metrics with labels (Python SDK + Flink Runner)


I am using the Apache Beam Python SDK with the Flink runner, and I am trying to add custom labels to my metrics.

It seems the provided Metrics.counter function (link) doesn't accept labels:

    @staticmethod
    def counter(namespace, name):

Taking a closer look at the code, it builds a MetricName:

    return Metrics.DelegatingCounter(MetricName(namespace, name))

and MetricName does support labels (link):

    def __init__(self, namespace, name, urn=None, labels=None):

Therefore I am trying to set the MetricName labels manually by creating a new class that passes the labels through:

import apache_beam as beam
from apache_beam.metrics.metricbase import MetricName


class MetricWithLabels(object):

    @staticmethod
    def counter(namespace, name, labels):
        # Same as Metrics.counter, but forwards labels to MetricName.
        namespace = beam.metrics.Metrics.get_namespace(namespace)
        return beam.metrics.Metrics.DelegatingCounter(
            MetricName(namespace, name, labels=labels))

However, when I test this in a DoFn:

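class addExample(beam.DoFn):

    def __init__(self):
        super().__init__()
        # counter() takes (namespace, name, labels); the namespace 'custom' is
        # an assumption, inferred from the metric name in the output below.
        self.example_counter = MetricWithLabels.counter(
            'custom', 'example_counter', {'label1': 'value1'})

    def process(self, element):
        self.example_counter.inc()
        # Yield so the element is emitted whole rather than iterated over.
        yield element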

The final output still doesn't have the labels I created. It looks like the labels on the exported metric are completely overwritten by the Flink runner:

flink_taskmanager_job_task_operator_custom_example_counter{job_id="41b6a8a793e47f8edaa91e2dccc90a5f",task_id="ecddd3e2ade3edda3cd8430d7b243742",task_attempt_id="72497124f6cc6d7c73674fcbba0664b4",host="172.19.0.5",operator_id="04ff9a876e7d6fb91800bbfb5f72ce25",operator_name="[3]{Convert format, metrics example, logging}",task_name="Source: Reading message from kafka/Read from kafka topic ['test']/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]Reading message from kafka/Read from kafka topic ['test']/Remove Kafka Metadata -> [3]{Convert format, metrics example, logging}",task_attempt_num="0",job_name="None",tm_id="172.19.0.5:37847-bb580c",subtask_index="0",}
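To check which labels actually reach the exported metric, the line can be parsed and its label keys inspected. The following is only a minimal sketch using the Python standard library: `parse_labels` is a hypothetical helper (not part of Beam or Flink), it assumes the line contains a `{...}` label block, and `sample` is a shortened copy of the line above.

```python
import re

# Matches key="value" pairs inside the {...} part of a Prometheus-style line.
# Escaped characters inside a value (e.g. \") are tolerated.
LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_labels(line):
    """Return (metric_name, labels_dict) for one line with a {...} label block."""
    name, _, rest = line.partition("{")
    # Use the last '}' so that braces inside label values do not cut the block short.
    body = rest.rpartition("}")[0]
    return name.strip(), dict(LABEL_RE.findall(body))


# Shortened copy of the exported line (most labels omitted for brevity).
sample = (
    'flink_taskmanager_job_task_operator_custom_example_counter{'
    'job_id="41b6a8a793e47f8edaa91e2dccc90a5f",'
    'host="172.19.0.5",'
    'task_attempt_num="0",job_name="None",subtask_index="0",}'
)

metric_name, labels = parse_labels(sample)
print(metric_name)
print(sorted(labels))
print("label1" in labels)
```

For the line above, `"label1" in labels` is False: only the labels set on the Flink side are present, and the custom `label1` is missing.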

Is there a way to pass custom labels to metrics when using the Flink runner? If not, where are the labels being overwritten, so that I might be able to update the code to support custom labels? Thanks
