I am using beam python SDK with flink runner, and I am trying to add custom labels to the metrics.
It seems like the provided function (link) doesn't allow me to add labels.
@staticmethod
def counter(namespace, name):
Taking a deeper look on the code, it is using MetricName:
return Metrics.DelegatingCounter(MetricName(namespace, name))
and the MetricName does support for labels (link):
def __init__(self, namespace, name, urn=None, labels=None):
Therefore I am trying to manually update the metricName labels by creating a new class that does pass the labels:
class MetricWithLabels(object):
@staticmethod
def counter(namespace, name, labels):
namespace = beam.metrics.Metrics.get_namespace(namespace)
return beam.metrics.Metrics.DelegatingCounter(beam.metrics.metricbase.MetricName(namespace, name, labels=labels))
However, when I test this in code
class addExample(beam.DoFn):
def __init__(self):
beam.DoFn.__init__(self)
self.example_counter = MetricWithLabels.counter('example_counter', {'label1': 'value1'})
def process(self, element):
self.example_counter.inc()
return element
The final output still don't have the labels I created, and it seems like the output metrics is having labels completely overwrite by Flink runner.:
flink_taskmanager_job_task_operator_custom_example_counter{job_id="41b6a8a793e47f8edaa91e2dccc90a5f",task_id="ecddd3e2ade3edda3cd8430d7b243742",task_attempt_id="72497124f6cc6d7c73674fcbba0664b4",host="172.19.0.5",operator_id="04ff9a876e7d6fb91800bbfb5f72ce25",operator_name="[3]{Convert format, metrics example, logging}",task_name="Source: Reading message from kafka/Read from kafka topic ['test']/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]Reading message from kafka/Read from kafka topic ['test']/Remove Kafka Metadata -> [3]{Convert format, metrics example, logging}",task_attempt_num="0",job_name="None",tm_id="172.19.0.5:37847-bb580c",subtask_index="0",}
I wonder if there's a way that we can pass in the custom labels to the metrics with flink runner, and if not, where are the labels being overwritten so that I might be able to update the code to support custom labels. Thanks