Cannot export a big dataframe containing US city and state information


Here is my python code:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col

# header/inferSchema are CSV reader options; the Parquet reader ignores them
sourcedata = spark.read.parquet("...")

# Create an index for later jobs (note: a Window with no partitionBy
# pulls all rows into a single partition for this step)
sourcedata = sourcedata.withColumn(
    "index", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)

sourcedata.show(truncate=False)

The result of the code above is a dataframe with columns like this:

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+-----+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|Trip_Distance     |Start_Lon         |Start_Lat|Rate_Code|store_and_forward|End_Lon           |End_Lat  |Payment_Type|Fare_Amt         |surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|index|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+-----+
|VTS        |2009-01-04 02:52:00 |2009-01-04 03:02:00  |1              |2.63              |-73.991957        |40.721567|null     |null             |-73.993803        |40.695922|CASH        |8.9              |0.5      |null   |0.0    |0.0      |9.4      |1    |
|VTS        |2009-01-04 03:31:00 |2009-01-04 03:38:00  |3              |4.55              |-73.982102        |40.73629 |null     |null             |-73.95585         |40.76803 |Credit      |12.1             |0.5      |null   |2.0    |0.0      |14.6     |2    |
|VTS        |2009-01-03 15:43:00 |2009-01-03 15:57:00  |5              |10.35             |-74.002587        |40.739748|null     |null             |-73.869983        |40.770225|Credit      |23.7             |0.0      |null   |4.74   |0.0      |28.44    |3    |

Schema of this dataframe:

root
|-- vendor_name: string (nullable = true)
|-- Trip_Pickup_DateTime: string (nullable = true)
|-- Trip_Dropoff_DateTime: string (nullable = true)
|-- Passenger_Count: long (nullable = true)
|-- Trip_Distance: double (nullable = true)
|-- Start_Lon: double (nullable = true)
|-- Start_Lat: double (nullable = true)
|-- Rate_Code: double (nullable = true)
|-- store_and_forward: double (nullable = true)
|-- End_Lon: double (nullable = true)
|-- End_Lat: double (nullable = true)
|-- Payment_Type: string (nullable = true)
|-- Fare_Amt: double (nullable = true)
|-- surcharge: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- Tip_Amt: double (nullable = true)
|-- Tolls_Amt: double (nullable = true)
|-- Total_Amt: double (nullable = true)
|-- index: integer (nullable = false)

Next, I create another dataframe containing the City and State information for each pair of latitude and longitude:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from geopy.geocoders import Nominatim

df = sourcedata.select(
    "index", col("Start_Lat").alias("latitude"), col("Start_Lon").alias("longitude")
)

# Define a user-defined function (UDF) to perform reverse geocoding.
# Create the geolocator once, not once per row.
geolocator = Nominatim(user_agent="reverse_geocode")

@udf(StringType())
def reverse_geocode(latitude, longitude):
    if latitude is None or longitude is None:
        return None
    location = geolocator.reverse(
        (float(latitude), float(longitude)), language="en", timeout=10
    )
    return location.address if location else None


# Add 'city' and 'state' columns using the reverse_geocode UDF
df_with_location = df.withColumn(
    "location", reverse_geocode(df["latitude"], df["longitude"])
)
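Nominatim's usage policy only allows about one request per second, so the UDF probably needs throttling; geopy ships `geopy.extra.rate_limiter.RateLimiter` for exactly this, but a minimal stdlib sketch of the idea (with a stand-in function instead of `geolocator.reverse`, since I have not wired it into Spark yet) looks like:

```python
import time

def make_rate_limited(func, min_delay_seconds=1.0):
    """Wrap func so successive calls are at least min_delay_seconds apart."""
    last_call = [float("-inf")]

    def wrapper(*args, **kwargs):
        wait = last_call[0] + min_delay_seconds - time.monotonic()
        if wait > 0:
            time.sleep(wait)
        last_call[0] = time.monotonic()
        return func(*args, **kwargs)

    return wrapper

# e.g. reverse = make_rate_limited(geolocator.reverse, min_delay_seconds=1.0)
timestamps = []
throttled = make_rate_limited(lambda: timestamps.append(time.monotonic()), 0.1)
throttled()
throttled()  # this call sleeps so the two calls end up ~0.1 s apart
```

With one request per second, though, millions of rows would take days, so throttling alone will not make the full export feasible.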

df_with_location.show(truncate=False)

The result looks like this:

+-----+---------+------------------+--------------------------------------------------------------------------------------------------------------------------+
|index|latitude |longitude         |location                                                                                                                  |
+-----+---------+------------------+--------------------------------------------------------------------------------------------------------------------------+
|1    |40.721567|-73.991957        |183, Chrystie Street, Manhattan Community Board 3, Manhattan, New York County, New York, 10002, United States             |
|2    |40.73629 |-73.982102        |359, 2nd Avenue, Manhattan Community Board 6, Manhattan, New York County, New York, 10010, United States                  |
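Also, since many trips start from nearby coordinates, caching results per rounded coordinate could cut down the number of Nominatim calls a lot. A sketch of the idea with `functools.lru_cache` and a stand-in for `geolocator.reverse` (the 3-decimal rounding, roughly 100 m, is just my assumption):

```python
from functools import lru_cache

def fake_reverse(lat, lon):
    # Stand-in for geolocator.reverse(); returns a canned address string
    return "183, Chrystie Street, Manhattan, New York County, New York, 10002, United States"

@lru_cache(maxsize=None)
def cached_reverse(lat_rounded, lon_rounded):
    return fake_reverse(lat_rounded, lon_rounded)

def reverse_geocode_cached(lat, lon):
    # Round to ~100 m so nearby pickups share one geocoding call
    return cached_reverse(round(lat, 3), round(lon, 3))

a = reverse_geocode_cached(40.721567, -73.991957)
b = reverse_geocode_cached(40.721568, -73.991958)  # same rounded key -> cache hit
```

In Spark the cache would live per executor Python process rather than globally, but it should still reduce the request count.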

Next, I extract only the city and state information from the previous dataframe:

from pyspark.sql.functions import split, element_at

# Split the 'location' column and pick the city and state fields, counting
# from the end of the address (element_at is 1-based; negative indices count
# from the end, so element_at(location, -4) matches location[size - 4])
df_with_city_state = (
    df_with_location.withColumn("location", split("location", ", "))
    .withColumn("city", element_at("location", -4))
    .withColumn("state", element_at("location", -5))
)

# Drop the intermediate 'location' column
df_with_city_state = df_with_city_state.select("index", "city", "state")
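To double-check which address fields those end-relative indices actually pick, here is the same arithmetic in plain Python on one of the addresses Nominatim returned (the field layout varies by location, so these offsets are only a guess that happens to fit this sample):

```python
address = ("359, 2nd Avenue, Manhattan Community Board 6, Manhattan, "
           "New York County, New York, 10010, United States")
parts = [p.strip() for p in address.split(",")]

city = parts[len(parts) - 4]   # same element as location[size(location) - 4]
state = parts[len(parts) - 5]  # same element as location[size(location) - 5]
print(city, "|", state)        # New York County | Manhattan
```

For this sample the offsets actually land on the county and the borough, so they may need adjusting (the state looks like `parts[len(parts) - 3]` here).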

Finally, I want to export this dataframe to a file (CSV, Parquet, ...):

# Export the final DataFrame (df_with_city_state, not df_with_location)
df_with_city_state.write.option("header", "true").csv("test", mode="overwrite")

But no matter what I try, I still cannot do it. The main problem is a timeout, so I think it may be because the file is too large to export. But earlier in my code, other dimensions with the same number of rows exported easily. (I suspect the real bottleneck is that Spark evaluates lazily, so the reverse-geocoding UDF only runs at write time, issuing one Nominatim request per row.) So could someone please help me find a way to export my dataframe to a CSV file? Many thanks! Sorry if my English grammar is wrong!
