Dagster Limiting Concurrency in a Job


My code location uses the Definitions below, with k8s_job_executor as the executor:

defs = Definitions(
    assets=[the_asset], jobs=[asset_job, op_job], executor=k8s_job_executor
)

All jobs and schedules run fine with the above defs.

Now I want to create an op-based job whose op should not run in parallel, so I created the job with the code below to limit concurrency:


@op()
def run_query():
    ...

@job(
    executor_def=multiprocess_executor,
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 1,
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "graph",
                            "limit": 1,
                        }
                    ],
                },
            }
        }
    },
)
def data_load_job() -> None:
    run_query()

But this code raises the following error:

dagster._core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
    Error 1: Received unexpected config entry "multiprocess" at path root:config. Expected: "{ max_concurrent?: Int? retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } tag_concurrency_limits?: [{ key: String limit: Int value?: (String | { applyLimitPerUniqueValue: Bool }) }] }".

How can I fix this error? Is there a way to limit op concurrency within a job?
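
One possible reading of the error, offered as a sketch rather than a confirmed fix: the "Expected:" part of the message lists max_concurrent, retries, start_method and tag_concurrency_limits directly under root:config, which suggests the "multiprocess" wrapper key is not accepted when the executor is set through executor_def. The snippet below builds the config without that wrapper and checks its keys against the ones named in the message. The names execution_config and expected_keys are hypothetical, introduced only for illustration.

```python
# Sketch: the same settings as in the question, with the "multiprocess"
# wrapper removed so they sit directly under execution.config.
execution_config = {
    "execution": {
        "config": {
            "max_concurrent": 1,
            "tag_concurrency_limits": [
                {"key": "database", "value": "graph", "limit": 1},
            ],
        }
    }
}

# Keys that the error message lists as accepted at root:config.
expected_keys = {
    "max_concurrent",
    "retries",
    "start_method",
    "tag_concurrency_limits",
}

# Any key outside the expected set would trigger the same kind of error.
unexpected = set(execution_config["execution"]["config"]) - expected_keys
print(unexpected)  # prints set()
```

Under this assumption, the dict would be passed as config=execution_config to @job alongside executor_def=multiprocess_executor. Note also that a tag concurrency limit only applies to ops carrying the matching tag, so run_query would presumably need something like @op(tags={"database": "graph"}) for the "database"/"graph" limit to take effect.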
