6  Combining Updated Locations Data with Initial Trip Data

6.1 Introduction

Recall that we initially separated the location data from the non-location data so that we could enrich the locations with additional attributes, such as median household income and population density around pickup and dropoff points.

Having accomplished that, we will now combine the updated locations data with the rest of the trip data.

This will be the last chapter for me, but please go further: create maps based on this data, and perform machine learning analysis too. The sparklyr documentation covers its machine learning functions.

It has been a joy to come this far, and I hope you have learnt something new throughout this whole tutorial.

FYI, because we are not using Apache Sedona in this part, we are using the same Spark configuration as that used in Chapter Two.

Anyhow, let us get to work!

6.2 Load the datasets

We start by loading the datasets.

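In addition to partitioning the data into 24 parts, we also specify the column to partition by, and use trip_id, which is common to both datasets. Note that the repartitioning call itself does not appear in the snippet below, which only reads each dataset and keeps a small range of trip_id values.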

This is because we shall later join the datasets based on this column, and we want rows with the same trip_id to be in the same partitions so as to minimise data shuffling, which is quite computationally intensive as previously underscored.

# Read locations dataset in Delta format and register as a SQL view for querying
locations_sdf_updated_three <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "locations_sdf_updated_three")
) %>% 
  filter(trip_id >= 40000000 & trip_id <= 40000010) %>%  # Keep a small sample of trips (trip_id 40000000 to 40000010)
  sdf_register("locations_sdf_updated_three_view")  # Register as view for SQL queries

# Read trip data in Delta format and register as a SQL view
trip_data_sdf <- spark_read_delta(
  sc,
  path = file.path(getwd(), "data", "trip_data_sdf")
) %>% 
  filter(trip_id >= 40000000 & trip_id <= 40000010) %>%  # Optional filtering
  sdf_register("trip_data_sdf")
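
The repartitioning described above can be sketched as follows. This is a minimal illustration, not necessarily the exact call used to produce the outputs in this chapter: `repartition_by_key` is a hypothetical helper name, and it assumes a Spark DataFrame such as `trip_data_sdf` is already available through the connection `sc`. It relies on sparklyr's `sdf_repartition()`, which takes a number of partitions and the columns to partition by.

```r
library(sparklyr)

# Hypothetical helper (not part of the original tutorial code):
# hash-partition a Spark DataFrame on a key column so that rows sharing
# the same key value end up in the same partition.
repartition_by_key <- function(sdf, key = "trip_id", n_partitions = 24L) {
  sdf_repartition(sdf, partitions = n_partitions, partition_by = key)
}

# Example usage, assuming both tables are already loaded as above:
# trip_data_sdf <- repartition_by_key(trip_data_sdf)
# locations_sdf_updated_three <- repartition_by_key(locations_sdf_updated_three)
# sdf_num_partitions(trip_data_sdf)  # number of partitions after repartitioning
```

Using the same key and the same number of partitions on both tables is what lets rows with matching trip_id values line up, which may reduce shuffling in the later join.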

Here is a refresher on how these datasets look.

print(locations_sdf_updated_three, width=Inf)
# Source:   table<`locations_sdf_updated_three_view`> [?? x 11]
# Database: spark_connection
    trip_id latitude longitude is_pickup BoroName  NTA2020
      <dbl>    <dbl>     <dbl>     <dbl> <chr>     <chr>  
 1 40000009     40.7     -74.0         1 Manhattan MN0603 
 2 40000006     40.8     -74.0         1 Manhattan MN0604 
 3 40000007     40.7     -74.0         0 Manhattan MN0501 
 4 40000005     40.8     -74.0         1 Manhattan MN0502 
 5 40000006     40.8     -74.0         0 Manhattan MN0801 
 6 40000004     40.7     -74.0         1 Manhattan MN0401 
 7 40000002     40.7     -74.0         0 Manhattan MN0202 
 8 40000004     40.7     -74.0         0 Manhattan MN0501 
 9 40000003     40.7     -74.0         0 Manhattan MN0501 
10 40000007     40.8     -74.0         1 Manhattan MN0603 
   NTAName                                     MdHHIncE pop_density lcz_class
   <chr>                                          <int>       <dbl>     <int>
 1 Murray Hill-Kips Bay                          138337      44076.         1
 2 East Midtown-Turtle Bay                       161934      41235.         1
 3 Midtown South-Flatiron-Union Square           167458      29773.         1
 4 Midtown-Times Square                          153871       4928.         1
 5 Upper East Side-Lenox Hill-Roosevelt Island   133349      21597.         1
 6 Chelsea-Hudson Yards                          118915      37489.         2
 7 Greenwich Village                             175436      31734.         2
 8 Midtown South-Flatiron-Union Square           167458      29773.         2
 9 Midtown South-Flatiron-Union Square           167458      15877.         1
10 Murray Hill-Kips Bay                          138337      20680.         1
   lcz_label       
   <chr>           
 1 Compact highrise
 2 Compact highrise
 3 Compact highrise
 4 Compact highrise
 5 Compact highrise
 6 Compact midrise 
 7 Compact midrise 
 8 Compact midrise 
 9 Compact highrise
10 Compact highrise
# ℹ more rows
print(trip_data_sdf, width=Inf, n=10)
# Source:   table<`trip_data_sdf`> [?? x 30]
# Database: spark_connection
   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
      <int> <dttm>               <dttm>                          <int>
 1        2 2016-01-25 11:08:45  2016-01-25 11:25:00                 4
 2        1 2016-01-25 11:18:15  2016-01-25 11:25:05                 1
 3        1 2016-01-25 11:17:40  2016-01-25 11:25:12                 1
 4        2 2016-01-25 11:01:31  2016-01-25 11:25:15                 1
 5        1 2016-01-25 11:13:33  2016-01-25 11:25:18                 1
 6        1 2016-01-25 10:59:58  2016-01-25 11:25:18                 1
 7        1 2016-01-25 11:18:57  2016-01-25 11:25:22                 1
 8        2 2016-01-25 11:08:52  2016-01-25 11:25:26                 5
 9        1 2016-01-25 10:50:15  2016-01-25 11:25:26                 1
10        2 2016-01-25 10:58:22  2016-01-25 11:25:54                 3
   trip_distance RateCodeID store_and_fwd_flag payment_type fare_amount extra
           <dbl> <chr>      <chr>              <chr>              <dbl> <dbl>
 1          0.82 1          N                  1                   10.5     0
 2          0.7  1          N                  1                    6       0
 3          0.6  1          N                  1                    6.5     0
 4          1.43 1          N                  2                   14       0
 5          1    1          N                  2                    8.5     0
 6          1.5  1          N                  2                   15.5     0
 7          0.6  1          N                  1                    6       0
 8          1.51 1          N                  1                   11       0
 9          7.3  1          N                  2                   29       0
10          3.19 1          N                  1                   18       0
   mta_tax tip_amount tolls_amount improvement_surcharge total_amount
     <dbl>      <dbl>        <dbl>                 <dbl>        <dbl>
 1     0.5       2.82            0                   0.3        14.1 
 2     0.5       1.36            0                   0.3         8.16
 3     0.5       1.45            0                   0.3         8.75
 4     0.5       0               0                   0.3        14.8 
 5     0.5       0               0                   0.3         9.3 
 6     0.5       0               0                   0.3        16.3 
 7     0.5       1.36            0                   0.3         8.16
 8     0.5       2.36            0                   0.3        14.2 
 9     0.5       0               0                   0.3        29.8 
10     0.5       2               0                   0.3        20.8 
   improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
                           <dbl>       <int> <chr>                  <int>
 1                           0.3          11 Mon                        4
 2                           0.3          11 Mon                        4
 3                           0.3          11 Mon                        4
 4                           0.3          11 Mon                        4
 5                           0.3          11 Mon                        4
 6                           0.3          10 Mon                        4
 7                           0.3          11 Mon                        4
 8                           0.3          11 Mon                        4
 9                           0.3          10 Mon                        4
10                           0.3          10 Mon                        4
   pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
          <int>        <int> <chr>                    <int>         <int>
 1            1           11 Mon                          4             1
 2            1           11 Mon                          4             1
 3            1           11 Mon                          4             1
 4            1           11 Mon                          4             1
 5            1           11 Mon                          4             1
 6            1           11 Mon                          4             1
 7            1           11 Mon                          4             1
 8            1           11 Mon                          4             1
 9            1           11 Mon                          4             1
10            1           11 Mon                          4             1
   is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
               <dbl>              <dbl>               <dbl>                <dbl>
 1                 0                  0                   0              -0.586 
 2                 0                  0                   0              -0.620 
 3                 0                  0                   0              -0.648 
 4                 0                  0                   0              -0.413 
 5                 0                  0                   0              -0.535 
 6                 0                  0                   0              -0.393 
 7                 0                  0                   0              -0.648 
 8                 0                  0                   0              -0.390 
 9                 0                  0                   0               1.25  
10                 0                  0                   0               0.0860
   fare_amount_scaled  trip_id
                <dbl>    <dbl>
 1             -0.178 40000000
 2             -0.616 40000001
 3             -0.567 40000002
 4              0.163 40000003
 5             -0.373 40000004
 6              0.309 40000005
 7             -0.616 40000006
 8             -0.129 40000007
 9              1.62  40000008
10              0.552 40000009
# ℹ more rows

6.3 Joining the datasets

We start by joining the non-location data with the pickup location data, renaming certain variables, and dropping others in the process.

# Join trip data with pickup locations using trip_id and rename selected columns for clarity
merged_one <- left_join(
  trip_data_sdf,
  locations_sdf_updated_three %>% filter(is_pickup == 1),
  by = "trip_id"
) %>% 
  rename(
    pickup_borough = BoroName,
    pickup_neighbourhood = NTAName, 
    pickup_neigh_hhincome = MdHHIncE,
    pickup_pop_density = pop_density,
    pickup_lcz_label = lcz_label
  ) %>% 
  select(
    trip_id,
    VendorID, 
    tpep_pickup_datetime, 
    tpep_dropoff_datetime, 
    passenger_count,
    trip_distance,
    pickup_hour,
    pickup_dayofweek,
    pickup_week,
    pickup_month,
    dropoff_hour,
    dropoff_dayofweek,
    dropoff_week,
    dropoff_month,
    is_weekend_pickup,
    is_weekend_dropoff,
    is_rush_hour_pickup,
    trip_distance_scaled,
    pickup_borough,
    pickup_neighbourhood,
    pickup_neigh_hhincome,
    pickup_pop_density,
    pickup_lcz_label
  )

# Display the merged pickup-enriched dataset
print(merged_one, width = Inf, n=10)
# Source:   SQL [?? x 23]
# Database: spark_connection
    trip_id VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
      <dbl>    <int> <dttm>               <dttm>                          <int>
 1 40000002        1 2016-01-25 11:17:40  2016-01-25 11:25:12                 1
 2 40000008        1 2016-01-25 10:50:15  2016-01-25 11:25:26                 1
 3 40000010        2 2016-01-25 11:13:41  2016-01-25 11:26:17                 6
 4 40000001        1 2016-01-25 11:18:15  2016-01-25 11:25:05                 1
 5 40000006        1 2016-01-25 11:18:57  2016-01-25 11:25:22                 1
 6 40000005        1 2016-01-25 10:59:58  2016-01-25 11:25:18                 1
 7 40000000        2 2016-01-25 11:08:45  2016-01-25 11:25:00                 4
 8 40000007        2 2016-01-25 11:08:52  2016-01-25 11:25:26                 5
 9 40000004        1 2016-01-25 11:13:33  2016-01-25 11:25:18                 1
10 40000009        2 2016-01-25 10:58:22  2016-01-25 11:25:54                 3
   trip_distance pickup_hour pickup_dayofweek pickup_week pickup_month
           <dbl>       <int> <chr>                  <int>        <int>
 1          0.6           11 Mon                        4            1
 2          7.3           10 Mon                        4            1
 3          1.43          11 Mon                        4            1
 4          0.7           11 Mon                        4            1
 5          0.6           11 Mon                        4            1
 6          1.5           10 Mon                        4            1
 7          0.82          11 Mon                        4            1
 8          1.51          11 Mon                        4            1
 9          1             11 Mon                        4            1
10          3.19          10 Mon                        4            1
   dropoff_hour dropoff_dayofweek dropoff_week dropoff_month is_weekend_pickup
          <int> <chr>                    <int>         <int>             <dbl>
 1           11 Mon                          4             1                 0
 2           11 Mon                          4             1                 0
 3           11 Mon                          4             1                 0
 4           11 Mon                          4             1                 0
 5           11 Mon                          4             1                 0
 6           11 Mon                          4             1                 0
 7           11 Mon                          4             1                 0
 8           11 Mon                          4             1                 0
 9           11 Mon                          4             1                 0
10           11 Mon                          4             1                 0
   is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled pickup_borough
                <dbl>               <dbl>                <dbl> <chr>         
 1                  0                   0              -0.648  Manhattan     
 2                  0                   0               1.25   Manhattan     
 3                  0                   0              -0.413  Manhattan     
 4                  0                   0              -0.620  Manhattan     
 5                  0                   0              -0.648  Manhattan     
 6                  0                   0              -0.393  Manhattan     
 7                  0                   0              -0.586  Manhattan     
 8                  0                   0              -0.390  Manhattan     
 9                  0                   0              -0.535  Manhattan     
10                  0                   0               0.0860 Manhattan     
   pickup_neighbourhood                pickup_neigh_hhincome pickup_pop_density
   <chr>                                               <int>              <dbl>
 1 SoHo-Little Italy-Hudson Square                    133847             14980.
 2 Midtown-Times Square                               153871             14246.
 3 Midtown South-Flatiron-Union Square                167458              4953.
 4 SoHo-Little Italy-Hudson Square                    133847             22022.
 5 East Midtown-Turtle Bay                            161934             41235.
 6 Midtown-Times Square                               153871              4928.
 7 East Midtown-Turtle Bay                            161934              4928.
 8 Murray Hill-Kips Bay                               138337             20680.
 9 Chelsea-Hudson Yards                               118915             37489.
10 Murray Hill-Kips Bay                               138337             44076.
   pickup_lcz_label
   <chr>           
 1 Compact midrise 
 2 Compact highrise
 3 Compact highrise
 4 Compact midrise 
 5 Compact highrise
 6 Compact highrise
 7 Compact highrise
 8 Compact highrise
 9 Compact midrise 
10 Compact highrise
# ℹ more rows
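
The reason for filtering on is_pickup before joining can be seen on a toy example. The sketch below uses small in-memory tibbles rather than Spark tables, and the names `trips`, `locations`, `unfiltered` and `pickup_joined` are made up for illustration. Because the locations table holds two rows per trip (one pickup, one dropoff), joining it unfiltered would double every trip, whereas filtering first keeps exactly one row per trip.

```r
library(dplyr)

# Toy stand-ins for the Spark tables: one row per trip,
# and two location rows per trip (pickup and dropoff)
trips <- tibble(
  trip_id = c(1, 2, 3),
  trip_distance = c(0.8, 1.4, 7.3)
)
locations <- tibble(
  trip_id = c(1, 1, 2, 2, 3, 3),
  is_pickup = c(1, 0, 1, 0, 1, 0),
  BoroName = c("Manhattan", "Manhattan", "Manhattan",
               "Manhattan", "Manhattan", "Queens")
)

# Without the filter, each trip matches both of its location rows
unfiltered <- left_join(trips, locations, by = "trip_id")

# With the filter, each trip matches only its pickup row
pickup_joined <- left_join(
  trips,
  locations %>% filter(is_pickup == 1),
  by = "trip_id"
) %>%
  rename(pickup_borough = BoroName)

nrow(unfiltered)     # 6 rows: every trip appears twice
nrow(pickup_joined)  # 3 rows: one per trip
```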

We now join our updated data with dropoff location data, again renaming certain variables and dropping others.

# Join the merged pickup dataset with dropoff locations and rename/drop unnecessary columns
merged_two <- left_join(
  merged_one,
  locations_sdf_updated_three %>% filter(is_pickup == 0),
  by = "trip_id"
) %>% 
  rename(
    dropoff_borough = BoroName,
    dropoff_neighbourhood = NTAName, 
    dropoff_neigh_hhincome = MdHHIncE,
    dropoff_pop_density = pop_density,
    dropoff_lcz_label = lcz_label
  ) %>% 
  select(-c(latitude, longitude, is_pickup, lcz_class))  # Drop redundant columns

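This is what our final dataset looks like. Note that the NTA2020 column, which was neither renamed nor dropped, holds the neighbourhood code of the dropoff location.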

# Print the final merged dataset enriched with both pickup and dropoff spatial context
print(merged_two, width = Inf, n=10)
# Source:   SQL [?? x 29]
# Database: spark_connection
    trip_id VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
      <dbl>    <int> <dttm>               <dttm>                          <int>
 1 40000002        1 2016-01-25 11:17:40  2016-01-25 11:25:12                 1
 2 40000008        1 2016-01-25 10:50:15  2016-01-25 11:25:26                 1
 3 40000010        2 2016-01-25 11:13:41  2016-01-25 11:26:17                 6
 4 40000001        1 2016-01-25 11:18:15  2016-01-25 11:25:05                 1
 5 40000006        1 2016-01-25 11:18:57  2016-01-25 11:25:22                 1
 6 40000005        1 2016-01-25 10:59:58  2016-01-25 11:25:18                 1
 7 40000000        2 2016-01-25 11:08:45  2016-01-25 11:25:00                 4
 8 40000007        2 2016-01-25 11:08:52  2016-01-25 11:25:26                 5
 9 40000004        1 2016-01-25 11:13:33  2016-01-25 11:25:18                 1
10 40000009        2 2016-01-25 10:58:22  2016-01-25 11:25:54                 3
   trip_distance pickup_hour pickup_dayofweek pickup_week pickup_month
           <dbl>       <int> <chr>                  <int>        <int>
 1          0.6           11 Mon                        4            1
 2          7.3           10 Mon                        4            1
 3          1.43          11 Mon                        4            1
 4          0.7           11 Mon                        4            1
 5          0.6           11 Mon                        4            1
 6          1.5           10 Mon                        4            1
 7          0.82          11 Mon                        4            1
 8          1.51          11 Mon                        4            1
 9          1             11 Mon                        4            1
10          3.19          10 Mon                        4            1
   dropoff_hour dropoff_dayofweek dropoff_week dropoff_month is_weekend_pickup
          <int> <chr>                    <int>         <int>             <dbl>
 1           11 Mon                          4             1                 0
 2           11 Mon                          4             1                 0
 3           11 Mon                          4             1                 0
 4           11 Mon                          4             1                 0
 5           11 Mon                          4             1                 0
 6           11 Mon                          4             1                 0
 7           11 Mon                          4             1                 0
 8           11 Mon                          4             1                 0
 9           11 Mon                          4             1                 0
10           11 Mon                          4             1                 0
   is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled pickup_borough
                <dbl>               <dbl>                <dbl> <chr>         
 1                  0                   0              -0.648  Manhattan     
 2                  0                   0               1.25   Manhattan     
 3                  0                   0              -0.413  Manhattan     
 4                  0                   0              -0.620  Manhattan     
 5                  0                   0              -0.648  Manhattan     
 6                  0                   0              -0.393  Manhattan     
 7                  0                   0              -0.586  Manhattan     
 8                  0                   0              -0.390  Manhattan     
 9                  0                   0              -0.535  Manhattan     
10                  0                   0               0.0860 Manhattan     
   pickup_neighbourhood                pickup_neigh_hhincome pickup_pop_density
   <chr>                                               <int>              <dbl>
 1 SoHo-Little Italy-Hudson Square                    133847             14980.
 2 Midtown-Times Square                               153871             14246.
 3 Midtown South-Flatiron-Union Square                167458              4953.
 4 SoHo-Little Italy-Hudson Square                    133847             22022.
 5 East Midtown-Turtle Bay                            161934             41235.
 6 Midtown-Times Square                               153871              4928.
 7 East Midtown-Turtle Bay                            161934              4928.
 8 Murray Hill-Kips Bay                               138337             20680.
 9 Chelsea-Hudson Yards                               118915             37489.
10 Murray Hill-Kips Bay                               138337             44076.
   pickup_lcz_label dropoff_borough NTA2020
   <chr>            <chr>           <chr>  
 1 Compact midrise  Manhattan       MN0202 
 2 Compact highrise Queens          QN8081 
 3 Compact highrise Manhattan       MN0202 
 4 Compact midrise  Manhattan       MN0201 
 5 Compact highrise Manhattan       MN0801 
 6 Compact highrise Manhattan       MN0603 
 7 Compact highrise Manhattan       MN0502 
 8 Compact highrise Manhattan       MN0501 
 9 Compact midrise  Manhattan       MN0501 
10 Compact highrise Manhattan       MN0701 
   dropoff_neighbourhood                       dropoff_neigh_hhincome
   <chr>                                                        <int>
 1 Greenwich Village                                           175436
 2 LaGuardia Airport                                               NA
 3 Greenwich Village                                           175436
 4 SoHo-Little Italy-Hudson Square                             133847
 5 Upper East Side-Lenox Hill-Roosevelt Island                 133349
 6 Murray Hill-Kips Bay                                        138337
 7 Midtown-Times Square                                        153871
 8 Midtown South-Flatiron-Union Square                         167458
 9 Midtown South-Flatiron-Union Square                         167458
10 Upper West Side-Lincoln Square                              158165
   dropoff_pop_density dropoff_lcz_label 
                 <dbl> <chr>             
 1              31734. Compact midrise   
 2               5171. Bare rock or paved
 3              37464. Compact midrise   
 4              27356. Compact midrise   
 5              21597. Compact highrise  
 6              29922. Compact highrise  
 7              14246. Compact highrise  
 8              29773. Compact highrise  
 9              29773. Compact midrise   
10              23416. Compact highrise  
# ℹ more rows
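
The output above contains a missing household income for the LaGuardia Airport dropoff, so it can be worth counting missing values before modelling. The sketch below does this on a toy in-memory tibble whose three rows copy values from the output above; `toy_merged` and `missing_summary` are made-up names. The same dplyr verbs are expected to translate to Spark SQL when applied to `merged_two`, but that is an assumption that has not been verified here.

```r
library(dplyr)

# Toy stand-in for merged_two, with one missing income value
toy_merged <- tibble(
  trip_id = c(40000002, 40000008, 40000005),
  dropoff_neighbourhood = c("Greenwich Village",
                            "LaGuardia Airport",
                            "Murray Hill-Kips Bay"),
  dropoff_neigh_hhincome = c(175436L, NA, 138337L)
)

# Count the rows and the rows lacking a dropoff income
missing_summary <- toy_merged %>%
  summarise(
    n_rows = n(),
    n_missing_income = sum(ifelse(is.na(dropoff_neigh_hhincome), 1L, 0L))
  )

missing_summary
```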

6.4 Writing the final preprocessed dataset

Finally, we write the data to disk. As stated earlier, feel free to go beyond this tutorial, creating maps based on NYC neighbourhoods and building models. Good luck, and thank you!

# Define output path to save final merged dataset in Delta format
locations_sdf_updated_four_file_path <- file.path(
  getwd(),
  "data",
  "locations_sdf_updated_four"
)

# Write final dataset in Delta format with append mode to allow incremental writing
spark_write_delta(
  merged_two,
  path = locations_sdf_updated_four_file_path,
  mode = "append"
)

# Disconnect from the Spark session to free resources
spark_disconnect(sc)
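
Because `mode = "append"` adds rows to whatever is already stored at the path, running this chapter twice would store every trip twice. The sketch below shows one way to guard against that. `write_delta_fresh` and `count_delta_rows` are hypothetical helper names, and both need an open connection, so they would have to be called before `spark_disconnect(sc)`.

```r
library(sparklyr)

# Hypothetical helper: replace any earlier copy at `path` instead of appending
write_delta_fresh <- function(sdf, path) {
  spark_write_delta(sdf, path = path, mode = "overwrite")
}

# Hypothetical helper: read the Delta table back and count its rows
count_delta_rows <- function(sc, path) {
  sdf_nrow(spark_read_delta(sc, path = path))
}

# Example usage, assuming `sc` is still connected:
# write_delta_fresh(merged_two, locations_sdf_updated_four_file_path)
# count_delta_rows(sc, locations_sdf_updated_four_file_path) == sdf_nrow(merged_two)
```

Overwriting suits a pipeline that is rerun from scratch, while append remains the better fit when the data really is written in batches.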