Making Downloaded Files a Parameter for Subsequent Functions in Django, Celery, and Digital Ocean Spaces Integration

27 Views Asked by At

How can I utilize Django with Celery to handle a scenario where a function downloads a file from Digital Ocean Spaces, and once the download is complete, I want to pass the downloaded file as a parameter to the StorageContext function?

In other words: wait until the file has been downloaded, and only then proceed to the next step.

Here is what I've tried:

@celery_app.task()
def download_file_from_digital_ocean(folder_key, local_folder):
    """Fetch the four JSON store files for ``folder_key`` into a local folder.

    The objects are read from ``test/<folder_key>/`` in the bucket named by
    ``settings.AWS_STORAGE_BUCKET_NAME`` and written to
    ``<local_folder>/<folder_key>/`` (created if it does not exist).

    Args:
        folder_key: Name of the remote sub-folder under ``test/``; also used
            as the name of the local sub-folder.
        local_folder: Local directory under which the sub-folder is created.

    Returns:
        True when every file was downloaded, False if any step raised.
    """
    store_files = (
        "docstore.json",
        "graph_store.json",
        "index_store.json",
        "vector_store.json",
    )

    try:
        s3 = boto3.session.Session().client(
            "s3",
            region_name=settings.AWS_S3_REGION_NAME,
            endpoint_url=settings.AWS_S3_ENDPOINT_URL,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        )

        target_dir = os.path.join(local_folder, folder_key)
        os.makedirs(target_dir, exist_ok=True)

        # Each remote key ends in the bare file name, which is reused locally.
        for name in store_files:
            s3.download_file(
                Bucket=settings.AWS_STORAGE_BUCKET_NAME,
                Key=f"test/{folder_key}/{name}",
                Filename=os.path.join(target_dir, name),
            )
    except Exception as e:
        # Any failure (not only connection problems) is reported this way.
        print(f"Connection error: {e}")
        return False
    else:
        return True


@celery_app.task()
def llma_index_new_update(url, url_text, question):
    """Answer ``question`` from the stored index of ``url`` and save the result.

    The index files are downloaded first; only after that download has
    finished successfully is the storage context built from the local folder
    and the query executed.

    Args:
        url: Mapping that provides at least the keys ``"id"`` and ``"url"``.
        url_text: Stored and returned unchanged under ``"url_asked"``.
        question: Text passed to the query engine.

    Returns:
        A dict with the keys ``"url_asked"``, ``"question"`` and
        ``"response"``, or None when the download fails or any step raises.
    """
    try:
        openai.api_key = settings.APP_OPEN_AI_API_KEY

        url_id = url["id"]
        folder_key = f"storage{url_id}"
        local_folder = "local_test/"

        # Call the task function directly instead of using .delay():
        # .delay() only queues the download and returns immediately, so the
        # index would be loaded before the files exist. A direct call runs
        # the download in this worker process and returns its True/False
        # result (blocking on AsyncResult.get() inside a task is discouraged
        # by Celery).
        downloaded = download_file_from_digital_ocean(folder_key, local_folder)
        if not downloaded:
            print(f"Download failed for {folder_key}")
            return None

        # Same directory the download task writes to: local_test/storage<id>.
        storage_context = StorageContext.from_defaults(
            persist_dir=local_folder + folder_key
        )
        index = load_index_from_storage(storage_context, index_id=url["url"])
        query_engine = index.as_query_engine(response_mode="tree_summarize")

        response = query_engine.query(question)

        # NOTE(review): url_instance is not defined in this function; unless
        # it exists at module level this raises NameError, which the handler
        # below swallows. Confirm what should be stored here.
        data = {
            "url": url_instance,
            "url_asked": url_text,
            "question": question,
            "response": response.response,
        }

        return_data = {
            "url_asked": url_text,
            "question": question,
            "response": response.response,
        }

        save_to_db_question(data)

        return return_data

    except Exception as e:
        print(e)
        return None

Any idea how I can implement this?

0

There are 0 best solutions below