Databricks can't find a CSV file inside a wheel I installed when running from a Databricks notebook

I am learning Spark. As a task, we had to build a wheel locally, install it in Databricks (I am using Azure Databricks), and test it by running it from a Databricks notebook. The program reads a CSV file (timezones.csv) that is included inside the wheel. The file is definitely in the wheel (I checked), and the wheel works properly when I install it and run it from a local Jupyter notebook on my PC. However, when I install it in Databricks and run it from a notebook, I get the error below, as you can see in the snapshots:

[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File <command-3771510969632751>, line 7
      3 from pyspark.sql import SparkSession
      5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
     23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
     24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
     27 # Append the timezones_df columns ("iata_code","iana_tz","windows_tz") to the right of the
     28 # original df's columns, filling values only in rows where the origin airport (Origin) matches
     29 # the iata_code column of timezones_df. If an Origin airport does not appear in timezones_df,
     30 # the 3 columns will be left as NULL.
     32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
    828 if type(path) == list:
    829     assert self._spark._sc._jvm is not None
--> 830     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    831 elif isinstance(path, RDD):
    833     def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230     raise converted from None
    231 else:
    232     raise

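For reference, the failing read boils down to the snippet below (a reconstruction of lines 23-25 of agregaciones.py as shown in the traceback, not something I have verified as the intended fix). The key detail is that on Databricks a bare path is resolved against dbfs:/, which is exactly what the PATH_NOT_FOUND message reports, whereas an explicit file: scheme would point Spark at the node-local filesystem where cluster libraries are installed:

# Reconstruction of the read in motor_ingesta/agregaciones.py (runs inside the module,
# where __file__ is defined), with an explicit file: scheme added as a sketch.
from pathlib import Path

path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"

# Without a scheme, Spark resolves this path against dbfs:/ (hence the error above);
# "file:" + path makes it read from the local filesystem of the cluster nodes instead.
timezones_df = spark.read.options(header="true", inferSchema="true").csv("file:" + path_timezones)
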
[Databricks error snapshot 1]

[Databricks error snapshot 2]

Has anyone experienced this problem before? Is there any solution?

I tried installing the wheel both with pip and through the cluster Libraries UI, and I got the same error. I also restarted the cluster several times.

Thanks in advance for your help.

I am using Python 3.11, PySpark 3.5, and Java 8, and I created the wheel locally from PyCharm. If you need more details to answer, just ask and I'll provide them.

I explained all the details above. I was expecting to be able to use the wheel I built locally from a Databricks notebook.

Sorry about my English; it is not my native tongue and I am a bit rusty.

Edited to answer comment:

Can you navigate to %sh ls /dbfs/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources and share the folder content as an image? – Samuel Demir 11 hours ago

I just did what you asked and got the result below (the file is actually there, even though Databricks says it can't find it):

[Snapshot of the ls output]

Edited to answer Samuel Demir:

Maybe the package_data is missing in your setup initialization?!

This is what I do to add the configuration files in my res folder to the wheel. The files are then accessible with exactly the same kind of code as yours.

(The setup() example he posted is shown in full in his answer below.)

My setup.py file has the package_data line as well; here is a snapshot of the file and the code itself. Do you see any other detail that could be relevant? Thanks in advance.

[Snapshot of my setup.py file]

from setuptools import setup, find_packages

setup(
    name="motor-ingesta",
    version="0.1.0",
    author="Estrella Adriana Sicardi Segade",
    author_email="[email protected]",
    description="Motor de ingesta para el curso de Spark",
    long_description="Motor de ingesta para el curso de Spark",
    long_description_content_type="text/markdown",
    url="https://github.com/esicardi",
    python_requires=">=3.8",
    packages=find_packages(),
    package_data={"motor_ingesta": ["resources/*.csv"]})

Edited to answer questions:

Are you able to load the csv file by simply reading it in a notebook cell? – Samuel Demir 17 hours ago

Yes, I am able to read the file outside of the package code. For example, I can print its contents:

%sh cat /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv

"iata_code","iana_tz","windows_tz"
"AAA","Pacific/Tahiti","Hawaiian Standard Time"
"AAB","Australia/Brisbane","E. Australia Standard Time"
"AAC","Africa/Cairo","Egypt Standard Time"
"AAD","Africa/Mogadishu","E. Africa Standard Time"
"AAE","Africa/Algiers","W. Central Africa Standard Time"
"AAF","America/New_York","Eastern Standard Time"
"AAG","America/Sao_Paulo","E. South America Standard Time"
"AAH","Europe/Berlin","W. Europe Standard Time"
"AAI","America/Araguaina","Tocantins Standard Time"
"AAJ","America/Paramaribo","SA Eastern Standard Time"
"AAK","Pacific/Tarawa","UTC+12"
"AAL","Europe/Copenhagen","Romance Standard Time"
"AAM","Africa/Johannesburg","South Africa Standard Time"
"AAN","Asia/Dubai","Arabian Standard Time"
"AAO","America/Caracas","Venezuela Standard Time"
"AAP","Asia/Makassar","Singapore Standard Time"
"AAQ","Europe/Moscow","Russian Standard Time"
"AAR","Europe/Copenhagen","Romance Standard Time"
"AAS","Asia/Jayapura","Tokyo Standard Time"

...
[the file continues on to the end]
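
The same mismatch shows up from a notebook cell: the file exists on the nodes' local disk, but Spark and dbutils resolve a bare path against DBFS (a sketch; dbutils is only available inside Databricks notebooks):

import os

p = ("/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/"
     "python3.10/site-packages/motor_ingesta/resources/timezones.csv")

print(os.path.exists(p))     # True on the driver: the file is on the local disk
# dbutils.fs.ls(p)           # fails: the bare path is resolved against dbfs:/, where it does not exist
dbutils.fs.ls("file:" + p)   # lists the file through the local filesystem scheme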

Are you able to share part of your codebase so we can reconstruct the problem? – Samuel Demir 17 hours ago

Regarding the package (called motor_ingesta): it consists of three .py files, motor_ingesta.py, agregaciones.py and flujo_diario.py. The package ingests and processes JSON files with information about flights at US airports during January 2023. The function that reads timezones.csv is defined inside agregaciones.py. It takes a DataFrame (previously ingested from the JSON files) and joins it with the data from timezones.csv, matching on airport codes. It then uses that information to compute the UTC time from the departure time (DepTime) and the time zones, adds a "FlightTime" column with the UTC time to the right of the DataFrame, and finally drops the columns it had added from timezones.csv. Here is the code for the function:

from pathlib import Path

from pyspark.sql import SparkSession, DataFrame as DF
from pyspark.sql.functions import col, concat, lit, lpad, to_utc_timestamp


def aniade_hora_utc(spark: SparkSession, df: DF) -> DF:
    """
    Adds the FlightTime column, in UTC, to the flights DataFrame.

    :param spark: Spark session.
    :param df: Flights DataFrame.
    :return: Flights DataFrame with the FlightTime column in UTC.
    """
    path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
    timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)

    # Left join on the origin airport code to pick up its IANA time zone
    df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")

    # Build "FlightDate HH:MM" from DepTime (zero-padded to 4 digits) and convert to UTC
    df_with_flight_time = df_with_tz.withColumn("FlightTime", to_utc_timestamp(concat(
        col("FlightDate"), lit(" "),
        lpad(col("DepTime").cast("string"), 4, "0").substr(1, 2), lit(":"),
        col("DepTime").cast("string").substr(-2, 2)
    ), col("iana_tz")))

    df_with_flight_time = df_with_flight_time.drop("iata_code", "iana_tz", "windows_tz")
    return df_with_flight_time
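
For completeness, the lookup could also be read entirely on the driver and handed to Spark afterwards, which sidesteps the dbfs:/ vs file:/ question (a sketch under the assumption that pandas is available on the cluster; _carga_timezones is just an illustrative helper name, not part of the package):

import pandas as pd
from pathlib import Path
from pyspark.sql import SparkSession, DataFrame as DF


def _carga_timezones(spark: SparkSession) -> DF:
    # Plain local read on the driver: no distributed-filesystem path resolution involved.
    path_timezones = Path(__file__).parent / "resources" / "timezones.csv"
    pdf = pd.read_csv(path_timezones)   # columns: iata_code, iana_tz, windows_tz
    return spark.createDataFrame(pdf)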

1 Answer

Answered by Samuel Demir:

Maybe the package_data is missing in your setup initialization?!

This is what I do to add the configuration files in my res folder to the wheel. The files are then accessible with exactly the same kind of code as yours.


from pathlib import Path

from setuptools import setup, find_packages

# Note: __version__, read, read_requirements and the custom command classes
# (DistCommand, TestCommand, TestCovCommand) are project-specific helpers
# defined elsewhere in this setup.py.
setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="[email protected]",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)
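
A quick way to confirm, after installing a wheel built like this, that the data files really shipped with the package is to resolve them through importlib.resources (a sketch, assuming Python 3.9+; shown here for the asker's motor_ingesta package):

from importlib import resources

# Traversable pointing into the installed package's data directory
csv_path = resources.files("motor_ingesta").joinpath("resources/timezones.csv")
print(csv_path.is_file())   # True if package_data shipped the file
print(csv_path)             # .../site-packages/motor_ingesta/resources/timezones.csv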