I didn't see any examples of this so I am wondering if this is a bad practice to extend DAG class. Is it a bad practice and why, if it is?
Example of where I can see this useful follows...
Let's say we have a number of DAGs which all share the same behaviour: calling a specific function as a very last thing, regardless of success or failure. This function could be something like invoking some external API, for instance.
My idea to approach this would be something along these lines:
- extend the DAG class creating a new class DAGWithFinishAction
- implement on_success_callback and on_failure_callback in DAGWithFinishAction to do what I wanted to achieve
- use the new class in
with DAGWithFinishAction(dag_id=..., ...) as dag: ... - schedule tasks in each of implementing DAGs
- expect that each of those DAGs call it's success/failure callbacks after all tasks are finished (in any state)
Is there anything wrong with this approach? I couldn't find anything similar which makes me believe I am missing something.
class DAGWithFinishAction(DAG):
def __init__(self, dag_id, **kwargs):
self.metric_callback = publish_execution_time
on_success_callback = kwargs.get("on_success_callback")
if on_success_callback is None:
on_success_callback = self.metric_callback
else:
if isinstance(on_success_callback, list):
on_success_callback.append(self.metric_callback)
else:
on_success_callback = [on_success_callback, self.metric_callback]
kwargs["on_success_callback"] = on_success_callback
super().__init__(dag_id, **kwargs)
with DAGWithFinishAction(dag_id=..., ...) as dag:
...
The code above works but I am still not sure if this is something that should be avoided or is it a legitimate approach when designing DAGs.