I am trying to read and use the data that comes out of an UnpackContent processor in my Python script. I know that I am supposed to use an ExecuteScript processor to run the Python script, but I am struggling to understand:
- How do I write my Python code so that it finds the path of the unzipped file in NiFi?
- How do I write my Python code so that it sets the output path of the processed file in NiFi?
This is what my current NiFi dataflow looks like: (image not included)
#!/usr/bin/env python
# coding: utf-8
import os
from pathlib import Path
import pandas as pd
# pyarrow must be installed beforehand (e.g. `pip install pyarrow`); it is used as the parquet engine below
# Specifying the path of the unzipped insights file
path = Path(r'C:\Users\IT Admin\Desktop\Parquet_to_CSV_V2\insights_2023-06-29')
os.chdir(path)
cwd = Path.cwd()
cwd
# Adding the paths of every parquet file under the "complex_relations" folder to a list
target_dir = cwd / "complex_relation"
pq_files = []
for file in target_dir.rglob("*.parquet*"):
    pq_files.append(file)
# Reading every parquet file into a DataFrame, then concatenating them into one DataFrame
data_frames = []
for parquet in pq_files:
    df = pd.read_parquet(parquet)
    data_frames.append(df)
concatenated_df = pd.concat(data_frames)
# Writing the combined DataFrame to a specific output path
output_path = r'C:\Users\IT Admin\Desktop\Parquet_to_CSV_V2\output\complex_relations.parquet'
concatenated_df.to_parquet(output_path, engine='pyarrow')
# Reading the combined file back to check that it was written correctly
compiled_df = pd.read_parquet(output_path, engine="auto")
print(compiled_df.head())
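The script above hard-codes both the input folder and the output file. As a rough sketch of what the parameterised version might look like, the same logic can be wrapped in functions that receive the two paths as arguments. The function names `find_parquet_files` and `combine_parquet` and the `subfolder` parameter are hypothetical names chosen for illustration; nothing here is a NiFi API, and how NiFi would supply the two paths is still the open question.

```python
from pathlib import Path


def find_parquet_files(input_dir, subfolder="complex_relation"):
    """Return a sorted list of every parquet file under input_dir/subfolder."""
    target_dir = Path(input_dir) / subfolder
    return sorted(target_dir.rglob("*.parquet*"))


def combine_parquet(input_dir, output_path, subfolder="complex_relation"):
    """Concatenate all parquet files under input_dir/subfolder into one file."""
    # Imported here so the path handling above can be used without pandas installed
    import pandas as pd

    pq_files = find_parquet_files(input_dir, subfolder)
    if not pq_files:
        raise FileNotFoundError(f"no parquet files under {Path(input_dir) / subfolder}")

    # Read each file into a DataFrame and concatenate them, as in the original script
    data_frames = [pd.read_parquet(p) for p in pq_files]
    combined = pd.concat(data_frames)

    # Create the output folder if needed, then write the combined file with pyarrow
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    combined.to_parquet(output_path, engine="pyarrow")
    return output_path
```

With this shape, the caller decides where the data lives, for example `combine_parquet(sys.argv[1], sys.argv[2])` if the two paths were passed on the command line (an assumption for illustration only).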