I have activated the AWS service called AWS Cost and Usage Report (CUR), which generates a report on an hourly basis.
The CUR is delivered in Parquet format.
I have a Python code that takes the parquet and reads it.
The issue is that there is a difference between the total cost calculated from the parquet file and the total shown in the AWS Billing Dashboard.
The difference is not small but very large; for example, EC2 in the Billing Dashboard shows a cost of around $12,000, but the cost calculated from the CUR is roughly $4,500.
Here is my Python code:
import sys
import time
from fastparquet import ParquetFile
import json
import datetime
from collections import defaultdict
# Column-name prefix that marks user-defined resource-tag columns in the CUR schema.
tags_key = 'resource_tags_'
def read_parquet_file(parquet_file_path):
    """Read an hourly CUR parquet file and return costs summed per line-item key.

    Parameters:
        parquet_file_path: Path to the CUR parquet file.

    Returns:
        tuple of (pandas.Series, list[str]):
            - the 'line_item_unblended_cost' summed for each unique grouping
              tuple (identity, SKU, type, resource, service, region, billing
              period, usage window, usage type, resource tags), sorted by
              'line_item_product_code';
            - the list of resource-tag column names found in the file.
    """
    start_time_rf = time.time()
    dataset = ParquetFile(parquet_file_path)
    # Convert the Parquet table to a Pandas DataFrame
    df = dataset.to_pandas()
    resource_tags_columns = [col for col in df.columns if col.startswith(tags_key)]
    relevant_columns = ['identity_line_item_id', 'product_sku', 'line_item_line_item_type',
                        'line_item_resource_id', 'line_item_product_code', 'product_region',
                        'bill_billing_period_start_date', 'bill_billing_period_end_date',
                        'line_item_usage_start_date', 'line_item_usage_end_date',
                        'product_usagetype'] + resource_tags_columns + ['line_item_unblended_cost']
    df = df[relevant_columns]
    # BUG FIX: pandas groupby silently DROPS every row whose group key contains
    # NaN (dropna defaults to True).  In CUR data, 'line_item_resource_id' and
    # the resource-tag columns are frequently NaN (Tax, Fee, and untagged
    # lines), so those rows -- and their cost -- vanished from the grouped
    # result.  This is why the CUR total was far below the Billing Dashboard.
    # Normalise NaN in the string key columns to '' (keeps downstream JSON
    # clean) and pass dropna=False so no remaining NaN key drops a row.
    key_columns = relevant_columns[:-1]
    string_key_columns = [c for c in key_columns if not c.endswith('_date')]
    df[string_key_columns] = df[string_key_columns].fillna('')
    grouped_data = df.groupby(key_columns, dropna=False)['line_item_unblended_cost'].sum()
    # Sort the grouped data by line_item_product_code for stable output order.
    grouped_data = grouped_data.sort_index(level='line_item_product_code')
    # Record the end time
    end_time_rf = time.time()
    # Calculate the elapsed time
    elapsed_time = end_time_rf - start_time_rf
    # print(f'Time taken to read the Parquet file: {elapsed_time} seconds')
    return grouped_data, resource_tags_columns
def parse_data(grouped_data, resource_tags_columns):
    """Flatten the grouped CUR cost Series into a nested JSON cost report.

    Walks every (line-item key, summed cost) pair produced by
    read_parquet_file, accumulates the grand total plus Tax/Usage subtotals,
    then builds a per-service / per-region / per-usage-type breakdown and
    prints it as an indented JSON string, followed by the elapsed time.

    Parameters:
        grouped_data: pandas Series indexed by the grouping tuple, holding the
            summed 'line_item_unblended_cost' for each group.
        resource_tags_columns: list of column names starting with the
            'resource_tags_' prefix; positions match the *resource_tags tail
            of each index tuple.

    Returns:
        None -- all output goes to stdout.
    """
    start_time_rf = time.time()
    # Running totals across every line item.
    final_sum = 0
    tax = 0
    usage = 0
    # Entries bucketed by service (line_item_product_code).
    final_grouped_data = defaultdict(list)

    def convert_cost(cost):
        # Render a cost as a decimal string, preserving all visible digits.
        try:
            # Attempt direct string formatting for potential efficiency
            return f"{cost:.{len(str(cost).split('.')[-1])}f}"
        except ValueError:
            # Handle cases where direct formatting fails (e.g., scientific notation)
            exponent_str = str(cost)
            base, power = exponent_str.split('e')
            decimal = len(base.split('.')[1]) if len(base.split('.')) > 1 else 0
            power = abs(int(power))
            return f"{cost:.{power + decimal}f}"

    for (identity_line_item_id, product_sku, line_item_line_item_type, line_item_resource_id, line_item_product_code,
         product_region, bill_billing_period_start_date,
         bill_billing_period_end_date, line_item_usage_start_date, line_item_usage_end_date,
         product_usagetype, *resource_tags), total_cost in grouped_data.items():
        final_sum += total_cost
        if (line_item_line_item_type == 'Tax'):
            tax += total_cost
        elif (line_item_line_item_type == 'Usage'):
            usage += total_cost
        entry = {
            'identityLineItemId': identity_line_item_id,
            'line_item_resource_id': line_item_resource_id,
            'totalCost': convert_cost(total_cost),
            'product_sku': product_sku,
            'region': product_region,
            'line_item_line_item_type': line_item_line_item_type,
            'line_item_product_code': line_item_product_code,
            'product_usagetype': product_usagetype,
            'start_date': datetime.datetime.strftime(bill_billing_period_start_date, '%Y-%m-%d %H:%M:%S'),
            'end_date': datetime.datetime.strftime(bill_billing_period_end_date, '%Y-%m-%d %H:%M:%S'),
            'usage_start_date': datetime.datetime.strftime(line_item_usage_start_date, '%Y-%m-%d %H:%M:%S'),
            'usage_end_date': datetime.datetime.strftime(line_item_usage_end_date, '%Y-%m-%d %H:%M:%S'),
        }
        final_grouped_data[line_item_product_code].append(entry)
        # entry is mutated after being appended; the list holds a reference,
        # so the tag keys added below still land in final_grouped_data.
        for col, tag_key in enumerate(resource_tags_columns):
            # NOTE(review): a float NaN tag value is truthy, so it would pass
            # this check and leak into the JSON -- confirm tags are str/None.
            if tag_value := resource_tags[col]:
                entry[tag_key] = tag_value

    def groupByRegion(data):
        # Bucket entries by 'region'; entries with a falsy region are skipped.
        group_by_region = defaultdict(list)  # Use defaultdict for efficient handling of new keys
        for entry in data:
            if region := entry.get('region'):
                group_by_region[region].append(entry)
        return group_by_region

    def groupByIdentityLineItemId(data):
        # Merge entries sharing an identityLineItemId, summing their costs and
        # carrying resource-tag keys onto the merged record.
        obj = {}
        for entry in data:
            # Extract values
            identity_id = entry["identityLineItemId"]
            total_cost = float(entry["totalCost"])
            # Add to existing entry or create a new one
            if identity_id in obj:
                obj[identity_id]["total_cost"] = convert_cost(float(obj[identity_id]["total_cost"]) + total_cost)
            else:
                obj[identity_id] = {
                    "total_cost": convert_cost(total_cost),
                    "identityLineItemId": entry['identityLineItemId'],
                    "line_item_resource_id": entry['line_item_resource_id'],
                    "region": entry['region'],
                    "line_item_product_code": entry['line_item_product_code'],
                    "product_usagetype": entry['product_usagetype']
                }
            # Copy any resource-tag keys from this entry onto the merged record.
            for key, value in entry.items():
                if key.startswith(tags_key):
                    obj[identity_id][key] = value
        return obj

    def groupByProductUsageType(data, type):
        # Bucket entries by usage type; EC2 gets special EBS/Stopped/Running buckets.
        group_by_type = defaultdict(list)  # Use defaultdict for efficient handling of new keys
        for entry in data:
            if type == 'AmazonEC2':
                if 'EBS' in entry['product_usagetype']:
                    group_by_type['EBS'].append(entry)
                elif 'AmazonEC2Stopped' in entry['product_usagetype']:
                    group_by_type['AmazonEC2Stopped'].append(entry)
                else:
                    group_by_type['AmazonEC2Running'].append(entry)
                # group_by_type['EBS' if 'EBS' in entry['product_usagetype'] else 'AmazonEC2Running'].append(entry)
            else:
                group_by_type[entry['product_usagetype']].append(entry)
        return getCostByproductUsageType(group_by_type)

    def getCostByRegion(data, type):
        # For each region: total cost, usage-only cost, and a usage-type breakdown.
        group_by_region = defaultdict(list)  # Use defaultdict for efficient handling
        for region, entries in data.items():
            region_wise_sum = sum(float(entry['totalCost']) for entry in entries)  # Sum total cost using generator expression
            details = groupByProductUsageType(entries, type)  # Group by product usage type
            usage_sum = 0
            for entry in entries:
                if entry['line_item_line_item_type'] == 'Usage':
                    usage_sum += float(entry['totalCost'])
            group_by_region[region].append({'total_cost': convert_cost(region_wise_sum),
                                            'usage_cost': convert_cost(usage_sum), 'details': details})
        return group_by_region

    def getCostByproductUsageType(data):
        # Summarise each usage-type bucket: its total plus per-line-item detail.
        group_by_type = defaultdict(list)
        for key, value in data.items():
            regionWiseSum = sum(float(entry['totalCost']) for entry in value)  # Efficient summation
            # Group and convert cost within the loop for clarity
            group_by_type[key].append({
                f'{key}_total_cost': convert_cost(regionWiseSum),
                f'{key}_details': groupByIdentityLineItemId(value)
            })
        return group_by_type

    def get_cost_each_line_item_product(grouped_data):
        # Assemble the final report: grand totals plus a per-service breakdown.
        cost_details = {
            'details': {},
            'cost_details': {
                'total_cost': f'{final_sum:.2f}',
                'tax_cost': f'{tax:.2f}',
                'usage_cost': f'{usage:.2f}'
            }
        }
        for key, data in grouped_data.items():
            total_cost = sum(float(entry['totalCost']) for entry in data)
            tax_service = sum(float(entry['totalCost']) for entry in data if entry['line_item_line_item_type'] == 'Tax')
            usage_service = sum(float(entry['totalCost']) for entry in data if entry['line_item_line_item_type'] == 'Usage')
            region_wise_details = groupByRegion(data)
            cost_details['details'][key] = {
                'total_cost': convert_cost(total_cost),
                'usage_cost': convert_cost(usage_service),
                'tax_cost': convert_cost(tax_service),
                'region_wise_details': getCostByRegion(region_wise_details, key)
            }
        formatted_json = json.dumps(cost_details, indent=2, ensure_ascii=False)
        # with open('final_code_cpy.json', 'w') as json_file:
        #     json.dump(cost_details, json_file)
        return formatted_json

    data = get_cost_each_line_item_product(final_grouped_data)
    print(data)
    # Record the end time
    end_time_rf = time.time()
    elapsed_time = end_time_rf - start_time_rf
    print(f'Time taken to scan the Parquet file: {elapsed_time} seconds')
if __name__ == '__main__':
    # Require the CUR parquet file path as the first CLI argument.
    if len(sys.argv) < 2:
        # FIX: previously exited silently with status 1; tell the user why.
        print('Usage: python <script> <path-to-cur-parquet-file>', file=sys.stderr)
        sys.exit(1)
    parquet_file_path = sys.argv[1]
    grouped_data, resource_tags_columns = read_parquet_file(parquet_file_path)
    parse_data(grouped_data, resource_tags_columns)
Note:
The parquet file that I am using is the latest parquet file generated by CUR.