I'm currently working on an academic project that requires the use of Airflow. At the moment I'm only dealing with one database table, so I'd like some assistance to know whether I'm going in the right direction.
I have a .py file called data_transformation.py:
import pandas as pd
import pymysql
from sqlalchemy import create_engine


def get_df_from_db():
    # Read the raw table into a DataFrame
    conn = pymysql.connect(host='HOST',
                           user='USER',
                           password='PW',
                           database='DB')
    query = "SELECT * FROM table"
    df = pd.read_sql(query, conn)
    df.reset_index(inplace=True)
    df.drop('index', axis=1, inplace=True)
    conn.close()
    return df


def clean_colorTypes(df):
    # Keep the three main colours, group everything else under 'Others'
    df['colorType'] = df['colorType'].apply(lambda x: x if x in ['Red', 'Blue', 'Orange'] else 'Others')
    df = df[df['numOfButtons'] <= 2]
    return df


def add_yearsSinceManufactured(df):
    df['yearsSinceManu'] = df['yearListed'] - df['yearManufactured']
    return df


def add_to_db(df):
    # create connection to mysql using create_engine, then append the cleaned rows
    engine = create_engine('mysql+pymysql://USER:PW@HOST/DB')
    df.to_sql(name='cleaned', con=engine, if_exists='append', index=False)
    return df
My idea for my Airflow DAG file is:
- execute get_df_from_db
- execute clean_colorTypes and add_yearsSinceManufactured in parallel, since they are not dependent on each other
- once step 2 is done, execute add_to_db
However, I'm not sure whether this approach can work, since I'm passing df into and out of every function (see the sketch below for what I mean).
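Just a sketch with the TaskFlow API; the DAG id and schedule are placeholders, and each returned DataFrame would travel between tasks via XCom:

from datetime import datetime

import pandas as pd
from airflow.decorators import dag, task

from data_transformation import (
    get_df_from_db,
    clean_colorTypes,
    add_yearsSinceManufactured,
    add_to_db,
)


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)  # Airflow 2.4+ "schedule" argument
def cleaning_pipeline():
    @task
    def extract() -> pd.DataFrame:
        return get_df_from_db()  # the returned DataFrame is pushed to XCom

    @task
    def clean(df: pd.DataFrame) -> pd.DataFrame:
        return clean_colorTypes(df)

    @task
    def add_years(df: pd.DataFrame) -> pd.DataFrame:
        return add_yearsSinceManufactured(df)

    @task
    def load(df: pd.DataFrame) -> None:
        add_to_db(df)

    raw = extract()
    cleaned = clean(raw)
    with_years = add_years(raw)
    # This is the part I'm unsure about: clean() and add_years() each return
    # their own DataFrame, so load() would need some combined result.
    load(cleaned)


cleaning_pipeline()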
I'd do it the way it was already suggested, either:
- have the DB do all the heavy lifting, with each query as a separate task and staging tables in between; or
- if you're adamant about using Pandas, save each transformation to a CSV and pass only the file path between tasks (see the sketch below).
The Airflow metadata DB (XCom) is simply not suitable for holding large data; XCom is a feature for passing small bits of data from task to task.
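A rough sketch of the CSV variant, reusing your functions (the /tmp paths are placeholders; with more than one worker they would have to live on shared storage). Only the file path goes through XCom:

import pandas as pd
from airflow.decorators import task

from data_transformation import (
    get_df_from_db,
    clean_colorTypes,
    add_yearsSinceManufactured,
    add_to_db,
)


@task
def extract() -> str:
    path = "/tmp/raw.csv"
    get_df_from_db().to_csv(path, index=False)
    return path  # only this small string goes through XCom


@task
def transform(path: str) -> str:
    df = pd.read_csv(path)
    df = clean_colorTypes(df)
    df = add_yearsSinceManufactured(df)
    out = "/tmp/transformed.csv"
    df.to_csv(out, index=False)
    return out


@task
def load(path: str) -> None:
    add_to_db(pd.read_csv(path))

Inside the DAG you would wire them up as load(transform(extract())). Note that this chains your two transformations instead of running them in parallel; running them in parallel would leave two separate CSVs that you would have to merge before loading anyway.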
The DB route would still be an ETL pipeline: extract the raw rows into a staging table, transform them with SQL, and load the result into the final cleaned table.
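Something along these lines, using the operator from the MySQL provider (the connection id, the staging table name and the exact SQL are my assumptions, reverse-engineered from your pandas code):

from datetime import datetime

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG("etl_sql", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
    # Extract: copy the raw rows (already filtered) into a staging table
    stage = MySqlOperator(
        task_id="stage",
        mysql_conn_id="mysql_default",
        sql=[
            "DROP TABLE IF EXISTS staging",
            "CREATE TABLE staging AS SELECT * FROM `table` WHERE numOfButtons <= 2",
        ],
    )

    # Transform: group the rare colours under 'Others'
    transform = MySqlOperator(
        task_id="transform",
        mysql_conn_id="mysql_default",
        sql="""
            UPDATE staging
            SET colorType = 'Others'
            WHERE colorType NOT IN ('Red', 'Blue', 'Orange')
        """,
    )

    # Load: write the final result, including the derived column
    load = MySqlOperator(
        task_id="load",
        mysql_conn_id="mysql_default",
        sql="""
            INSERT INTO cleaned
            SELECT *, yearListed - yearManufactured AS yearsSinceManu
            FROM staging
        """,
    )

    stage >> transform >> load

Each task is just a SQL statement, so nothing large ever has to move through Airflow itself.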