Create tables in Databricks using FastAPI - Python code


I am developing a FastAPI application that is supposed to perform some calculations based on a request in JSON format, send the response, and store it in several Databricks catalog tables.

So, in the API, I convert the response and also create the tables.

What I am struggling with is which Databricks API endpoint I should connect to.

As you can see from the code below, I defined:

url = f"{self.databricks_host}/api/2.0/sql/createTable"

but it is not working.

def send_to_dtb_catalog(self, df, table_name):
        # doing some stuff here ....

        # Prepare data payload for Databricks API
        data = {
            "tableName": f"my_database.my_schema.{table_name}",
            "data": df_json
        }   

        # Make HTTP request to Databricks REST API
        # suppose databricks_host and databricks_token are pre-defined 
        url = f"{self.databricks_host}/api/2.0/sql/createTable"

        headers = {
            "Authorization": f"Bearer {self.databricks_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(url, headers=headers, json=data)

Then I use send_to_dtb_catalog to send the created tables to the Databricks catalog tables, something like this:

self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")

I would appreciate any help, as I am new to both Databricks and API development.

Best answer by JayashankarGS

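There is no /api/2.0/sql/createTable endpoint. Instead, use the Databricks SQL Statement Execution API ("Execute a SQL statement", POST /api/2.0/sql/statements), which runs any SQL statement, including CREATE TABLE and INSERT, on a SQL warehouse.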
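For reference, here is a minimal sketch of the request/response cycle of that API, assuming you want to wait for long-running statements instead of cancelling them. It uses "on_wait_timeout": "CONTINUE" and then polls GET /api/2.0/sql/statements/{statement_id} until the status reaches a terminal state. The helper names (build_statement_body, execute_statement) and the polling parameters are illustrative, not part of any Databricks SDK.

```python
import time

# Statement states after which the status no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED", "CLOSED"}


def build_statement_body(warehouse_id, statement, wait_timeout="30s"):
    """Request body for POST /api/2.0/sql/statements (hypothetical helper)."""
    return {
        "warehouse_id": warehouse_id,
        "statement": statement,
        "wait_timeout": wait_timeout,
        # CONTINUE keeps the statement running after wait_timeout expires,
        # so its state can be polled instead of the statement being cancelled.
        "on_wait_timeout": "CONTINUE",
    }


def execute_statement(host, token, warehouse_id, statement,
                      poll_seconds=2.0, max_polls=60):
    """Submit a statement and poll until it reaches a terminal state or
    max_polls is exhausted. Returns the last JSON response as a dict."""
    import requests  # third-party; imported here so the helpers above need only the stdlib

    url = f"{host}/api/2.0/sql/statements"
    headers = {"Authorization": f"Bearer {token}"}

    resp = requests.post(url, headers=headers,
                         json=build_statement_body(warehouse_id, statement))
    resp.raise_for_status()
    result = resp.json()

    for _ in range(max_polls):
        if result["status"]["state"] in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
        # GET /api/2.0/sql/statements/{statement_id} returns the current status.
        resp = requests.get(f"{url}/{result['statement_id']}", headers=headers)
        resp.raise_for_status()
        result = resp.json()
    return result
```

Calling execute_statement(databricks_host, databricks_token, warehouse_id, "SELECT 1") and checking result["status"]["state"] == "SUCCEEDED" is then enough to know whether a CREATE TABLE or INSERT went through.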

Change your function as follows:

Code:

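import requests

def send_to_dtb_catalog(df, table_name):
    # databricks_host and databricks_token are assumed to be defined elsewhere, as in the question.
    url = f"{databricks_host}/api/2.0/sql/statements"

    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json"
    }
    # Example schema; change the columns to match your DataFrame.
    sql_q = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
        id INT,
        name STRING
        )
    '''

    body = {
        "warehouse_id": "a415c87c62c279a5",  # replace with your SQL warehouse ID
        "statement": sql_q,
        "wait_timeout": "30s",
        "on_wait_timeout": "CANCEL"
    }
    response = requests.post(url, headers=headers, json=body)
    response.raise_for_status()
    if response.json()['status']['state'] != 'SUCCEEDED':
        # CREATE TABLE failed or timed out; return the response so the caller can inspect it.
        return response

    # Collect the rows of the (Spark) DataFrame as plain Python tuples.
    t = df.rdd.map(lambda row: tuple(row)).collect()
    if not t:
        return response  # nothing to insert

    print("Inserting values....")
    # str(tuple) only yields valid SQL literals for simple values (numbers,
    # strings without quotes, at least two columns); None is not turned into NULL.
    insert_query = f'''
    INSERT INTO {table_name}
    VALUES
    {','.join(map(str, t))}
    '''

    body['statement'] = insert_query
    res2 = requests.post(url, headers=headers, json=body)
    return res2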

Next, call your function.

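Building the VALUES list with str(tuple) breaks on quotes, None and single-column rows, and it exposes the query to SQL injection. A safer variant, sketched below under the assumption that you keep using the Statement Execution API, sends the values through its "parameters" field with named markers (:p0_0, :p0_1, ...). Each parameter is a {"name", "value", "type"} object; the value is passed as a string and cast using "type", and omitting "value" means NULL. The helper build_insert_statement is hypothetical, and the table name is still interpolated, so it must come from trusted code.

```python
def build_insert_statement(table_name, columns, rows):
    """Return (statement, parameters) for a parameterized INSERT.

    columns: list of (column_name, sql_type) pairs, e.g. [("id", "INT"), ("name", "STRING")]
    rows:    iterable of tuples in the same column order
    """
    column_list = ", ".join(name for name, _ in columns)
    row_markers = []
    parameters = []
    for i, row in enumerate(rows):
        if len(row) != len(columns):
            raise ValueError(f"row {i} has {len(row)} values, expected {len(columns)}")
        markers = []
        for j, ((_, sql_type), value) in enumerate(zip(columns, row)):
            param_name = f"p{i}_{j}"
            markers.append(f":{param_name}")
            param = {"name": param_name, "type": sql_type}
            if value is not None:
                # Values travel as strings; a missing "value" key is treated as NULL.
                param["value"] = str(value)
            parameters.append(param)
        row_markers.append("(" + ", ".join(markers) + ")")
    if not row_markers:
        raise ValueError("rows must not be empty")
    statement = f"INSERT INTO {table_name} ({column_list}) VALUES " + ", ".join(row_markers)
    return statement, parameters
```

You would then set body["statement"], body["parameters"] = build_insert_statement(table_name, [("id", "INT"), ("name", "STRING")], rows) before posting. The rows can come from a Spark DataFrame (df.collect()) or, if your FastAPI service works with pandas instead, from df.itertuples(index=False, name=None). For large DataFrames, send the rows in batches rather than in one statement.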

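Another option is to connect to Databricks through a driver (for example the Databricks SQL Connector for Python, or the JDBC/ODBC drivers) instead of calling the REST API directly.

Refer to this documentation on how to connect to the server and execute queries.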
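As a rough sketch of the driver route, the code below uses the Databricks SQL Connector for Python (package databricks-sql-connector). It assumes connector version 3.x, which supports named :param markers as native query parameters; server_hostname and http_path are the connection details of your SQL warehouse. The helper names are hypothetical, and inserting row by row is simple but slow for large tables.

```python
def connector_insert_sql(table_name, column_names):
    """Build 'INSERT INTO t (a, b) VALUES (:a, :b)' with one named marker per column."""
    markers = ", ".join(f":{c}" for c in column_names)
    return f"INSERT INTO {table_name} ({', '.join(column_names)}) VALUES ({markers})"


def insert_rows_with_connector(server_hostname, http_path, access_token,
                               table_name, column_names, rows):
    """Insert rows (tuples in column order) using databricks-sql-connector."""
    # Imported lazily so this module can be loaded without the connector installed.
    from databricks import sql

    insert_sql = connector_insert_sql(table_name, column_names)
    with sql.connect(server_hostname=server_hostname,
                     http_path=http_path,
                     access_token=access_token) as connection:
        with connection.cursor() as cursor:
            for row in rows:
                # Named parameters are bound by the connector, not formatted into the SQL text.
                cursor.execute(insert_sql, dict(zip(column_names, row)))
```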