How to confirm if metadata from azure blob file matches given values?

47 Views Asked by At

I'm trying to access the metadata of some files stored in azure blob storage programmatically, specifically using the python SDK, the metadata should be set to the values of the source blob properties, so that it can be used to verify if it's an up to date copy in another container.

When I try to compare this so that the function either returns the existing copy or proceeds with a new translation if this is out of date, it currently breaks the function and returns a NoneType Error, even though the code has an else clause to allow it to move on to the next piece of functionality instead.

This is my current code example:

from translate.functions import readBlob, fileType
import requests
import json
import time
import logging
import os
import azure.functions as func
from datetime import datetime, timedelta
from azure.storage.blob import BlobClient
from pathlib import Path
from azure.storage.blob import BlobServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    endpoint = os.environ.get("TRANSLATE_ENDPOINT")
    key = os.environ.get("TRANSLATE_KEY")
    path = 'translator/text/batch/v1.1/batches'
    constructed_url = endpoint + path
    container_name = os.environ.get("TR_CONTAINER_NAME")
    account_name = os.environ.get("ACCOUNT_NAME")
    target_url = os.environ.get("TARGET_URL")
    account_key = os.environ.get("ACCOUNT_KEY")
    connection_string = os.environ.get("STORAGE_CONNECTION_STRING")
    output_metadata = {}

    try:
        body = req.get_body()
        logging.info('Received request body')
        request_data = json.loads(body.decode('utf-8'))

        inputs = request_data.get('inputs', [])
        input_data = inputs[0]
        # extract source url from data
        source_url = input_data.get('source', {}).get('sourceUrl')

        # Extract language from the targets list
        targets = input_data.get('targets', [])

        if not targets:
            targets = []

        if targets:
            language_code = targets[0].get('language')
            list_of_languages = ["af", "sq", "am", "ar", "hy", "as", "az", "bn", "ba", "eu", "bho", "brx", "bs", "bg",
                                 "yue", "ca", "lzh", "zh-Hans", "zh-Hant", "sn", "hr", "cs", "da", "prs", "dv", "doi",
                                 "nl", "en", "et", "fo", "fj", "fil", "fi", "fr", "fr-ca", "gl", "ka", "de", "el", "gu",
                                 "ht", "ha", "he", "hi", "mww", "hu", "is", "ig", "id", "ikt", "iu", "iu-Latn", "ga",
                                 "it", "ja", "kn", "ks", "kk", "km", "rw", "tlh-Latn", "tlh-Piqd", "gom", "ko", "ku",
                                 "kmr", "ky", "lo", "lv", "lt", "ln", "dsb", "lug", "mk", "mai", "mg", "ms", "ml", "mt",
                                 "mi", "mr", "mn-Cyrl", "mn-Mong", "my", "ne", "nb", "nya", "or", "ps", "fa", "pl", "pt",
                                 "pt-pt", "pa", "otq", "ro", "un", "ru", "sm", "sr-Cyrl", "sr-Latn", "st", "nso", "tn",
                                 "sd", "si", "sk", "sl", "so", "es", "sw", "sv", "ty", "ta", "tt", "te", "th", "bo", "ti",
                                 "to", "tr", "tk", "uk", "hsb", "ur", "ug", "uz", "vi", "cy", "xh", "yo", "yua", "zu"]
            if language_code not in list_of_languages:
                logging.error(f"Language code {language_code} not supported")
                raise ValueError("Language not Supported")

        else:
            language_code = None
            logging.error("Language code not found")
            raise ValueError("Language code not found")

        last_slash_index = source_url.rfind('/')

        if last_slash_index == -1:
            raise ValueError("An invalid source url was provided")

        # Extract the part after the last slash
        file_path = Path(source_url)

        if file_path.suffix == '':
            raise ValueError("The provided file name does not have a file extension")

        # Insert language code before the last dot
        new_filename = f"{file_path.stem}_{language_code}{file_path.suffix}"
        final_url = target_url + new_filename

        # Get Input Blob Properties 
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        blob_client = blob_service_client.get_blob_client(container="km3-data", blob=file_path.name)
        
        blob_properties = blob_client.get_blob_properties()

        # Access properties
        input_last_modified = blob_properties.last_modified
        input_etag = blob_properties.etag
        logging.info(f"Input Last Modified: {blob_properties.last_modified}")
        logging.info(f"Input ETag: {blob_properties.etag}")
        for key, value in blob_properties.metadata.items():
            logging.info(f"{key}: {value}")

        # Create output metadata dictionary
        output_metadata = {
            "etag": input_etag,
            "last_modified": str(input_last_modified)
        }
        # Check if the blob already exists
        conn_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
        blob = BlobClient.from_connection_string(conn_str=conn_str, container_name=container_name, blob_name=new_filename, metadata=output_metadata)
        exists = blob.exists()
        logging.info(exists)
        
        if exists:
                # If the blob exists, check metadata
                    # Retrieve existing metadata
            blob_metadata_response = blob.get_blob_properties()
            if blob_metadata_response is not None:
                blob_metadata = blob_metadata_response.metadata

                # Check if metadata exists
                if blob_metadata is not None:
                    metadata_last_modified = blob_metadata.get("last_modified")
                    metadata_etag = blob_metadata.get("etag")

                    if metadata_last_modified == output_metadata["last_modified"] and metadata_etag == output_metadata["etag"]:
        
                        # If metadata matches return the final URL
                            logging.info(final_url)
                            return final_url

        else:
            logging.info("Metadata does not match, creating new translation")
            try:
                blob_client = blob_service_client.get_blob_client(container=container_name, blob=new_filename)
                blob_client.delete_blob()
            except:    
            # Create the new JSON body
                new_request_data = {
                        "inputs": [
                            {
                                "storageType": "File",
                                "source": {
                                    "sourceUrl": source_url,
                                    "storageSource": "AzureBlob"
                                },
                                "targets": [
                                    {
                                        "targetUrl": final_url,
                                        "storageSource": "AzureBlob",
                                        "category": "general",
                                        "language": language_code
                                    }
                                ]
                            }
                        ]
                    }

            # Make the POST request with the new JSON body
            headers = {
                'Ocp-Apim-Subscription-Key': key,
                'Content-Type': 'application/json',
                'ocp-Apim-subscription-reigon': 'northeurope'
            }

            response = requests.post(constructed_url, headers=headers, json=new_request_data)
            logging.info(f"Sending response via POST {response}")

            response_headers = response.headers

            logging.info(f'response status code: {response.status_code}\nresponse status: {response.reason}\n\nresponse headers:\n')

            if response.status_code == 202:
                # Extract the operation location from the response headers
                operation_location = response_headers.get('Operation-Location')

                # Loop until the translation is completed (status is not 'NotStarted' or 'Running')
                start_time = datetime.now()

                while True:
                    # Make a GET request to the operation location to get the status
                    operation_response = requests.get(operation_location, headers=headers)
                    operation_data = operation_response.json()
                    logging.info(operation_response)

                    # Check if the status is 'Succeeded' or 'Failed'
                    if operation_data['status'] in ['Succeeded', 'Failed']:
                        break

                    elapsed_time = datetime.now() - start_time
                    if elapsed_time > timedelta(minutes=3):
                        logging.error("Translation operation timed out.")
                        break

                    time.sleep(5)

                # Retrieve and download the last created blob
                output_file = readBlob.list_and_download_blobs(container_name, conn_str)
                fileType.add_disclaimer(output_file)

                logging.info("Disclaimer added")
                readBlob.reuploadBlob(container_name, account_name, output_file, account_key)
                logging.info(f"{output_file} uploaded")

                output_path = Path(output_file)

                return f"https://{account_name}.blob.core.windows.net/{container_name}/{output_path.name}"

            else:
                logging.error(f"Translation request failed. Status code: {response.status_code}, Reason: {response.reason}")
                return func.HttpResponse(f"Translation request failed. Status code: {response.status_code}", status_code=500)

    except Exception as e:
        # Handle exceptions
        logging.error(f"An error occurred: {str(e)}")
        return func.HttpResponse(f"Error processing the request. {str(e)}", status_code=500)


The issue is at line 116, when that's commented out the code runs just fine, and checks whether the blob already exists and runs on if not.

How do I fix this so that it compares the user defined metadata of the blob against the properties being created?

EDIT: It's no-longer failing when the metadata does match, but fails out with a NoneType error when it doesn't, isntead of moving onto the next block

0

There are 0 best solutions below