The question is somewhat related to this one.
I need to create an HTTP-triggered Azure Durable Function that:
- Gets an API call with some parameters, the most important of which is the list of search_terms
- Downloads source data from Azure Blob (once)
- Performs some operations, matching each search term against the data from step 2
Based on the response from the linked question, I was thinking about the chaining pattern mixed with the fan-out/fan-in pattern.
My functions are below (simplified, without exception handling etc.).
Durable Function Http Start
import json
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    requestBody = json.loads(req.get_body().decode())
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
Orchestrator
import logging
import json

import pandas as pd
import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Call ActivityFunction1 to read the Parquet file
    blob_data = yield context.call_activity("ActivityFunction1", None)

    tasks = []
    for search_term in search_terms:
        # Call ActivityFunction2 to perform matching
        activityVar = {
            'blob_data': blob_data,
            'messy_term': search_term,
            'num_matches': num_results
        }
        tasks.append(context.call_activity("ActivityFunction2", activityVar))

    results = yield context.task_all(tasks)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }
    return response


main = df.Orchestrator.create(orchestrator_function)
ActivityFunction1
import os
import io

import pandas as pd
from azure.storage.blob import BlobServiceClient


def main(name):
    account_name = os.getenv('accountname')
    account_key = os.getenv('accountkey')
    container_name = os.getenv('containername')
    blob_name = os.getenv('blobname')

    # Build a connection string to the Azure Blob storage account
    connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"

    # Create a BlobServiceClient object using the connection string
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Get a reference to the Parquet blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Download the blob data as a stream
    blob_data = blob_client.download_blob()

    # Read the Parquet data from the stream into a pandas DataFrame
    df = pd.read_parquet(io.BytesIO(blob_data.readall()))

    # Convert the DataFrame to a JSON-serializable list
    df_dict = df.to_dict(orient='records')
    clean_list = [item['column'] for item in df_dict]
    return clean_list
ActivityFunction2
import logging
import time

import numpy as np
import pandas as pd


def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''
    # The pre-processed blob data arrives via the activity input
    print('Reading data from blob...')
    clean_list = activityVar['blob_data']
    # ...further processing
    return results
Here, results is the response to the API call. An example API call body:
{
    "search_terms": ["term1", "term2", "term3"],
    "num_results": 3,
    "params": {
        "search_id": "123",
        "engine_name": "engine"
    }
}
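For completeness, the orchestration is triggered like this (a minimal sketch assuming the default orchestrators/{functionName} route from the Durable Functions quickstart and the local port 7071; the URL is illustrative):

import requests

# Assumed local endpoint; "Orchestrator" matches req.route_params["functionName"]
# in the HTTP starter above.
url = "http://localhost:7071/api/orchestrators/Orchestrator"

payload = {
    "search_terms": ["term1", "term2", "term3"],
    "num_results": 3,
    "params": {"search_id": "123", "engine_name": "engine"}
}

resp = requests.post(url, json=payload)
print(resp.status_code)                   # expect 202 Accepted
print(resp.json()["statusQueryGetUri"])   # poll this URI for the result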
Questions:
- Something feels off here, but I'm not sure what it is. Based on the logs, the function reads the data on every call rather than only once - is that expected with this construction? If so, what can/should I change?
When I trigger it locally the initial terminal logs are:
For detailed output, run func with --verbose flag.
[2023-06-05T09:30:43.504Z] Worker process started and initialized.
[2023-06-05T09:30:47.329Z] Host lock lease acquired by instance ID '00000000000000000000000014B240CF'.
[2023-06-05T09:30:51.136Z] Executing 'Functions.DurableFunctionsHttpStart' (Reason='This function was programmatically called via the host APIs.', Id=3befc3b3-c8c0-430e-a240-0c3273c5106f)
[2023-06-05T09:30:51.447Z] Started orchestration with ID = 'b77f67233edc4a6a910c5312ab49455a'.
[2023-06-05T09:30:51.508Z] Executed 'Functions.DurableFunctionsHttpStart' (Succeeded, Id=3befc3b3-c8c0-430e-a240-0c3273c5106f, Duration=389ms)
[2023-06-05T09:30:51.531Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547)
[2023-06-05T09:30:51.574Z] Executed 'Functions.Orchestrator' (Succeeded, Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547, Duration=47ms)
[2023-06-05T09:30:51.665Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=66206934-9d82-4fbf-b83a-d24765b99f2c)
[2023-06-05T09:30:51.674Z] Request URL: 'https://myaccount.blob.core.windows.net/mycontainer/myfile.parquet'
Request method: 'GET'
Request headers:
'x-ms-range': 'REDACTED'
'x-ms-version': 'REDACTED'
'Accept': 'application/xml'
'User-Agent': 'azsdk-python-storage-blob/12.16.0 Python/3.10.11 (Windows-10-10.0.22621-SP0)'
'x-ms-date': 'REDACTED'
'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
'Authorization': 'REDACTED'
No body was attached to the request
[2023-06-05T09:30:51.889Z] Response status: 206
Response headers:
'Content-Length': '33554432'
'Content-Type': 'application/octet-stream'
'Content-Range': 'REDACTED'
'Last-Modified': 'Thu, 01 Jun 2023 08:00:30 GMT'
'Accept-Ranges': 'REDACTED'
'ETag': '"0x8DB627644CFEA3E"'
'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
'x-ms-request-id': '9ae77aaa-d01e-001e-1290-972efb000000'
'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
'x-ms-version': 'REDACTED'
'x-ms-creation-time': 'REDACTED'
'x-ms-blob-content-md5': 'REDACTED'
'x-ms-lease-status': 'REDACTED'
'x-ms-lease-state': 'REDACTED'
'x-ms-blob-type': 'REDACTED'
'Content-Disposition': 'REDACTED'
'x-ms-server-encrypted': 'REDACTED'
'Date': 'Mon, 05 Jun 2023 09:30:50 GMT'
where the part presumably responsible for reading the Parquet file from Blob repeats many times.
Then it either reports success (but I get no output at the runtime/webhooks/durabletask status endpoint) or continues endlessly with:
[2023-06-05T09:32:19.050Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=22e8638a-ea0b-43ef-b039-b72917b47216)
[2023-06-05T09:32:25.231Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=4ac12c65-7b65-4c6d-954d-e74930ba0ab9)
[2023-06-05T09:32:30.864Z] Executed 'Functions.ActivityFunction1' (Succeeded, Id=66206934-9d82-4fbf-b83a-d24765b99f2c, Duration=99199ms)
[2023-06-05T09:32:51.586Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=d6740019-b687-49fa-b047-f5c5f4ecb95d)
[2023-06-05T09:32:54.766Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=7c421729-5e8a-45f3-9a25-2be647b1b8bd)
[2023-06-05T09:33:22.473Z] Error writing message type InvocationRequest to workerId: 499a8573-4cb0-4e06-824d-ca721786c04e
[2023-06-05T09:33:22.474Z] Grpc.AspNetCore.Server: Request aborted while sending the message.
[2023-06-05T09:35:23.431Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=81b99052-e35f-4179-a332-1c3114fc2191)
[2023-06-05T09:35:23.489Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=86d5b3aa-fc9b-4ffd-b808-4c775062c441)
[2023-06-05T09:35:30.044Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=e8b7fd1f-3fae-4db6-b980-e31fb5c3a911)
[2023-06-05T09:35:31.555Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=23197fcf-0235-4044-8482-aa534bfb229c)
[2023-06-05T09:35:41.798Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=03162147-f450-46d0-adc5-1167e1033f61)
- At the moment I convert the result of ActivityFunction1 from a DataFrame to a dict so that I can pass it to ActivityFunction2 - is it possible to pass a pandas DataFrame instead of the dictionary?
I feel that I messed something up here - maybe ActivityFunction1 shouldn't be defined as an activity function but should live inside the orchestrator? I'm not sure how to proceed and would appreciate any help.
EDIT: I also tried a slightly different approach, with the Orchestrator created like this:
Alternative Orchestrator
import logging
import json
import os
import io

import pandas as pd
import azure.functions as func
import azure.durable_functions as df
from azure.storage.blob import BlobServiceClient


def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Call ActivityFunction1 to read the Parquet file
    blob_data = yield context.call_activity("ActivityFunction1", None)

    # Call ActivityFunction2 to perform matching
    activityVar = {
        'blob_data': blob_data,
        'search_terms': search_terms,
        'num_matches': num_results
    }
    results = yield context.call_activity("ActivityFunction2", activityVar)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }
    return response


main = df.Orchestrator.create(orchestrator_function)
And a for loop in ActivityFunction2:
Alternative ActivityFunction2
import time

from tqdm import tqdm


def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''
    start_time = time.time()
    clean_list = activityVar['blob_data']

    print('Starting matching...')
    start_time_matching = time.time()
    results = []
    messy_list = activityVar['search_terms']
    for i in tqdm(range(len(messy_list))):
        term = messy_list[i]
        ...
This works locally but fails with an exit code 137 (out-of-memory) error when deployed to Azure, even on a Premium plan with 14 GB RAM.
For question 2: no, you cannot pass a DataFrame from ActivityFunction1 to ActivityFunction2. What you are doing is correct: convert it to a dict and pass that. Also, the logic in ActivityFunction1 should not be inside the orchestrator; it should be a separate activity, as you have done. The Parquet file may be 200 MB, but once converted to a DataFrame it will take up more memory than that.
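If you want DataFrame semantics on both sides anyway, the usual workaround is a serialize/rebuild round trip - a minimal sketch:

import pandas as pd

# In ActivityFunction1: convert the frame to a JSON-serializable structure.
df = pd.DataFrame({"column": ["a", "b", "c"]})
payload = df.to_dict(orient="records")    # list of plain dicts

# In ActivityFunction2: rebuild the frame from the activity input.
df_again = pd.DataFrame.from_records(payload)

Activity inputs and outputs pass through JSON serialization in the orchestration history, which is why only JSON-safe structures survive the hop.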
I found some online documentation regarding out-of-memory errors in Durable Functions, and some of its suggestions may help; the behavior seems to have something to do with orchestrator replays. I cannot reproduce the error, though, as I do not have a sample of the Parquet file to work with, and I have never encountered this error myself.
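One mitigation worth trying is to keep the large dataset out of the orchestration history entirely: instead of returning the Parquet contents from ActivityFunction1, let ActivityFunction2 load and cache the data itself at module level, so the orchestrator only ever passes small inputs and replays stay cheap. A rough sketch, assuming the same environment variables as your ActivityFunction1 and a placeholder substring matcher standing in for your real matching logic:

import os
import io

import pandas as pd
from azure.storage.blob import BlobServiceClient

# Module-level cache: lives for the lifetime of the worker process,
# so the blob is downloaded at most once per warm worker.
_CLEAN_LIST = None


def _load_clean_list():
    global _CLEAN_LIST
    if _CLEAN_LIST is None:
        connect_str = (
            f"DefaultEndpointsProtocol=https;"
            f"AccountName={os.getenv('accountname')};"
            f"AccountKey={os.getenv('accountkey')};"
            f"EndpointSuffix=core.windows.net"
        )
        blob_client = BlobServiceClient.from_connection_string(connect_str).get_blob_client(
            container=os.getenv('containername'), blob=os.getenv('blobname'))
        frame = pd.read_parquet(io.BytesIO(blob_client.download_blob().readall()))
        _CLEAN_LIST = frame['column'].tolist()
    return _CLEAN_LIST


def main(activityVar: dict) -> dict:
    clean_list = _load_clean_list()  # downloaded at most once per worker
    term = activityVar['messy_term']
    # Placeholder matching logic - substitute your real matcher here.
    matches = [c for c in clean_list if term.lower() in c.lower()]
    return {'term': term, 'matches': matches[:activityVar['num_matches']]}

With this shape, ActivityFunction1 disappears and the fan-out orchestrator passes only the term and num_matches per activity call.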
Have a look at this question and its proposed solution.