Uploading a custom entitydef to Purview with no separate column entities


I want to make a custom entitydef for a dataframe with columns, and I want the columns to be visible and clickable inside the 'Schema' tab of the dataframe entity in Purview. I found the following code in the pyapacheatlas GitHub repo, which does almost exactly what I want:

from pyapacheatlas.core.typedef import AtlasAttributeDef, EntityTypeDef, RelationshipTypeDef

type_spark_df = EntityTypeDef(
  name="custom_spark_dataframe",
  attributeDefs=[
    AtlasAttributeDef(name="format")
  ],
  superTypes = ["DataSet"],
  # schemaElementAttribute tells Purview which relationship attribute fills the Schema tab
  options = {"schemaElementAttribute":"columns"}
)
type_spark_columns = EntityTypeDef(
  name="custom_spark_dataframe_column",
  attributeDefs=[
    AtlasAttributeDef(name="data_type")
  ],
  superTypes = ["DataSet"],
)

spark_column_to_df_relationship = RelationshipTypeDef(
  name="custom_spark_dataframe_to_columns",
  relationshipCategory="COMPOSITION",
  endDef1={
          "type": "custom_spark_dataframe",
          "name": "columns",
          "isContainer": True,
          "cardinality": "SET",
          "isLegacyAttribute": False
      },
  endDef2={
          "type": "custom_spark_dataframe_column",
          "name": "dataframe",
          "isContainer": False,
          "cardinality": "SINGLE",
          "isLegacyAttribute": False
      }
)

typedef_results = client.upload_typedefs(
  entityDefs = [type_spark_df, type_spark_columns ],
  relationshipDefs = [spark_column_to_df_relationship],
  force_update=True)
print(typedef_results)

In the part above I define the custom type definitions (the entity types and the relationship between them) and upload them to Purview.
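(client and guid are not shown in the snippets; they come from the usual pyapacheatlas setup, roughly like the sketch below, with placeholder tenant/client/secret and account values:)

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasEntity, PurviewClient
from pyapacheatlas.core.util import GuidTracker

# Placeholder credentials: substitute your own service principal and account name
auth = ServicePrincipalAuthentication(
  tenant_id="<tenant-id>",
  client_id="<client-id>",
  client_secret="<client-secret>"
)
client = PurviewClient(account_name="<purview-account-name>", authentication=auth)

# GuidTracker hands out temporary negative guids for entities in the upload batch
guid = GuidTracker()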

df = spark.read.csv("/databricks-datasets/flights/departuredelays.csv",header=True, inferSchema=True)

atlas_input_df = AtlasEntity(
  name="demo_dbfs_delays_data",
  qualified_name = "pyapacheatlas://demo_dbfs_delays_data",
  typeName="custom_spark_dataframe",
  guid=guid.get_guid(),
)

atlas_input_df_columns = []
for column in df.schema:
  temp_column = AtlasEntity(
    name = column.name,
    typeName = "custom_spark_dataframe_column",
    qualified_name = "pyapacheatlas://demo_dbfs_delays_data#"+column.name,
    guid=guid.get_guid(),
    attributes = {"data_type":str(column.dataType)},
    relationshipAttributes = {"dataframe":atlas_input_df.to_json(minimum=True)}
  )
  atlas_input_df_columns.append(temp_column)

batch = [atlas_input_df] + atlas_input_df_columns

client.upload_entities(batch=batch)

Then I create the entities (the dataframe and its columns), fill them with some data, and upload them to Purview.
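(A small aside, not in the original snippet: upload_entities returns the bulk response, which can be captured and printed to see which real guids Purview assigned to the temporary negative ones.)

import json

# Capture and inspect the response from the upload above
results = client.upload_entities(batch=batch)
print(json.dumps(results, indent=2))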

The result is a dataframe entity plus a separate catalog entity for every single column.

This is not desirable: if I upload multiple dataframes, each with multiple columns, the data catalog is going to become chaotic.

What I want is clickable columns inside the schema tab of the dataframe, but without separate column entities being uploaded to Purview.

I tried simply removing the + atlas_input_df_columns part from the batch variable, but that results in no schema at all for the dataframe.

Any suggestions please?

1 Answer

Answer by Harish:

I know this is a very late response, but it might help someone who runs into a similar issue. The sample below worked for me.

import json

from pyapacheatlas.core import AtlasEntity

# Create a tabular_schema entity to hold the columns
ts = AtlasEntity(
  name="demotabschema",
  typeName="tabular_schema",
  qualified_name="pyapache://demotabschema",
  guid = -2
)
# Create a Column entity that references your tabular schema
col01 = AtlasEntity(
  name="democolumn1",
  typeName="column",
  qualified_name="pyapache://mycolumn1",
  guid=-3,
  attributes={
    "type":"string",
    "description": "Col1"
  },
  relationshipAttributes = {
    "composeSchema": ts.to_json(minimum=True)
  }
)
col02 = AtlasEntity(
  name="democolumn2",
  typeName="column",
  qualified_name="pyapache://mycolumn2",
  guid=-4,
  attributes={
    "type":"string",
    "description": "Col2"
  },
  relationshipAttributes = {
    "composeSchema": ts.to_json(minimum=True)
  }
)
colsarray = [col01.to_json(), col02.to_json()]

# Create a resource set that references the tabular schema
rs = AtlasEntity(
  name="demoresourceset",
  typeName="azure_datalake_gen2_resource_set",
  qualified_name="pyapache://demors",
  guid = -1,
  relationshipAttributes = {
    "tabular_schema": ts.to_json(minimum=True)
  }
)

# Upload the entities; client is an authenticated PurviewClient
results = client.upload_entities(
  [rs.to_json(), ts.to_json()] + colsarray
)
# Print out the results to see the guid assignments
print(json.dumps(results, indent=2))
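
If you want to build the columns from the Spark dataframe schema, as in the question, the loop from the question can be combined with this pattern. This is only a sketch under the same assumptions as above (built-in tabular_schema and column types, placeholder qualified names, temporary negative guids), not tested against the question's exact setup:

# Sketch: one built-in "column" entity per Spark schema field, each attached
# to the tabular_schema through the composeSchema relationship
atlas_columns = []
for idx, field in enumerate(df.schema):
  atlas_columns.append(
    AtlasEntity(
      name=field.name,
      typeName="column",
      qualified_name="pyapache://demors#" + field.name,  # placeholder naming
      guid=-(10 + idx),  # temporary negative guids
      attributes={"type": str(field.dataType)},
      relationshipAttributes={"composeSchema": ts.to_json(minimum=True)}
    )
  )

results = client.upload_entities(
  [rs.to_json(), ts.to_json()] + [c.to_json() for c in atlas_columns]
)
print(json.dumps(results, indent=2))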