Here is my Python code:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, split, size
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from geopy.geocoders import Nominatim

# Note: header/inferSchema are CSV reader options; the Parquet reader ignores them
sourcedata = spark.read.parquet("...")
sourcedata_indexed = sourcedata.withColumn(
    "index", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)  # create a sequential index for later jobs
sourcedata_indexed.show(truncate=False)
The result of the above code is a DataFrame with columns like this:
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+-----+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|Trip_Distance |Start_Lon |Start_Lat|Rate_Code|store_and_forward|End_Lon |End_Lat |Payment_Type|Fare_Amt |surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|index|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+-----+
|VTS |2009-01-04 02:52:00 |2009-01-04 03:02:00 |1 |2.63 |-73.991957 |40.721567|null |null |-73.993803 |40.695922|CASH |8.9 |0.5 |null |0.0 |0.0 |9.4 |1 |
|VTS |2009-01-04 03:31:00 |2009-01-04 03:38:00 |3 |4.55 |-73.982102 |40.73629 |null |null |-73.95585 |40.76803 |Credit |12.1 |0.5 |null |2.0 |0.0 |14.6 |2 |
|VTS |2009-01-03 15:43:00 |2009-01-03 15:57:00 |5 |10.35 |-74.002587 |40.739748|null |null |-73.869983 |40.770225|Credit |23.7 |0.0 |null |4.74 |0.0 |28.44 |3 |
The schema of this DataFrame:
root
|-- vendor_name: string (nullable = true)
|-- Trip_Pickup_DateTime: string (nullable = true)
|-- Trip_Dropoff_DateTime: string (nullable = true)
|-- Passenger_Count: long (nullable = true)
|-- Trip_Distance: double (nullable = true)
|-- Start_Lon: double (nullable = true)
|-- Start_Lat: double (nullable = true)
|-- Rate_Code: double (nullable = true)
|-- store_and_forward: double (nullable = true)
|-- End_Lon: double (nullable = true)
|-- End_Lat: double (nullable = true)
|-- Payment_Type: string (nullable = true)
|-- Fare_Amt: double (nullable = true)
|-- surcharge: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- Tip_Amt: double (nullable = true)
|-- Tolls_Amt: double (nullable = true)
|-- Total_Amt: double (nullable = true)
|-- index: integer (nullable = false)
Next, I create another DataFrame that contains the city and state for each latitude/longitude pair:
df = sourcedata_indexed.select(
"index", col("Start_Lat").alias("latitude"), col("Start_Lon").alias("longitude")
)
# Define a user-defined function (UDF) to perform reverse geocoding
@udf(StringType())
def reverse_geocode(latitude, longitude):
    # latitude and longitude arrive as DoubleType values
    latitude = float(latitude)
    longitude = float(longitude)
    geolocator = Nominatim(user_agent="reverse_geocode")
    location = geolocator.reverse((latitude, longitude), language="en")
    if location:
        return location.address
    return None
# Add a 'location' column using the reverse_geocode UDF
df_with_location = df.withColumn(
"location", reverse_geocode(df["latitude"], df["longitude"])
)
df_with_location.show(truncate=False)
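Since every call to this UDF makes a network request to Nominatim, it can help to check the UDF body off-cluster first. Below is a minimal sketch of that idea: the same logic as the UDF, exercised with a stand-in geolocator (`FakeGeolocator` and `FakeLocation` are hypothetical stubs written for this test, not part of geopy):

```python
# Plain-Python version of the UDF body, testable without Spark or network access.
class FakeLocation:
    def __init__(self, address):
        self.address = address

class FakeGeolocator:
    """Hypothetical stub standing in for geopy's Nominatim."""
    def reverse(self, coords, language="en"):
        lat, lon = coords
        # Pretend only one known coordinate resolves to an address.
        if abs(lat - 40.721567) < 1e-6 and abs(lon + 73.991957) < 1e-6:
            return FakeLocation("183, Chrystie Street, ..., United States")
        return None

def reverse_geocode_body(geolocator, latitude, longitude):
    # Same branching as the UDF above, with the geolocator injected.
    latitude = float(latitude)
    longitude = float(longitude)
    location = geolocator.reverse((latitude, longitude), language="en")
    if location:
        return location.address
    return None

geo = FakeGeolocator()
print(reverse_geocode_body(geo, 40.721567, -73.991957))  # stub address string
print(reverse_geocode_body(geo, 0.0, 0.0))               # None
```

Injecting the geolocator this way also makes it easy to swap in the real `Nominatim` on the cluster while keeping the branching logic identical.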
The result looks like this:
+-----+---------+------------------+--------------------------------------------------------------------------------------------------------------------------+
|index|latitude |longitude |location |
+-----+---------+------------------+--------------------------------------------------------------------------------------------------------------------------+
|1 |40.721567|-73.991957 |183, Chrystie Street, Manhattan Community Board 3, Manhattan, New York County, New York, 10002, United States |
|2 |40.73629 |-73.982102 |359, 2nd Avenue, Manhattan Community Board 6, Manhattan, New York County, New York, 10010, United States |
Next, I extract only the city and state from the previous DataFrame:
# Split the 'location' column into 'city' and 'state'
df_with_city_state = (
    df_with_location.withColumn("location", split("location", ","))
    .withColumn("city", col("location")[size("location") - 4])
    .withColumn("state", col("location")[size("location") - 5])
)
# Drop the intermediate 'location' column
df_with_city_state = df_with_city_state.select("index", "city", "state")
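The negative-offset indexing above can be checked in plain Python against the sample address from the output (the offsets mirror the code above; note that the number of components Nominatim returns varies from address to address):

```python
# Mirror of the Spark expressions col("location")[size("location") - k]
# applied to the first sample address shown earlier.
address = ("183, Chrystie Street, Manhattan Community Board 3, Manhattan, "
           "New York County, New York, 10002, United States")
parts = address.split(",")
n = len(parts)                # 8 components for this address
city = parts[n - 4].strip()   # offset used for 'city' above
state = parts[n - 5].strip()  # offset used for 'state' above
print(n, city, state)
```

For this sample the offsets select "New York County" and "Manhattan"; whether those are the intended city/state values depends on the address format, so the offsets are worth verifying against a few real rows.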
Finally, I want to export this DataFrame to a file (CSV, Parquet, ...):
# Export the final DataFrame
df_with_city_state.write.csv("test", mode="overwrite", header=True)
But no matter what I try, I cannot do it. The main problem is a timeout, so I suspect the file is too large to export. However, in an earlier part of my code, other dimensions with the same number of rows were exported easily. Could someone please help me find a way to export my DataFrame to a CSV file? Many thanks! Sorry if my English grammar is wrong!