How can I utilize Django with Celery to handle a scenario where a function downloads a file from Digital Ocean Spaces, and once the download is complete, I want to pass the downloaded file as a parameter to the StorageContext function?
In other words, the next step should proceed only once the file has been downloaded.
Here is what I've tried:
@celery_app.task()
def download_file_from_digital_ocean(folder_key, local_folder):
    """Download the four index JSON files for ``folder_key`` into a local folder.

    The objects ``docstore.json``, ``graph_store.json``, ``index_store.json``
    and ``vector_store.json`` are read from ``test/<folder_key>/`` in the
    bucket named by ``settings.AWS_STORAGE_BUCKET_NAME`` and written to
    ``<local_folder>/<folder_key>/`` (created if missing).

    Args:
        folder_key: Name of the remote sub-folder and of the local sub-folder.
        local_folder: Local directory under which the sub-folder is created.

    Returns:
        True when every file was downloaded, False if any exception occurred
        (the exception is printed, not re-raised).
    """
    try:
        s3_client = boto3.session.Session().client(
            "s3",
            region_name=settings.AWS_S3_REGION_NAME,
            endpoint_url=settings.AWS_S3_ENDPOINT_URL,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        )
        remote_keys = (
            f"test/{folder_key}/docstore.json",
            f"test/{folder_key}/graph_store.json",
            f"test/{folder_key}/index_store.json",
            f"test/{folder_key}/vector_store.json",
        )

        target_dir = os.path.join(local_folder, folder_key)
        os.makedirs(target_dir, exist_ok=True)

        for remote_key in remote_keys:
            # The local file keeps the last path component of the object key.
            base_name = remote_key.rsplit("/", 1)[-1]
            s3_client.download_file(
                Bucket=settings.AWS_STORAGE_BUCKET_NAME,
                Key=remote_key,
                Filename=os.path.join(target_dir, base_name),
            )
    except Exception as e:
        print(f"Connection error: {e}")
        return False
    else:
        return True
@celery_app.task()
def llma_index_new_update(url, url_text, question):
    """Download the stored index for ``url`` and answer ``question`` with it.

    The index files are fetched first; only after the download has finished
    successfully is the storage context built and the query executed.

    Args:
        url: Mapping that provides at least the keys ``"id"`` and ``"url"``.
        url_text: Text stored and returned under the ``"url_asked"`` key.
        question: Question passed to the query engine.

    Returns:
        A dict with the keys ``"url_asked"``, ``"question"`` and
        ``"response"``; ``None`` if the download failed or any exception was
        raised (the exception is printed, not re-raised).
    """
    try:
        openai.api_key = settings.APP_OPEN_AI_API_KEY
        url_id = url["id"]
        folder_key = f"storage{url_id}"
        local_folder = "local_test/"

        # Run the download inline instead of queueing it with ``.delay()``.
        # ``.delay()`` returns immediately, so the index would be loaded
        # before the files exist. Calling the task object directly executes
        # it in this worker process and blocks until it returns.
        downloaded = download_file_from_digital_ocean(folder_key, local_folder)
        if not downloaded:
            print(f"Download failed for {folder_key}; index was not loaded")
            return None

        # Same directory the download step writes to.
        storage_context = StorageContext.from_defaults(
            persist_dir=f"{local_folder}{folder_key}"
        )
        index = load_index_from_storage(storage_context, index_id=url["url"])
        query_engine = index.as_query_engine(response_mode="tree_summarize")
        response = query_engine.query(question)

        return_data = {
            "url_asked": url_text,
            "question": question,
            "response": response.response,
        }
        # NOTE(review): ``url_instance`` is not defined inside this function;
        # presumably it is provided at module level -- confirm, otherwise this
        # line raises NameError and the task returns None.
        data = {"url": url_instance, **return_data}
        save_to_db_question(data)
        return return_data
    except Exception as e:
        print(e)
Any idea how I can implement this?