# Read the locations dataset in Delta format, keep a small sample of trips,
# and register it as a SQL view for querying.
locations_sdf_updated_three <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "locations_sdf_updated_three")
) %>%
  # Keep only eleven trips (40000000..40000010 inclusive) for a quick demo
  filter(trip_id >= 40000000 & trip_id <= 40000010) %>%
  sdf_register("locations_sdf_updated_three_view") # Register as view for SQL queries
# Read the trip (non-location) dataset in Delta format and register it as a
# SQL view under the same name.
trip_data_sdf <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "trip_data_sdf")
) %>%
  # Optional filtering: same eleven trips as the locations dataset,
  # so the later joins line up row-for-row
  filter(trip_id >= 40000000 & trip_id <= 40000010) %>%
  sdf_register("trip_data_sdf")
6 Combining Updated Locations Data with Initial Trip Data
6.1 Introduction
If you recall, we had initially separated location and non-location data so as to gather more location-related data, such as median household income and population density around pickup and dropoff points.
Having accomplished that, we will now combine the updated locations data with the rest of the trip data.
This will be the last chapter for me, but please go further — create maps based on this data, and perform machine learning analysis too. You can find the sparklyr machine learning documentation on the official sparklyr website (spark.posit.co).
It has been a joy to come this far, and I hope you have learnt something new throughout this whole tutorial.
FYI, because we are not using Apache Sedona in this part, we are using the same Spark configuration as that used in Chapter Two.
Anyhow, let us get to work!
6.2 Load the datasets
We start by loading the datasets.
As you can see below, in addition to partitioning the data into 24 parts, we also specify the column to partition by, and use trip_id which is common to both datasets.
This is because we shall later join the datasets based on this column, and we want rows with the same trip_id to be in the same partitions so as to minimise data shuffling, which is quite computationally intensive as previously underscored.
Just a refresher on how these datasets look.
# Show every column of the sampled locations dataset
locations_sdf_updated_three |> print(width = Inf)
# Source: table<`locations_sdf_updated_three_view`> [?? x 11]
# Database: spark_connection
trip_id latitude longitude is_pickup BoroName NTA2020
<dbl> <dbl> <dbl> <dbl> <chr> <chr>
1 40000009 40.7 -74.0 1 Manhattan MN0603
2 40000006 40.8 -74.0 1 Manhattan MN0604
3 40000007 40.7 -74.0 0 Manhattan MN0501
4 40000005 40.8 -74.0 1 Manhattan MN0502
5 40000006 40.8 -74.0 0 Manhattan MN0801
6 40000004 40.7 -74.0 1 Manhattan MN0401
7 40000002 40.7 -74.0 0 Manhattan MN0202
8 40000004 40.7 -74.0 0 Manhattan MN0501
9 40000003 40.7 -74.0 0 Manhattan MN0501
10 40000007 40.8 -74.0 1 Manhattan MN0603
NTAName MdHHIncE pop_density lcz_class
<chr> <int> <dbl> <int>
1 Murray Hill-Kips Bay 138337 44076. 1
2 East Midtown-Turtle Bay 161934 41235. 1
3 Midtown South-Flatiron-Union Square 167458 29773. 1
4 Midtown-Times Square 153871 4928. 1
5 Upper East Side-Lenox Hill-Roosevelt Island 133349 21597. 1
6 Chelsea-Hudson Yards 118915 37489. 2
7 Greenwich Village 175436 31734. 2
8 Midtown South-Flatiron-Union Square 167458 29773. 2
9 Midtown South-Flatiron-Union Square 167458 15877. 1
10 Murray Hill-Kips Bay 138337 20680. 1
lcz_label
<chr>
1 Compact highrise
2 Compact highrise
3 Compact highrise
4 Compact highrise
5 Compact highrise
6 Compact midrise
7 Compact midrise
8 Compact midrise
9 Compact highrise
10 Compact highrise
# ℹ more rows
# Show every column of the trip dataset, first ten rows
trip_data_sdf |> print(width = Inf, n = 10)
# Source: table<`trip_data_sdf`> [?? x 30]
# Database: spark_connection
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<int> <dttm> <dttm> <int>
1 2 2016-01-25 11:08:45 2016-01-25 11:25:00 4
2 1 2016-01-25 11:18:15 2016-01-25 11:25:05 1
3 1 2016-01-25 11:17:40 2016-01-25 11:25:12 1
4 2 2016-01-25 11:01:31 2016-01-25 11:25:15 1
5 1 2016-01-25 11:13:33 2016-01-25 11:25:18 1
6 1 2016-01-25 10:59:58 2016-01-25 11:25:18 1
7 1 2016-01-25 11:18:57 2016-01-25 11:25:22 1
8 2 2016-01-25 11:08:52 2016-01-25 11:25:26 5
9 1 2016-01-25 10:50:15 2016-01-25 11:25:26 1
10 2 2016-01-25 10:58:22 2016-01-25 11:25:54 3
trip_distance RateCodeID store_and_fwd_flag payment_type fare_amount extra
<dbl> <chr> <chr> <chr> <dbl> <dbl>
1 0.82 1 N 1 10.5 0
2 0.7 1 N 1 6 0
3 0.6 1 N 1 6.5 0
4 1.43 1 N 2 14 0
5 1 1 N 2 8.5 0
6 1.5 1 N 2 15.5 0
7 0.6 1 N 1 6 0
8 1.51 1 N 1 11 0
9 7.3 1 N 2 29 0
10 3.19 1 N 1 18 0
mta_tax tip_amount tolls_amount improvement_surcharge total_amount
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.5 2.82 0 0.3 14.1
2 0.5 1.36 0 0.3 8.16
3 0.5 1.45 0 0.3 8.75
4 0.5 0 0 0.3 14.8
5 0.5 0 0 0.3 9.3
6 0.5 0 0 0.3 16.3
7 0.5 1.36 0 0.3 8.16
8 0.5 2.36 0 0.3 14.2
9 0.5 0 0 0.3 29.8
10 0.5 2 0 0.3 20.8
improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
<dbl> <int> <chr> <int>
1 0.3 11 Mon 4
2 0.3 11 Mon 4
3 0.3 11 Mon 4
4 0.3 11 Mon 4
5 0.3 11 Mon 4
6 0.3 10 Mon 4
7 0.3 11 Mon 4
8 0.3 11 Mon 4
9 0.3 10 Mon 4
10 0.3 10 Mon 4
pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
<int> <int> <chr> <int> <int>
1 1 11 Mon 4 1
2 1 11 Mon 4 1
3 1 11 Mon 4 1
4 1 11 Mon 4 1
5 1 11 Mon 4 1
6 1 11 Mon 4 1
7 1 11 Mon 4 1
8 1 11 Mon 4 1
9 1 11 Mon 4 1
10 1 11 Mon 4 1
is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
<dbl> <dbl> <dbl> <dbl>
1 0 0 0 -0.586
2 0 0 0 -0.620
3 0 0 0 -0.648
4 0 0 0 -0.413
5 0 0 0 -0.535
6 0 0 0 -0.393
7 0 0 0 -0.648
8 0 0 0 -0.390
9 0 0 0 1.25
10 0 0 0 0.0860
fare_amount_scaled trip_id
<dbl> <dbl>
1 -0.178 40000000
2 -0.616 40000001
3 -0.567 40000002
4 0.163 40000003
5 -0.373 40000004
6 0.309 40000005
7 -0.616 40000006
8 -0.129 40000007
9 1.62 40000008
10 0.552 40000009
# ℹ more rows
6.3 Joining the datasets
We start by joining the non-location data with the pickup location data, renaming certain variables, and dropping others in the process.
# Join trip data with pickup locations using trip_id, rename the location
# columns with a pickup_ prefix for clarity, and keep only the columns needed
# downstream (select() drops latitude/longitude/is_pickup/NTA2020/lcz_class).
merged_one <- left_join(
  trip_data_sdf,
  locations_sdf_updated_three %>% filter(is_pickup == 1), # pickup rows only
  by = "trip_id"
) %>%
  rename(
    pickup_borough = BoroName,
    pickup_neighbourhood = NTAName,
    pickup_neigh_hhincome = MdHHIncE,
    pickup_pop_density = pop_density,
    pickup_lcz_label = lcz_label
  ) %>%
  select(
    trip_id,
    VendorID,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    passenger_count,
    trip_distance,
    pickup_hour,
    pickup_dayofweek,
    pickup_week,
    pickup_month,
    dropoff_hour,
    dropoff_dayofweek,
    dropoff_week,
    dropoff_month,
    is_weekend_pickup,
    is_weekend_dropoff,
    is_rush_hour_pickup,
    trip_distance_scaled,
    pickup_borough,
    pickup_neighbourhood,
    pickup_neigh_hhincome,
    pickup_pop_density,
    pickup_lcz_label
  )
# Inspect the pickup-enriched dataset: all columns, first ten rows
merged_one |> print(width = Inf, n = 10)
# Source: SQL [?? x 23]
# Database: spark_connection
trip_id VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<dbl> <int> <dttm> <dttm> <int>
1 40000002 1 2016-01-25 11:17:40 2016-01-25 11:25:12 1
2 40000008 1 2016-01-25 10:50:15 2016-01-25 11:25:26 1
3 40000010 2 2016-01-25 11:13:41 2016-01-25 11:26:17 6
4 40000001 1 2016-01-25 11:18:15 2016-01-25 11:25:05 1
5 40000006 1 2016-01-25 11:18:57 2016-01-25 11:25:22 1
6 40000005 1 2016-01-25 10:59:58 2016-01-25 11:25:18 1
7 40000000 2 2016-01-25 11:08:45 2016-01-25 11:25:00 4
8 40000007 2 2016-01-25 11:08:52 2016-01-25 11:25:26 5
9 40000004 1 2016-01-25 11:13:33 2016-01-25 11:25:18 1
10 40000009 2 2016-01-25 10:58:22 2016-01-25 11:25:54 3
trip_distance pickup_hour pickup_dayofweek pickup_week pickup_month
<dbl> <int> <chr> <int> <int>
1 0.6 11 Mon 4 1
2 7.3 10 Mon 4 1
3 1.43 11 Mon 4 1
4 0.7 11 Mon 4 1
5 0.6 11 Mon 4 1
6 1.5 10 Mon 4 1
7 0.82 11 Mon 4 1
8 1.51 11 Mon 4 1
9 1 11 Mon 4 1
10 3.19 10 Mon 4 1
dropoff_hour dropoff_dayofweek dropoff_week dropoff_month is_weekend_pickup
<int> <chr> <int> <int> <dbl>
1 11 Mon 4 1 0
2 11 Mon 4 1 0
3 11 Mon 4 1 0
4 11 Mon 4 1 0
5 11 Mon 4 1 0
6 11 Mon 4 1 0
7 11 Mon 4 1 0
8 11 Mon 4 1 0
9 11 Mon 4 1 0
10 11 Mon 4 1 0
is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled pickup_borough
<dbl> <dbl> <dbl> <chr>
1 0 0 -0.648 Manhattan
2 0 0 1.25 Manhattan
3 0 0 -0.413 Manhattan
4 0 0 -0.620 Manhattan
5 0 0 -0.648 Manhattan
6 0 0 -0.393 Manhattan
7 0 0 -0.586 Manhattan
8 0 0 -0.390 Manhattan
9 0 0 -0.535 Manhattan
10 0 0 0.0860 Manhattan
pickup_neighbourhood pickup_neigh_hhincome pickup_pop_density
<chr> <int> <dbl>
1 SoHo-Little Italy-Hudson Square 133847 14980.
2 Midtown-Times Square 153871 14246.
3 Midtown South-Flatiron-Union Square 167458 4953.
4 SoHo-Little Italy-Hudson Square 133847 22022.
5 East Midtown-Turtle Bay 161934 41235.
6 Midtown-Times Square 153871 4928.
7 East Midtown-Turtle Bay 161934 4928.
8 Murray Hill-Kips Bay 138337 20680.
9 Chelsea-Hudson Yards 118915 37489.
10 Murray Hill-Kips Bay 138337 44076.
pickup_lcz_label
<chr>
1 Compact midrise
2 Compact highrise
3 Compact highrise
4 Compact midrise
5 Compact highrise
6 Compact highrise
7 Compact highrise
8 Compact highrise
9 Compact midrise
10 Compact highrise
# ℹ more rows
We now join our updated data with dropoff location data, again renaming certain variables and dropping others.
# Join the pickup-enriched dataset with the dropoff locations on trip_id,
# rename the location columns with a dropoff_ prefix, and drop the columns
# that are redundant after the join.
merged_two <- left_join(
  merged_one,
  locations_sdf_updated_three %>% filter(is_pickup == 0), # dropoff rows only
  by = "trip_id"
) %>%
  rename(
    dropoff_borough = BoroName,
    dropoff_neighbourhood = NTAName,
    dropoff_neigh_hhincome = MdHHIncE,
    dropoff_pop_density = pop_density,
    dropoff_lcz_label = lcz_label
  ) %>%
  select(-c(latitude, longitude, is_pickup, lcz_class)) # Drop redundant columns
# NOTE(review): the dropoff NTA2020 code column is NOT in the drop list and
# survives into the final dataset (visible in the printed output below) —
# add it to select(-c(...)) if it is not wanted.
This is what our final dataset looks like.
# Inspect the final dataset, enriched with both pickup and dropoff context
merged_two |> print(width = Inf, n = 10)
# Source: SQL [?? x 29]
# Database: spark_connection
trip_id VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<dbl> <int> <dttm> <dttm> <int>
1 40000002 1 2016-01-25 11:17:40 2016-01-25 11:25:12 1
2 40000008 1 2016-01-25 10:50:15 2016-01-25 11:25:26 1
3 40000010 2 2016-01-25 11:13:41 2016-01-25 11:26:17 6
4 40000001 1 2016-01-25 11:18:15 2016-01-25 11:25:05 1
5 40000006 1 2016-01-25 11:18:57 2016-01-25 11:25:22 1
6 40000005 1 2016-01-25 10:59:58 2016-01-25 11:25:18 1
7 40000000 2 2016-01-25 11:08:45 2016-01-25 11:25:00 4
8 40000007 2 2016-01-25 11:08:52 2016-01-25 11:25:26 5
9 40000004 1 2016-01-25 11:13:33 2016-01-25 11:25:18 1
10 40000009 2 2016-01-25 10:58:22 2016-01-25 11:25:54 3
trip_distance pickup_hour pickup_dayofweek pickup_week pickup_month
<dbl> <int> <chr> <int> <int>
1 0.6 11 Mon 4 1
2 7.3 10 Mon 4 1
3 1.43 11 Mon 4 1
4 0.7 11 Mon 4 1
5 0.6 11 Mon 4 1
6 1.5 10 Mon 4 1
7 0.82 11 Mon 4 1
8 1.51 11 Mon 4 1
9 1 11 Mon 4 1
10 3.19 10 Mon 4 1
dropoff_hour dropoff_dayofweek dropoff_week dropoff_month is_weekend_pickup
<int> <chr> <int> <int> <dbl>
1 11 Mon 4 1 0
2 11 Mon 4 1 0
3 11 Mon 4 1 0
4 11 Mon 4 1 0
5 11 Mon 4 1 0
6 11 Mon 4 1 0
7 11 Mon 4 1 0
8 11 Mon 4 1 0
9 11 Mon 4 1 0
10 11 Mon 4 1 0
is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled pickup_borough
<dbl> <dbl> <dbl> <chr>
1 0 0 -0.648 Manhattan
2 0 0 1.25 Manhattan
3 0 0 -0.413 Manhattan
4 0 0 -0.620 Manhattan
5 0 0 -0.648 Manhattan
6 0 0 -0.393 Manhattan
7 0 0 -0.586 Manhattan
8 0 0 -0.390 Manhattan
9 0 0 -0.535 Manhattan
10 0 0 0.0860 Manhattan
pickup_neighbourhood pickup_neigh_hhincome pickup_pop_density
<chr> <int> <dbl>
1 SoHo-Little Italy-Hudson Square 133847 14980.
2 Midtown-Times Square 153871 14246.
3 Midtown South-Flatiron-Union Square 167458 4953.
4 SoHo-Little Italy-Hudson Square 133847 22022.
5 East Midtown-Turtle Bay 161934 41235.
6 Midtown-Times Square 153871 4928.
7 East Midtown-Turtle Bay 161934 4928.
8 Murray Hill-Kips Bay 138337 20680.
9 Chelsea-Hudson Yards 118915 37489.
10 Murray Hill-Kips Bay 138337 44076.
pickup_lcz_label dropoff_borough NTA2020
<chr> <chr> <chr>
1 Compact midrise Manhattan MN0202
2 Compact highrise Queens QN8081
3 Compact highrise Manhattan MN0202
4 Compact midrise Manhattan MN0201
5 Compact highrise Manhattan MN0801
6 Compact highrise Manhattan MN0603
7 Compact highrise Manhattan MN0502
8 Compact highrise Manhattan MN0501
9 Compact midrise Manhattan MN0501
10 Compact highrise Manhattan MN0701
dropoff_neighbourhood dropoff_neigh_hhincome
<chr> <int>
1 Greenwich Village 175436
2 LaGuardia Airport NA
3 Greenwich Village 175436
4 SoHo-Little Italy-Hudson Square 133847
5 Upper East Side-Lenox Hill-Roosevelt Island 133349
6 Murray Hill-Kips Bay 138337
7 Midtown-Times Square 153871
8 Midtown South-Flatiron-Union Square 167458
9 Midtown South-Flatiron-Union Square 167458
10 Upper West Side-Lincoln Square 158165
dropoff_pop_density dropoff_lcz_label
<dbl> <chr>
1 31734. Compact midrise
2 5171. Bare rock or paved
3 37464. Compact midrise
4 27356. Compact midrise
5 21597. Compact highrise
6 29922. Compact highrise
7 14246. Compact highrise
8 29773. Compact highrise
9 29773. Compact midrise
10 23416. Compact highrise
# ℹ more rows
6.4 Writing the final preprocessed dataset
Finally, we write the data to disk. As stated earlier, feel free to go beyond this tutorial, creating maps based on NYC neighbourhoods and building models. Good luck, and thank you!
# Define the output path (under ./data) where the final merged dataset will be
# saved in Delta format.
locations_sdf_updated_four_file_path <- file.path(
  getwd(),
  "data",
  "locations_sdf_updated_four"
)
# Write the final dataset in Delta format with append mode to allow
# incremental writing.
# NOTE(review): mode = "append" will duplicate rows if this script is re-run
# against the same path; use mode = "overwrite" for an idempotent write —
# confirm which behaviour is intended.
spark_write_delta(
  merged_two,
  path = locations_sdf_updated_four_file_path,
  mode = "append"
)
# Disconnect from the Spark session to free resources
spark_disconnect(sc)