Here's my situation:
I have a storage account in Azure that contains my tables from Dynamics 365 F&O, and I have a JSON file with the column names and types. This is the 'header' file, and I have another CSV file (there can be one or more CSV files for the same table) with the data.
So, I need to combine these two for each table and then load the result into my Fabric Lakehouse. So far, I'm trying to do this using this code:
import json
import os

def get_cdm_files(directory_path):
    cdm_files = []
    for root, dirs, files in os.walk(directory_path):
        for file in files:
            if file.endswith('.cdm.json'):
                cdm_files.append(os.path.join(root, file))
    return cdm_files

def load_table_cdm_file(cdm_file_path):
    with open(cdm_file_path.replace("abfss://[email protected]/", "/dbfs/mnt/dynamics/")) as f:
        cdm_json = json.load(f)
    cols = []
    for item in cdm_json['definitions'][0]['hasAttributes']:
        cols.append(item["name"])
    return spark.read.csv(cdm_file_path.replace("cdm.json", ".csv"), header=False, inferSchema=True)

def load_all_tables(cdm_files):
    tables = {}
    for cdm_file in cdm_files:
        table_name = cdm_file.split("/")[-1].replace(".cdm.json", "").lower()
        tables[table_name] = load_table_cdm_file(cdm_file)
    return tables

def write_table_delta(table_name, table_df):
    spark.sql(f"DROP TABLE IF EXISTS Lakehousename.Dynamics365_{table_name}")
    table_df.write.mode("overwrite").format("delta").saveAsTable(f"Dynamics365_{table_name}")

def main():
    cdm_files = get_cdm_files("abfss://[email protected]/domainname.operations.dynamics.com/Tables/")
    if "TABLENAME1.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME1.cdm.json")
    if "TABLENAME2.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME2.cdm.json")
    if "TABLE3.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLE3.cdm.json")
    tables = load_all_tables(cdm_files)
    for table_name, table_df in tables.items():
        write_table_delta(table_name, table_df)
I tried looking for guides, but since this is quite new there isn't much to search for, and even AI couldn't help at all.
Alter each of your functions as below.
get_cdm_files will get the .cdm.json files.
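As a minimal sketch (the answer's original code block is not reproduced here), assuming you run this in a Fabric or Synapse notebook where mssparkutils is available, since os.walk cannot list an abfss:// path; in Databricks, dbutils.fs.ls can be used the same way:

from notebookutils import mssparkutils  # built into Fabric/Synapse notebooks

def get_cdm_files(directory_path):
    """Recursively collect every .cdm.json file under the given abfss:// folder."""
    cdm_files = []
    for item in mssparkutils.fs.ls(directory_path):
        if item.isDir:
            # Walk into sub-folders such as Tables/Custom/
            cdm_files.extend(get_cdm_files(item.path))
        elif item.name.endswith('.cdm.json'):
            cdm_files.append(item.path)
    return cdm_files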
Next, load_table_cdm_file, for reading the CSV files using the schema from the JSON files.
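A rough sketch of that idea follows; the CDM_TO_SPARK type mapping and the csv_path layout (data files in a folder named after the table, next to the header) are assumptions you may need to adjust for your export. The function keeps the same name and signature, so load_all_tables is untouched.

import json
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               LongType, DoubleType, DecimalType, BooleanType,
                               TimestampType, DateType)

# Assumed mapping from CDM dataFormat values to Spark types; extend as needed.
CDM_TO_SPARK = {
    "string": StringType(),
    "int32": IntegerType(),
    "int64": LongType(),
    "double": DoubleType(),
    "decimal": DecimalType(38, 6),
    "boolean": BooleanType(),
    "datetime": TimestampType(),
    "date": DateType(),
    "guid": StringType(),
}

def load_table_cdm_file(cdm_file_path):
    """Build a schema from the .cdm.json header and read the table's CSV data with it."""
    # Read the header through Spark so the abfss:// path works without mounting.
    lines = spark.read.text(cdm_file_path).collect()
    cdm_json = json.loads("\n".join(row.value for row in lines))

    fields = []
    for item in cdm_json['definitions'][0]['hasAttributes']:
        spark_type = CDM_TO_SPARK.get(str(item.get('dataFormat', '')).lower(), StringType())
        fields.append(StructField(item['name'], spark_type, True))
    schema = StructType(fields)

    # Assumption: the CSV partitions live in a folder named after the table,
    # next to the header file; change this line to match your folder layout.
    csv_path = cdm_file_path.replace(".cdm.json", "/*.csv")
    return spark.read.csv(csv_path, header=False, schema=schema)

With a schema built from the header, the CSV columns get proper names and types instead of _c0, _c1 with inferred types.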
There is no change in load_all_tables; keep it as it is. Now, for writing the table to the lakehouse: if you are using a notebook in the Lakehouse itself, the write_table_delta function works fine.
Or, if you are using a notebook in Databricks, use the code below to write. Before running this code, make sure to check Enable credential passthrough for user-level data access under the Advanced options.
Copy the abfss path to the lakehouse table. Go to the properties of the table and copy the path; it is something similar to the one below:
abfss://<kjfneldqw>@msit-onelake.dfs.fabric.microsoft.com/<6382ey398e>/Tables
Then change write_table_delta to write to that path.
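A minimal sketch of that version of write_table_delta, assuming abfss_path holds the Tables path you copied above (the <...> segments are placeholders for your own workspace and lakehouse):

# Path copied from the lakehouse table properties; the <...> parts are placeholders.
abfss_path = "abfss://<...>@msit-onelake.dfs.fabric.microsoft.com/<...>/Tables"

def write_table_delta(table_name, table_df):
    """Write the DataFrame as a Delta table directly into the lakehouse Tables folder."""
    (table_df.write
        .mode("overwrite")
        .format("delta")
        .save(f"{abfss_path}/Dynamics365_{table_name}"))

Delta folders written under Tables are picked up by the lakehouse and show up as tables.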
Now run your main code.
Output: [screenshot], and in the lakehouse: [screenshot].
Again, you can read this table back using spark.read.format("delta").load("abfss_path"), providing the lakehouse abfss table path.