I'm trying to access the metadata of some files stored in azure blob storage programmatically, specifically using the python SDK, the metadata should be set to the values of the source blob properties, so that it can be used to verify if it's an up to date copy in another container.
When I try to compare this so that the function either returns the existing copy or proceeds with a new translation if this is out of date, it currently breaks the function and returns a NoneType Error, even though the code has an else clause to allow it to move on to the next piece of functionality instead.
This is my current code example:
from translate.functions import readBlob, fileType
import requests
import json
import time
import logging
import os
import azure.functions as func
from datetime import datetime, timedelta
from azure.storage.blob import BlobClient
from pathlib import Path
from azure.storage.blob import BlobServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
endpoint = os.environ.get("TRANSLATE_ENDPOINT")
key = os.environ.get("TRANSLATE_KEY")
path = 'translator/text/batch/v1.1/batches'
constructed_url = endpoint + path
container_name = os.environ.get("TR_CONTAINER_NAME")
account_name = os.environ.get("ACCOUNT_NAME")
target_url = os.environ.get("TARGET_URL")
account_key = os.environ.get("ACCOUNT_KEY")
connection_string = os.environ.get("STORAGE_CONNECTION_STRING")
output_metadata = {}
try:
body = req.get_body()
logging.info('Received request body')
request_data = json.loads(body.decode('utf-8'))
inputs = request_data.get('inputs', [])
input_data = inputs[0]
# extract source url from data
source_url = input_data.get('source', {}).get('sourceUrl')
# Extract language from the targets list
targets = input_data.get('targets', [])
if not targets:
targets = []
if targets:
language_code = targets[0].get('language')
list_of_languages = ["af", "sq", "am", "ar", "hy", "as", "az", "bn", "ba", "eu", "bho", "brx", "bs", "bg",
"yue", "ca", "lzh", "zh-Hans", "zh-Hant", "sn", "hr", "cs", "da", "prs", "dv", "doi",
"nl", "en", "et", "fo", "fj", "fil", "fi", "fr", "fr-ca", "gl", "ka", "de", "el", "gu",
"ht", "ha", "he", "hi", "mww", "hu", "is", "ig", "id", "ikt", "iu", "iu-Latn", "ga",
"it", "ja", "kn", "ks", "kk", "km", "rw", "tlh-Latn", "tlh-Piqd", "gom", "ko", "ku",
"kmr", "ky", "lo", "lv", "lt", "ln", "dsb", "lug", "mk", "mai", "mg", "ms", "ml", "mt",
"mi", "mr", "mn-Cyrl", "mn-Mong", "my", "ne", "nb", "nya", "or", "ps", "fa", "pl", "pt",
"pt-pt", "pa", "otq", "ro", "un", "ru", "sm", "sr-Cyrl", "sr-Latn", "st", "nso", "tn",
"sd", "si", "sk", "sl", "so", "es", "sw", "sv", "ty", "ta", "tt", "te", "th", "bo", "ti",
"to", "tr", "tk", "uk", "hsb", "ur", "ug", "uz", "vi", "cy", "xh", "yo", "yua", "zu"]
if language_code not in list_of_languages:
logging.error(f"Language code {language_code} not supported")
raise ValueError("Language not Supported")
else:
language_code = None
logging.error("Language code not found")
raise ValueError("Language code not found")
last_slash_index = source_url.rfind('/')
if last_slash_index == -1:
raise ValueError("An invalid source url was provided")
# Extract the part after the last slash
file_path = Path(source_url)
if file_path.suffix == '':
raise ValueError("The provided file name does not have a file extension")
# Insert language code before the last dot
new_filename = f"{file_path.stem}_{language_code}{file_path.suffix}"
final_url = target_url + new_filename
# Get Input Blob Properties
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container="km3-data", blob=file_path.name)
blob_properties = blob_client.get_blob_properties()
# Access properties
input_last_modified = blob_properties.last_modified
input_etag = blob_properties.etag
logging.info(f"Input Last Modified: {blob_properties.last_modified}")
logging.info(f"Input ETag: {blob_properties.etag}")
for key, value in blob_properties.metadata.items():
logging.info(f"{key}: {value}")
# Create output metadata dictionary
output_metadata = {
"etag": input_etag,
"last_modified": str(input_last_modified)
}
# Check if the blob already exists
conn_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob = BlobClient.from_connection_string(conn_str=conn_str, container_name=container_name, blob_name=new_filename, metadata=output_metadata)
exists = blob.exists()
logging.info(exists)
if exists:
# If the blob exists, check metadata
# Retrieve existing metadata
blob_metadata_response = blob.get_blob_properties()
if blob_metadata_response is not None:
blob_metadata = blob_metadata_response.metadata
# Check if metadata exists
if blob_metadata is not None:
metadata_last_modified = blob_metadata.get("last_modified")
metadata_etag = blob_metadata.get("etag")
if metadata_last_modified == output_metadata["last_modified"] and metadata_etag == output_metadata["etag"]:
# If metadata matches return the final URL
logging.info(final_url)
return final_url
else:
logging.info("Metadata does not match, creating new translation")
try:
blob_client = blob_service_client.get_blob_client(container=container_name, blob=new_filename)
blob_client.delete_blob()
except:
# Create the new JSON body
new_request_data = {
"inputs": [
{
"storageType": "File",
"source": {
"sourceUrl": source_url,
"storageSource": "AzureBlob"
},
"targets": [
{
"targetUrl": final_url,
"storageSource": "AzureBlob",
"category": "general",
"language": language_code
}
]
}
]
}
# Make the POST request with the new JSON body
headers = {
'Ocp-Apim-Subscription-Key': key,
'Content-Type': 'application/json',
'ocp-Apim-subscription-reigon': 'northeurope'
}
response = requests.post(constructed_url, headers=headers, json=new_request_data)
logging.info(f"Sending response via POST {response}")
response_headers = response.headers
logging.info(f'response status code: {response.status_code}\nresponse status: {response.reason}\n\nresponse headers:\n')
if response.status_code == 202:
# Extract the operation location from the response headers
operation_location = response_headers.get('Operation-Location')
# Loop until the translation is completed (status is not 'NotStarted' or 'Running')
start_time = datetime.now()
while True:
# Make a GET request to the operation location to get the status
operation_response = requests.get(operation_location, headers=headers)
operation_data = operation_response.json()
logging.info(operation_response)
# Check if the status is 'Succeeded' or 'Failed'
if operation_data['status'] in ['Succeeded', 'Failed']:
break
elapsed_time = datetime.now() - start_time
if elapsed_time > timedelta(minutes=3):
logging.error("Translation operation timed out.")
break
time.sleep(5)
# Retrieve and download the last created blob
output_file = readBlob.list_and_download_blobs(container_name, conn_str)
fileType.add_disclaimer(output_file)
logging.info("Disclaimer added")
readBlob.reuploadBlob(container_name, account_name, output_file, account_key)
logging.info(f"{output_file} uploaded")
output_path = Path(output_file)
return f"https://{account_name}.blob.core.windows.net/{container_name}/{output_path.name}"
else:
logging.error(f"Translation request failed. Status code: {response.status_code}, Reason: {response.reason}")
return func.HttpResponse(f"Translation request failed. Status code: {response.status_code}", status_code=500)
except Exception as e:
# Handle exceptions
logging.error(f"An error occurred: {str(e)}")
return func.HttpResponse(f"Error processing the request. {str(e)}", status_code=500)
The issue is at line 116, when that's commented out the code runs just fine, and checks whether the blob already exists and runs on if not.
How do I fix this so that it compares the user defined metadata of the blob against the properties being created?
EDIT: It's no-longer failing when the metadata does match, but fails out with a NoneType error when it doesn't, isntead of moving onto the next block