my code location as below defs which uses k8s_job_executor
defs = Definitions(
assets=[the_asset], jobs=[asset_job, op_job], executor=k8s_job_executor
)
All jobs/schedules running fine with above defs.
Now i want to create an op based job, this new op should not run in parallel. so i created job with below code to limit concurrency
@op()
def run_query():
...
@job(
executor_def=multiprocess_executor,
config={
"execution": {
"config": {
"multiprocess": {
"max_concurrent": 1,
"tag_concurrency_limits": [
{
"key": "database",
"value": "graph",
"limit": 1,
}
],
},
}
}
},
)
def data_load_job() -> None:
But code is giving error
dagster._core.errors.DagsterInvalidConfigError: Invalid default_value for Field.
Error 1: Received unexpected config entry "multiprocess" at path root:config. Expected: "{ max_concurrent?: Int? retries?: { disabled?: { } enabled?: { } } start_method?: { forkserver?: { preload_modules?: [String] } spawn?: { } } tag_concurrency_limits?: [{ key: String limit: Int value?: (String | { applyLimitPerUniqueValue: Bool }) }] }".
how to fix the error ? Is there a way to limit the op concurrancy in job ?