Concurrent Execution of Workflow and Activities in One Worker Instance


I hope this message finds you well. I’m currently working on a Temporal implementation to manage workflow execution. I’ve encountered a challenge and would greatly appreciate your insights and expertise to help me overcome it.

Issue/Question:

We are running the worker as a pod in a cluster (1 replica; we don’t want to run more than one). We want to execute multiple workflows at the same time and get their output without waiting for the current workflow to complete its execution; that is, we want parallel execution within the same worker instance.

Steps I’ve Taken:

I have configured the maximum concurrency for workflow tasks and activities in the worker-side configuration.

Code or Configuration:

def slp(sec):
    sleep(sec)
    return f"slept {sec} sec"

@activity.defn(name="Sleeping 1")
async def sleeping1():
    response = slp(20)
    return response

@activity.defn(name="sleeping 2")
async def sleeping2():
    response = slp(120)
    return response

@workflow.defn
class sleepingWF:
    @workflow.run
    async def run(self, body):

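        # execute_activity needs a timeout; the values below are assumed
        # (requires: from datetime import timedelta).
        s1 = await workflow.execute_activity(
            sleeping1, start_to_close_timeout=timedelta(seconds=60))
        s2 = await workflow.execute_activity(
            sleeping2, start_to_close_timeout=timedelta(seconds=300))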
        
async def main():

    TEMPORAL_ENDPOINT = "localhost:7233"
    TASK_QUEUE = "mymac"

    client = await Client.connect(
        target_host=TEMPORAL_ENDPOINT
    )
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue=TASK_QUEUE,
            workflows=[sleepingWF],
            activities=[sleeping1, sleeping2],
            activity_executor=activity_executor,
            max_concurrent_workflow_tasks=100,
            max_concurrent_activities=100,
            max_concurrent_workflow_task_polls=10,
            max_concurrent_activity_task_polls=10,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

I truly appreciate your time and assistance. If you have any suggestions, insights, or possible solutions, please feel free to share them. Your expertise is invaluable, and I look forward to learning from the community.

Expected Outcome:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:00:00 end_time -> 00:02:20

Total time taken for the two executions should be 140 sec

Environment: Python 3.11

Additional Information:

Current total execution time taken:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40

Total time taken for the two executions is currently 280 sec

James On

Note: I'm not yet sure what is happening or why, but in the meantime, let me offer a possible explanation and some solutions.

TLDR

The call to sleep() at line 2 seems to block concurrent execution of other threads.

Try one of these solutions:

  • Replace sleep() with asyncio.sleep() (and then, you can remove the ThreadPoolExecutor altogether); or
  • Make sure you are using a recent release of Python and of Temporal's Python SDK; or
  • Use a ProcessPoolExecutor instead of a ThreadPoolExecutor.
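
To make the first option concrete, here is a minimal sketch using only the standard library (no Temporal involved). The names blocking_activity, cooperative_activity and timed are made up for illustration, and the sleeps are shortened to 0.2 seconds. It runs two copies of each coroutine on one event loop and measures the total time.

```python
import asyncio
import time


async def blocking_activity(seconds):
    # time.sleep() never yields to the event loop, so nothing else can run.
    time.sleep(seconds)
    return f"slept {seconds} sec"


async def cooperative_activity(seconds):
    # asyncio.sleep() suspends this coroutine and lets the loop run others.
    await asyncio.sleep(seconds)
    return f"slept {seconds} sec"


async def timed(activity, seconds, copies=2):
    # Run `copies` instances concurrently and report the wall-clock time.
    start = time.perf_counter()
    results = await asyncio.gather(*(activity(seconds) for _ in range(copies)))
    return time.perf_counter() - start, results


if __name__ == "__main__":
    for fn in (blocking_activity, cooperative_activity):
        elapsed, _ = asyncio.run(timed(fn, 0.2))
        print(f"{fn.__name__}: {elapsed:.2f}s")
```

The blocking version should take roughly the sum of the sleeps, while the cooperative one should take roughly the longest single sleep. That is the same serial pattern as in the question, just on a smaller scale.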

Details

By default, Temporal's Python SDK executes both workflow tasks and activity tasks in a single thread, based on asyncio. As stated in the Python SDK's documentation:

Blocking the async event loop in Python would turn your asynchronous program into a synchronous program that executes serially, defeating the entire purpose of using asyncio. This can also lead to potential deadlock, and unpredictable behavior that causes tasks to be unable to execute. Debugging these issues can be difficult and time consuming, as locating the source of the blocking call might not always be immediately obvious.

It is often possible, and preferable, to design your activities so that they do not block the thread. For example, if your activities need to perform SQL queries, make HTTP requests, or download from or upload to S3, you can use asyncio-compatible libraries. In the case of the sleep() at line 2, you can use asyncio.sleep() instead.

If you really need to execute blocking code from your activities, for example if your activities run long CPU intensive logic, then you may instead configure your worker to execute activities through a ThreadPoolExecutor or a ProcessPoolExecutor.

Yet, you apparently did the former, and it didn't work. Why? I suppose that's due to the fact that the sleep function isn't releasing Python's Global Interpreter Lock (GIL), though I'm not sure why.

If you don't already know what Python's GIL is, I strongly suggest you take a look at the answers to that other question before you continue reading my answer.

The key thing to note here is that Python's GIL is per-process, not per-thread as one would generally expect. That considerably limits the level of concurrency you may actually expect from a ThreadPoolExecutor. Put another way, a ThreadPoolExecutor will only help if your code is calling native code that releases the GIL, doing I/O operations, or calling other functions provided by Python that similarly release the GIL. Otherwise, other threads won't get the chance to continue executing.
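
As a quick sanity check of the thread-pool point, here is a stdlib-only sketch (again not Temporal code; blocking_nap and run_in_threads are hypothetical names). It pushes a plain, non-async function that calls time.sleep() onto a ThreadPoolExecutor from an event loop and times two copies of it.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_nap(seconds):
    # A plain (non-async) function; time.sleep() releases the GIL while waiting.
    time.sleep(seconds)
    return seconds


async def run_in_threads(copies, seconds):
    # Hand each blocking call to a worker thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=copies) as pool:
        start = time.perf_counter()
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_nap, seconds) for _ in range(copies))
        )
        return time.perf_counter() - start, results


if __name__ == "__main__":
    elapsed, _ = asyncio.run(run_in_threads(2, 0.2))
    print(f"two 0.2s naps in threads: {elapsed:.2f}s")
```

If the two naps overlap here, threads and time.sleep() are behaving normally on your machine. One related thing that may be worth checking: as far as I know, Temporal's Python SDK only hands activities declared with a plain def to activity_executor, while async def activities run directly on the worker's event loop. Since sleeping1 and sleeping2 are declared with async def, the thread pool in the question might not be used for them at all.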

I suppose that by sleep(), you are really talking about time.sleep(), right? time.sleep() is expected to release the GIL, though it looks like it isn't in your case. Could it be that sleep(), at line 2 in your code, is actually referring to a different function?
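
One way to answer that question is to ask Python which function the name actually points to. This is a small sketch; describe is a hypothetical helper, and you would pass it whatever sleep refers to in your own module.

```python
import asyncio
import inspect
import time


def describe(fn):
    # Report where a callable really comes from, e.g. "time.sleep".
    return f"{fn.__module__}.{fn.__qualname__}"


if __name__ == "__main__":
    from time import sleep  # replace with the import used in your worker module

    print(describe(sleep))                          # origin of the function
    print(sleep is time.sleep)                      # is it the stdlib blocking sleep?
    print(inspect.iscoroutinefunction(sleep))       # would it need an `await`?
```

If the first line printed is not time.sleep, then some other import or a local definition is shadowing it, and that function's behavior is what matters.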

How to diagnose this kind of problem?

In your question, you gave these timings:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40

I doubt these numbers are correct, as they would mean your second workflow execution doesn't start until the first one completes, but in the comments you clearly indicated that you are starting both workflows at the same time.

I'd rather expect your timings to be roughly:

workflow_execution_1 : start_time ->  00:00:00  end_time -> *00:02:40*
workflow_execution_2 : start_time -> *00:00:00* end_time ->  00:04:40
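
The arithmetic behind those estimates can be sketched as follows. It assumes that only one activity runs at a time and that the two workflows take turns step by step (both are assumptions about the scheduling order, not something taken from your history). serialized_schedule and hms are hypothetical helper names.

```python
def serialized_schedule(durations, workflows=2):
    # Every workflow runs the same activities in order, but only one
    # activity can execute at any moment (round-robin between workflows).
    clock = 0
    end_times = {}
    for step, seconds in enumerate(durations):
        for wf in range(1, workflows + 1):
            clock += seconds
            if step == len(durations) - 1:
                end_times[wf] = clock  # the last activity ends the workflow
    return end_times


def hms(total_seconds):
    # Format a number of seconds as HH:MM:SS.
    hours, rest = divmod(total_seconds, 3600)
    minutes, seconds = divmod(rest, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


if __name__ == "__main__":
    for wf, end in serialized_schedule([20, 120]).items():
        print(f"workflow_execution_{wf} : end_time -> {hms(end)}")
```

With the 20-second and 120-second activities from the question, this gives end times of 00:02:40 and 00:04:40, for the same 280 seconds in total.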

Examining the Workflow History is really the best way to understand what's happening. Here's an example of what you should see in the Temporal UI assuming that there is no blocking-sleep issue. Both workflow executions should look the same:

[Image: workflow history timeline in the Temporal UI]

The Workflow's start time and end time are the ones shown in the green boxes. You may also see the start and end time of your sleep activities (the blue boxes). It's also possible you may see some non-negligible time being spent in workflow task processing, or even time out errors, which would point at other issues.

If my previous suggestions haven't been enough to resolve your issue, please share a picture like that one for both workflow histories, or even better, share the full history in JSON format (press the Download button). Make sure to wait for your workflows to complete before you capture the history. Given the workflow code you shared, there should be at least 17 events in the history when the workflow completes, and the very last event in both histories should be a WorkflowExecutionCompleted.
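
If you want to check those two conditions yourself before sharing, something like the sketch below could help. It assumes the downloaded JSON has a top-level "events" list whose items carry an "eventType" field; I have not verified the exact export format for your version, so adjust the keys as needed. summarize_history is a hypothetical helper.

```python
import json


def normalize(event_type):
    # Accept both "WorkflowExecutionCompleted" and
    # "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED" style names.
    name = event_type.upper()
    if name.startswith("EVENT_TYPE_"):
        name = name[len("EVENT_TYPE_"):]
    return name.replace("_", "").lower()


def summarize_history(history):
    # `history` is the parsed JSON document (assumed shape, see above).
    events = history.get("events", [])
    last = normalize(events[-1].get("eventType", "")) if events else ""
    return {
        "event_count": len(events),
        "completed": last == "workflowexecutioncompleted",
    }


if __name__ == "__main__":
    sample = json.loads(
        '{"events": [{"eventType": "WorkflowExecutionStarted"},'
        ' {"eventType": "WorkflowExecutionCompleted"}]}'
    )
    print(summarize_history(sample))
```

For the workflow in the question, you would expect an event count of at least 17 and completed set to True for both executions.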

There's a lot to be said about Temporal's workflow history, more than I can write here. If you haven't yet done so, I suggest you take Temporal's 101 and 102 courses; both are available in Python. Among other things, these courses explain how to read the workflow history to debug various types of issues.