2  Preprocessing and Feature Engineering for Yellow Cab Trip Data

2.1 Introduction

In this chapter, we shall demonstrate how to perform basic data cleaning and feature engineering using Sparklyr, and how to save the data in the Delta Lake format.

The primary dataset was downloaded from Kaggle here. It provides information about taxi trips, including the pickup and dropoff times and locations. Our goal is to enrich it with additional data obtained from geospatial sources, and to leave the visualisation and the analysis that predicts taxi trip durations to you.

The overarching goal is to show you how to go about using Delta Lake, Sparklyr, Apache Sedona, and R for big data geospatial analysis when you only have an ordinary computer at your disposal.

For my actual analysis, I used the entire 7.4 GB dataset provided, containing about 47 million rows. However, for this published tutorial, I sometimes use less data so that I can publish and update this website in a timely manner. For reference, I am using an M1 MacBook with 16 GB of RAM and 500 GB of disk space.

If you have 8 GB of RAM, I would suggest that you use one of the four datasets available, as they are also relatively massive with about 12 million rows each!

Anyhow, enough talking — let us get to work.

2.2 Installing and loading packages

We shall start by installing and loading the necessary libraries: arrow, sparklyr, rJavaEnv, and dplyr.

install.packages("arrow")
install.packages("sparklyr")
install.packages("rJavaEnv")
install.packages("dplyr")

We use sparklyr to interface with Apache Spark in R, allowing us to work efficiently with large datasets using distributed computing. Spark operates on a cluster-based architecture, where a driver program coordinates tasks and executors perform computations across multiple nodes in parallel. However, when clusters are unavailable, Spark can also run in local mode, using a single machine while still leveraging parallelism to speed up computations. This makes it accessible for development and smaller-scale analyses. The dplyr package provides powerful data manipulation functions that integrate seamlessly with Spark, making it easier to transform and summarise data. Spark runs on a Java Virtual Machine (JVM), so Java is a requirement; the rJavaEnv package installs Java automatically and sets the JAVA_HOME variable for us. Finally, we load arrow, which speeds up copying, collecting, and transforming data between R and Spark, thereby improving the overall efficiency of our analysis.

# Load required libraries
library(arrow)      # Handle efficient data exchange between R and Spark
library(sparklyr)   # Spark connection and data manipulation
library(dplyr)      # Data manipulation functions
library(rJavaEnv)   # Installs Java and sets JAVA_HOME environment

We can now use sparklyr to download and install Spark. In this chapter, we shall install Spark version 3.5.5, Java version 17 and set the JAVA_HOME and SPARK_HOME environment variables. Although you can initialise these variables system-wide, it is often easier to set them within your working file, especially if you have multiple installations of Spark and Java on your system.

Whilst Spark 3.5 is compiled for Java 8, 11, and 17, I previously used Java 11 as I found it to be more stable than 8 and 17. However, support for Java versions 8 and 11 will be discontinued in the next Spark major release (source). Therefore, I will use version 17 for this tutorial, and so should you. If you encounter problems while using version 17, switch to 11. Thankfully, the rJavaEnv package will automatically install Java and set the JAVA_HOME and PATH environment variables.

2.3 Installing Spark and setting environment variables

# Install and set up Spark environment
spark_install("3.5.5")  # Install the specific version of Spark (3.5.5)

# Installs Java 17 and sets JAVA_HOME environment variable
java_quick_install(version = 17)

# Set Spark home directory path (not obligatory as it is done implicitly upon installing Spark)
Sys.setenv("SPARK_HOME"=spark_home_dir(version = "3.5.5"))  # Set Spark home directory
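
Before moving on, it can be worth confirming that both environment variables are actually visible to the current R session. The following is a minimal sketch in base R; check_spark_env() is a hypothetical helper written for this illustration and is not part of sparklyr or rJavaEnv.

```r
# Report whether the environment variables Spark relies on are set.
# check_spark_env() is a hypothetical helper, not a sparklyr or rJavaEnv function.
check_spark_env <- function(vars = c("JAVA_HOME", "SPARK_HOME")) {
  values <- Sys.getenv(vars, unset = NA)    # NA marks variables that are not set
  is_set <- !is.na(values) & nzchar(values) # set and non-empty
  names(is_set) <- vars
  is_set
}

print(check_spark_env())
```

If either entry is FALSE, rerunning the installation step above in the same session is a reasonable first thing to try.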

2.4 Configuring Spark

We shall now create a folder where Spark will store temporary files. Spark writes intermediate data that does not fit in memory to a scratch directory, which by default is the system's temporary folder. We specify a spark_dir path so that these files are written to a disk location that we control.

# Define path for Spark data
spark_dir <- file.path(getwd(), "data", "spark")

We now initialise a list and provide configuration settings for Spark. This is arguably one of the most important steps, as it determines both how fast your data is processed and whether it is successfully processed. A typical Spark process involves reading data from files (for instance), processing it, transmitting it between executors (cores), and then writing it back to files. All of this is made possible by serialising and deserialising the data into bytes. Naturally, your choice of serializer will heavily influence the performance of your application. Here, we use Kryo serialisation, as it is “significantly faster and more compact than Java serialisation” (source).

Spark runs on the Java Virtual Machine (JVM), and Java heap space refers to the memory allocated to the JVM during runtime for storing objects and data. The heap memory is divided into Spark memory (M), reserved memory, and user memory. Spark memory itself is divided into two parts: execution and storage (R). Execution memory is used for computations such as shuffles, joins, sorts, and aggregations. Storage memory, on the other hand, is used for caching and propagating internal data across the cluster (when running in cluster mode). Read more about this here.

In our case, since we are running our code in local mode, we set the JVM heap space to 10GB using sparklyr.shell.driver-memory. We then allocate 70% of the usable heap to Spark memory (M) using the spark.memory.fraction option. This means roughly 7GB is shared by storage and execution (slightly less in practice, because the reserved memory is set aside first). By default, 50% of M (i.e., about 3.5GB) is reserved for storage (R). Although this can be adjusted using spark.memory.storageFraction, we leave it at the default here. Importantly, when no execution memory is needed, storage can make use of all of M.
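
The arithmetic above can be reproduced with a few lines of base R. This is only a rough sketch: spark_memory_split() is a hypothetical helper, and it assumes that about 300 MB of the heap is set aside as reserved memory before the fraction is applied, as described in the Spark tuning guide.

```r
# Approximate the sizes of Spark's unified memory regions for a given heap.
# Assumption: ~300 MB of heap is reserved before spark.memory.fraction applies.
spark_memory_split <- function(heap_gb,
                               memory_fraction = 0.7,
                               storage_fraction = 0.5,
                               reserved_mb = 300) {
  usable_mb <- heap_gb * 1024 - reserved_mb
  spark_mb <- usable_mb * memory_fraction
  list(
    spark_memory_mb = spark_mb,               # M: execution + storage
    storage_mb = spark_mb * storage_fraction, # R: storage share of M
    user_memory_mb = usable_mb - spark_mb     # left over for user data structures
  )
}

mem <- spark_memory_split(heap_gb = 10)
print(round(unlist(mem) / 1024, 2))  # sizes in GB
```

With a 10GB heap this gives a little under 7GB for M, which is why the figures in the text are best read as approximations.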

Other configuration choices we make include enabling the storage of 2GB of data off-heap (i.e., outside the JVM) using the settings spark.memory.offHeap.enabled = "true" and spark.memory.offHeap.size = "2g". We also instruct Spark not to write intermediate shuffle data to disk—to avoid I/O bottlenecks—by setting spark.sql.shuffle.spill = "false".

To manage memory efficiently, we enable periodic garbage collection every 60 seconds with spark.cleaner.periodicGC.interval = "60s", which helps reclaim unused space. Additionally, we set our maximum partition file size to 200MB. It is recommended to keep this between 128MB and 200MB, depending on your dataset size and cluster resources (source).

Finally, we enable Adaptive Query Execution (AQE), which allows Spark to automatically optimise query plans during runtime, such as when performing joins, thereby improving performance without manual interference (source).

Please update the configuration settings based on your available RAM.

# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation

# Use KryoSerializer for better performance
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"  

# Set temporary directory for Spark and use compressed Oops for JVM performance.
# Both flags must go into one string: assigning the option twice would overwrite the first value.
config$`sparklyr.shell.driver-java-options` <- paste(
  paste0("-Djava.io.tmpdir=", spark_dir),
  "-XX:+UseCompressedOops"
)

# Allocate 10GB of memory for the Spark driver
config$`sparklyr.shell.driver-memory` <- '10G'  

# Set fraction of heap memory shared by Spark execution and storage
config$spark.memory.fraction <- 0.7  

# Set shuffle partitions (local setting based on workload)
config$spark.sql.shuffle.partitions.local <- 24  

# Set extra memory for driver
config$spark.driver.extraJavaOptions <- "-Xmx1G"  

# Enable off-heap memory usage
config$spark.memory.offHeap.enabled <- "true" 

# Set 2GB for off-heap memory
config$spark.memory.offHeap.size <- "2g"  

# Disable shuffle spill to disk
config$spark.sql.shuffle.spill <- "false"  

# Periodic garbage collection interval
config$spark.cleaner.periodicGC.interval <- "60s"  

# Set max number of bytes packed into a single partition when reading files
config$spark.sql.files.maxPartitionBytes <- "200m"  

# Enable adaptive query execution
config$spark.sql.adaptive.enabled <- "true"  
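
If your machine has less RAM than the 16 GB used here, one rough way to adapt the memory settings is to keep the same proportions as this chapter (10 GB of heap and 2 GB off-heap out of 16 GB). The helper below, scale_spark_memory(), is hypothetical and the proportional rule is an assumption made for illustration, not a Spark recommendation.

```r
# Scale the chapter's memory settings to a machine with a different amount of RAM.
# scale_spark_memory() is a hypothetical helper; the proportional rule is an assumption.
scale_spark_memory <- function(ram_gb,
                               reference_ram_gb = 16,
                               reference_driver_gb = 10,
                               reference_offheap_gb = 2) {
  ratio <- ram_gb / reference_ram_gb
  driver_gb <- max(1, floor(reference_driver_gb * ratio))
  offheap_gb <- max(1, floor(reference_offheap_gb * ratio))
  list(
    `sparklyr.shell.driver-memory` = paste0(driver_gb, "G"),
    spark.memory.offHeap.size = paste0(offheap_gb, "g")
  )
}

suggested <- scale_spark_memory(ram_gb = 8)
print(suggested)

# To apply the suggestions to the configuration list defined above:
# config[names(suggested)] <- suggested
```

Treat the result as a starting point and watch the Spark UI for memory pressure before settling on final values.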

After configuring our setup, we now connect to Spark. Note that we also instruct Spark to install the Delta package. This is a necessary step if you want to read from or write to Delta tables, which are commonly used for managing large-scale data with ACID transaction support among many other advantages. By setting the master to local[*], we tell Spark to use all available cores in our computer. If, for instance, you only wanted to use 4, you would change this to local[4].

# Connect to Spark with the specified configurations
sc <- spark_connect(
  master = "local[*]",  # Use all available cores for local execution
  config = config,      # Use the specified configurations
  packages = "delta"    # Install the Delta Lake package for optimised storage
)

I recommend using the Spark Web User Interface (UI) to track metrics associated with your Spark application. You can access it as shown below.

# Open Spark web UI for monitoring the connection
spark_web(sc)

After successfully setting up a Spark context, we now turn to loading our data. We start by specifying the path where the files are located. Note that we are instructing Spark to read all CSV files within the yellow_tripdata subfolder.

Additionally, we organise our data into 24 partitions. We chose 24 because it is three times the total number of cores on our machine (8). Having several evenly sized partitions per core helps keep all cores busy during processing and limits data skew, which could otherwise slow down our computations.
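
The "three partitions per core" rule of thumb can be computed for your own machine rather than hard-coded. This is a small sketch in base R; suggest_partitions() is a hypothetical helper, and the multiplier of 3 is simply the choice made in this chapter.

```r
# Suggest a partition count as a multiple of the number of available cores.
# suggest_partitions() is a hypothetical helper; the multiplier follows this chapter.
suggest_partitions <- function(cores = parallel::detectCores(), multiplier = 3) {
  if (is.na(cores) || cores < 1) cores <- 1  # detectCores() can return NA
  as.integer(cores * multiplier)
}

print(suggest_partitions())           # based on this machine
print(suggest_partitions(cores = 8))  # the 8-core machine used in this chapter
```

The resulting value could then be passed to sdf_repartition() in place of the literal 24.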

2.5 Loading the data

# Define the path for the yellow cab data
yellow_cab_parent_folder <- file.path(getwd(), "data", "yellow_tripdata")
yellow_cab_filepattern <- file.path(yellow_cab_parent_folder, "*.csv")

# Read the yellow cab data from CSV files into a Spark DataFrame
yellow_cab_sdf <- spark_read_csv(
  sc, 
  path = yellow_cab_filepattern, 
  name = "yellow_cab_sdf"
  ) %>% 
    sdf_repartition(24)

# Print the structure of the DataFrame for inspection
print(yellow_cab_sdf, width = Inf)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`sparklyr_tmp_bb0e7e12_3f41_4c55_a042_c6215cf965a0`> [?? x 19]
# Database: spark_connection
   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
   <chr>    <chr>                <chr>                 <chr>          
 1 1        2015-01-26 12:25:20  2015-01-26 12:39:02   1              
 2 2        2015-01-15 22:08:32  2015-01-15 22:23:12   1              
 3 1        2015-01-27 11:30:29  2015-01-27 11:34:35   1              
 4 1        2015-01-06 22:48:59  2015-01-06 22:59:12   1              
 5 1        2015-01-16 21:28:01  2015-01-16 21:58:08   2              
 6 2        2015-01-09 10:10:06  2015-01-09 10:17:24   2              
 7 2        2015-01-24 00:43:49  2015-01-24 00:45:52   1              
 8 1        2015-01-18 16:27:09  2015-01-18 16:35:31   2              
 9 1        2015-01-14 22:57:21  2015-01-14 23:10:52   1              
10 2        2015-01-02 02:09:53  2015-01-02 02:13:07   1              
   trip_distance pickup_longitude    pickup_latitude    RateCodeID
   <chr>         <chr>               <chr>              <chr>     
 1 1.60          -73.988983154296875 40.748603820800781 1         
 2 1.51          -73.994148254394531 40.751190185546875 1         
 3 .60           -73.902961730957031 40.770702362060547 1         
 4 1.80          -73.9700927734375   40.757877349853516 1         
 5 7.40          -73.99237060546875  40.742774963378906 1         
 6 1.15          -73.978851318359375 40.766895294189453 1         
 7 .60           -73.987869262695312 40.749561309814453 1         
 8 1.00          -73.991905212402344 40.755977630615234 1         
 9 1.50          -73.9891357421875   40.758594512939453 1         
10 .72           -73.978500366210938 40.783191680908203 1         
   store_and_fwd_flag dropoff_longitude   dropoff_latitude   payment_type
   <chr>              <chr>               <chr>              <chr>       
 1 N                  -73.98883056640625  40.740432739257813 1           
 2 N                  -73.972984313964844 40.750438690185547 2           
 3 N                  -73.911933898925781 40.775138854980469 1           
 4 N                  -73.994132995605469 40.751636505126953 1           
 5 N                  -73.908111572265625 40.708057403564453 1           
 6 N                  -73.979690551757812 40.781406402587891 1           
 7 N                  -73.985969543457031 40.756549835205078 2           
 8 N                  -73.98345947265625  40.757297515869141 2           
 9 N                  -73.973533630371094 40.747013092041016 1           
10 N                  -73.971961975097656 40.783878326416016 2           
   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
   <chr>       <chr> <chr>   <chr>      <chr>        <chr>                
 1 9.5         0     0.5     2.05       0            0.3                  
 2 11          0.5   0.5     0          0            0.3                  
 3 4.5         0     0.5     1.55       0            0.3                  
 4 8.5         0.5   0.5     1.95       0            0.3                  
 5 26.5        0.5   0.5     5.55       0            0.3                  
 6 7           0     0.5     1.4        0            0.3                  
 7 4           0.5   0.5     0          0            0.3                  
 8 7           0     0.5     0          0            0.3                  
 9 10          0.5   0.5     1          0            0.3                  
10 4.5         0.5   0.5     0          0            0.3                  
   total_amount
   <chr>       
 1 12.35       
 2 12.3        
 3 6.85        
 4 11.75       
 5 33.35       
 6 9.2         
 7 5.3         
 8 7.8         
 9 12.3        
10 5.8         
# ℹ more rows

Below we can see how many columns and rows our data has.

# Get the number of rows and columns in the DataFrame
sdf_ncol(yellow_cab_sdf)
[1] 19
sdf_nrow(yellow_cab_sdf)
[1] 47248845

Looking at the partition sizes, we see that each partition holds an approximately equal number of rows. This ensures that all cores do a similar amount of work for each task, without any being overworked.

# Number of rows in each partition
yellow_cab_sdf %>% 
  sdf_partition_sizes()
   partition_index partition_size
1                0        1968700
2                1        1968700
3                2        1968701
4                3        1968700
5                4        1968701
6                5        1968703
7                6        1968702
8                7        1968702
9                8        1968701
10               9        1968703
11              10        1968706
12              11        1968704
13              12        1968703
14              13        1968703
15              14        1968701
16              15        1968700
17              16        1968701
18              17        1968702
19              18        1968701
20              19        1968701
21              20        1968700
22              21        1968703
23              22        1968705
24              23        1968702
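
To put a number on how balanced the partitions are, one option is to compare the largest partition with the average. The sketch below uses base R on a handful of the sizes printed above; partition_imbalance() is a hypothetical helper, and in practice you could feed it the partition_size column returned by sdf_partition_sizes().

```r
# Ratio of the largest partition to the mean partition size.
# A value close to 1 indicates well-balanced partitions.
# partition_imbalance() is a hypothetical helper for illustration.
partition_imbalance <- function(sizes) {
  max(sizes) / mean(sizes)
}

# A few of the partition sizes shown in the output above
sizes <- c(1968700, 1968700, 1968701, 1968700, 1968701, 1968703, 1968706)
print(partition_imbalance(sizes))
```

A ratio far above 1 would suggest that one partition is doing much more work than the rest.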

2.6 Preprocessing

2.6.1 Updating the schema

Depending on how much data you loaded, you may find that all the variables are in character format. This is not ideal, both for processing and memory allocation, as strings take up a significant amount of space.

# Print the schema (column types) of the DataFrame
sdf_schema(yellow_cab_sdf)
$VendorID
$VendorID$name
[1] "VendorID"

$VendorID$type
[1] "StringType"


$tpep_pickup_datetime
$tpep_pickup_datetime$name
[1] "tpep_pickup_datetime"

$tpep_pickup_datetime$type
[1] "StringType"


$tpep_dropoff_datetime
$tpep_dropoff_datetime$name
[1] "tpep_dropoff_datetime"

$tpep_dropoff_datetime$type
[1] "StringType"


$passenger_count
$passenger_count$name
[1] "passenger_count"

$passenger_count$type
[1] "StringType"


$trip_distance
$trip_distance$name
[1] "trip_distance"

$trip_distance$type
[1] "StringType"


$pickup_longitude
$pickup_longitude$name
[1] "pickup_longitude"

$pickup_longitude$type
[1] "StringType"


$pickup_latitude
$pickup_latitude$name
[1] "pickup_latitude"

$pickup_latitude$type
[1] "StringType"


$RateCodeID
$RateCodeID$name
[1] "RateCodeID"

$RateCodeID$type
[1] "StringType"


$store_and_fwd_flag
$store_and_fwd_flag$name
[1] "store_and_fwd_flag"

$store_and_fwd_flag$type
[1] "StringType"


$dropoff_longitude
$dropoff_longitude$name
[1] "dropoff_longitude"

$dropoff_longitude$type
[1] "StringType"


$dropoff_latitude
$dropoff_latitude$name
[1] "dropoff_latitude"

$dropoff_latitude$type
[1] "StringType"


$payment_type
$payment_type$name
[1] "payment_type"

$payment_type$type
[1] "StringType"


$fare_amount
$fare_amount$name
[1] "fare_amount"

$fare_amount$type
[1] "StringType"


$extra
$extra$name
[1] "extra"

$extra$type
[1] "StringType"


$mta_tax
$mta_tax$name
[1] "mta_tax"

$mta_tax$type
[1] "StringType"


$tip_amount
$tip_amount$name
[1] "tip_amount"

$tip_amount$type
[1] "StringType"


$tolls_amount
$tolls_amount$name
[1] "tolls_amount"

$tolls_amount$type
[1] "StringType"


$improvement_surcharge
$improvement_surcharge$name
[1] "improvement_surcharge"

$improvement_surcharge$type
[1] "StringType"


$total_amount
$total_amount$name
[1] "total_amount"

$total_amount$type
[1] "StringType"

We shall, therefore, update the schema accordingly.

# Data cleaning: Convert columns to appropriate types
yellow_cab_sdf <- yellow_cab_sdf |>
  mutate(
    VendorID = as.integer(VendorID),  # Convert VendorID to integer
    tpep_pickup_datetime = to_timestamp(tpep_pickup_datetime),  # Convert to timestamp
    tpep_dropoff_datetime = to_timestamp(tpep_dropoff_datetime),  # Convert to timestamp
    passenger_count = as.integer(passenger_count),  # Convert to integer
    trip_distance = as.numeric(trip_distance),  # Convert to numeric
    pickup_longitude = as.numeric(pickup_longitude),  # Convert to numeric
    pickup_latitude = as.numeric(pickup_latitude),  # Convert to numeric
    RateCodeID = as.character(RateCodeID),  # Convert to character
    store_and_fwd_flag = as.character(store_and_fwd_flag),  # Convert to character
    dropoff_longitude = as.numeric(dropoff_longitude),  # Convert to numeric
    dropoff_latitude = as.numeric(dropoff_latitude),  # Convert to numeric
    payment_type = as.character(payment_type),  # Convert to character
    fare_amount = as.numeric(fare_amount),  # Convert to numeric
    extra = as.numeric(extra),  # Convert to numeric
    mta_tax = as.numeric(mta_tax),  # Convert to numeric
    tip_amount = as.numeric(tip_amount),  # Convert to numeric
    tolls_amount = as.numeric(tolls_amount),  # Convert to numeric
    improvement_surcharge = as.numeric(improvement_surcharge),  # Convert to numeric
    total_amount = as.numeric(total_amount)  # Convert to numeric
  )
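
An alternative to casting after loading is to declare the column types when reading the files, using the columns argument of spark_read_csv(). The sketch below is untested against this dataset and rests on two assumptions: that every CSV file has the same 19 columns in the order shown in the schema above, and that the type names used are accepted by your sparklyr version. read_yellow_cab_typed() is a hypothetical wrapper.

```r
# Column types mirroring the casts performed above.
yellow_cab_col_types <- c(
  VendorID = "integer",
  tpep_pickup_datetime = "timestamp",
  tpep_dropoff_datetime = "timestamp",
  passenger_count = "integer",
  trip_distance = "double",
  pickup_longitude = "double",
  pickup_latitude = "double",
  RateCodeID = "character",
  store_and_fwd_flag = "character",
  dropoff_longitude = "double",
  dropoff_latitude = "double",
  payment_type = "character",
  fare_amount = "double",
  extra = "double",
  mta_tax = "double",
  tip_amount = "double",
  tolls_amount = "double",
  improvement_surcharge = "double",
  total_amount = "double"
)

# Hypothetical wrapper: read the CSV files with an explicit schema.
# It is only defined here; calling it requires an active Spark connection `sc`.
read_yellow_cab_typed <- function(sc, path) {
  sparklyr::spark_read_csv(
    sc,
    name = "yellow_cab_sdf",
    path = path,
    columns = yellow_cab_col_types,
    infer_schema = FALSE
  )
}
```

Declaring the schema up front avoids holding every column as a string first, at the cost of having to keep the type vector in sync with the files.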

2.6.2 Missing values

We now want to check if we have any missing values. By calling collect(), we are triggering an action. By default, Spark performs lazy evaluation, meaning it does not execute every line of code immediately. The code is only executed when actions are performed, such as collect() and count(). Learn more about this here.

Calling collect() also brings the result into R, so the resulting object is an R data frame (a tibble) rather than a Spark DataFrame.

# Handle missing values: Summarise the missing values in each column
missing_values_by_col <- yellow_cab_sdf |>
  summarise_all(~ sum(as.integer(is.na(.)))) |>
  collect()
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# Print missing values summary
print(missing_values_by_col, width = Inf)
# A tibble: 1 × 19
  VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
     <int>                <int>                 <int>           <int>
1        0                    0                     0               0
  trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag
          <int>            <int>           <int>      <int>              <int>
1             0                0               0          0                  0
  dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax
              <int>            <int>        <int>       <int> <int>   <int>
1                 0                0            0           0     0       0
  tip_amount tolls_amount improvement_surcharge total_amount
       <int>        <int>                 <int>        <int>
1          0            0                     3            0
# print classes of yellow_cab_sdf and missing_values_by_col
print(yellow_cab_sdf %>% class())
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"      
print(missing_values_by_col %>% class())
[1] "tbl_df"     "tbl"        "data.frame"

We can see that the only column with missing values is improvement_surcharge. We shall impute the missing data using the median value of the column and create a new column called improvement_surcharge_imputed.

# Impute missing values for specific columns (e.g., "improvement_surcharge")
input_cols <- c("improvement_surcharge")
output_cols <- paste0(input_cols, "_imputed")

yellow_cab_sdf <- yellow_cab_sdf |>
  ft_imputer(input_cols = input_cols,   # Specify input columns
             output_cols = output_cols,  # Specify output columns
             strategy = "median")  # Use median strategy for imputation
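
To make the idea concrete, here is what median imputation does on a tiny local vector, written in base R. This is only an illustration of the concept: impute_median() is a hypothetical helper, and on large data Spark's imputer may compute the median approximately, so results need not match an exact median to the last digit.

```r
# Replace missing values with the median of the observed values.
# impute_median() is a hypothetical helper illustrating the idea locally.
impute_median <- function(x) {
  x[is.na(x)] <- median(x, na.rm = TRUE)
  x
}

surcharge <- c(0.3, NA, 0, 0.3)
print(impute_median(surcharge))
```

As in the Spark version, the original values are left untouched and only the missing entry is filled in.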

2.6.3 Duplicates

We shall now remove duplicates based on specific columns.

# Remove duplicate rows based on specific columns
yellow_cab_sdf <- sdf_drop_duplicates(
  yellow_cab_sdf,
  cols = c(
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude"
  )
)
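
The effect of deduplicating on a subset of columns can be seen on a small local data frame. This base R sketch uses made-up rows purely for illustration. One difference to keep in mind: duplicated() keeps the first occurrence, whereas in Spark you should not rely on which of the duplicate rows is retained.

```r
# Toy data (made up for illustration): rows 1 and 2 share the same key columns
trips <- data.frame(
  VendorID = c(1, 1, 2),
  tpep_pickup_datetime = c("2015-01-01 00:02:57",
                           "2015-01-01 00:02:57",
                           "2015-01-01 00:02:57"),
  fare_amount = c(4, 4.5, 4)
)

# Deduplicate on the key columns only; other columns may differ between duplicates
key_cols <- c("VendorID", "tpep_pickup_datetime")
deduped <- trips[!duplicated(trips[, key_cols]), ]
print(deduped)
```

Note that the second row is dropped even though its fare differs, because only the key columns are compared.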

2.6.4 Outliers

We shall also handle outliers by filtering out unreasonable values in our dataset.

# Compute summary statistics to help identify unreasonable values
summary_stats <- sdf_describe(
  yellow_cab_sdf,
  cols = c(
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount"
  )
) |>
  collect()

print(summary_stats, width=Inf)
# A tibble: 5 × 5
  summary passenger_count    trip_distance     fare_amount       
  <chr>   <chr>              <chr>             <chr>             
1 count   47231381           47231381          47231381          
2 mean    1.6669565304474159 7.511100940283872 12.396603992375454
3 stddev  1.3220203537930795 6488.8576466189   78.63046129746266 
4 min     0                  -3390583.8        -450.0            
5 max     9                  1.90726288E7      429496.72         
  total_amount      
  <chr>             
1 47231381          
2 15.598314385358897
3 580.2462379599986 
4 -450.3            
5 3950611.6         
# Filter out outliers based on summary statistics
yellow_cab_sdf <- yellow_cab_sdf |>
  filter(fare_amount > 0 & fare_amount <= 1000,
         trip_distance > 0 & trip_distance < 100)

2.6.5 Feature Engineering

This is followed by performing feature engineering, where we derive certain columns such as the hour, day, week, and month of pickup and dropoff. We also derive variables indicating whether the pickup and dropoff occurred on a weekend and whether the pickup was during rush hour.

# Feature Engineering: Create new time-based features (pickup and dropoff times)
yellow_cab_sdf <- yellow_cab_sdf |>
  mutate(
    pickup_hour = hour(tpep_pickup_datetime),  # Hour of the pickup
    pickup_dayofweek = date_format(tpep_pickup_datetime, "E"),  # Day of the week for pickup
    pickup_week = weekofyear(tpep_pickup_datetime),  # Week of the year for pickup
    pickup_month = month(tpep_pickup_datetime),  # Month of pickup
    dropoff_hour = hour(tpep_dropoff_datetime),  # Hour of the dropoff
    dropoff_dayofweek = date_format(tpep_dropoff_datetime, "E"),  # Day of the week for dropoff
    dropoff_week = weekofyear(tpep_dropoff_datetime),  # Week of the year for dropoff
    dropoff_month = month(tpep_dropoff_datetime),  # Month of dropoff
    is_weekend_pickup = ifelse(pickup_dayofweek %in% c("Sat", "Sun"), 1, 0),  # Weekend pickup flag
    is_weekend_dropoff = ifelse(dropoff_dayofweek %in% c("Sat", "Sun"), 1, 0),  # Weekend dropoff flag
    is_rush_hour_pickup = ifelse(pickup_hour %in% c(7:9, 16:19), 1, 0)  # Rush hour pickup flag
  )
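
A quick way to sanity-check the flag definitions is to reproduce them locally on a couple of timestamps. The sketch below uses base R rather than Spark SQL functions; pickup_flags() is a hypothetical helper, and it relies on the numeric weekday (0 for Sunday, 6 for Saturday) so that the result does not depend on the locale.

```r
# Derive the hour, weekend flag, and rush-hour flag for pickup timestamps.
# pickup_flags() is a hypothetical helper mirroring the Spark logic above.
pickup_flags <- function(pickup_time) {
  lt <- as.POSIXlt(pickup_time, tz = "UTC")
  data.frame(
    pickup_hour = lt$hour,
    is_weekend_pickup = as.integer(lt$wday %in% c(0, 6)),         # Sun = 0, Sat = 6
    is_rush_hour_pickup = as.integer(lt$hour %in% c(7:9, 16:19))  # same hours as above
  )
}

# A Thursday morning and a Saturday night
print(pickup_flags(c("2015-01-01 08:15:00", "2015-01-03 23:40:00")))
```

The first timestamp should be flagged as rush hour but not weekend, and the second as weekend but not rush hour.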

2.6.6 Standardisation

We now standardise trip_distance and fare_amount, subtracting the mean and dividing by the standard deviation, to prepare them for modelling.

# Standardise features (z-scores) for machine learning
yellow_cab_sdf <- yellow_cab_sdf %>%
  mutate(
    trip_distance_scaled = (trip_distance - mean(trip_distance)) / sd(trip_distance),  # Standardise trip distance
    fare_amount_scaled = (fare_amount - mean(fare_amount)) / sd(fare_amount)  # Standardise fare amount
  )

# Print the first 5 rows of the updated data
print(yellow_cab_sdf, n=5, width = Inf)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   SQL [?? x 33]
# Database: spark_connection
  VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
     <int> <dttm>               <dttm>                          <int>
1        2 2015-01-01 00:00:00  2015-01-01 00:00:00                 1
2        1 2015-01-01 00:02:57  2015-01-01 00:05:36                 1
3        1 2015-01-01 00:06:44  2015-01-01 00:07:06                 2
4        1 2015-01-01 00:01:51  2015-01-01 00:07:07                 3
5        1 2015-01-01 00:03:58  2015-01-01 00:07:23                 1
  trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag
          <dbl>            <dbl>           <dbl> <chr>      <chr>             
1          1.68            -74.0            40.8 1          N                 
2          0.6             -74.0            40.8 1          N                 
3          7.8             -74.0            40.8 1          N                 
4          0.7             -74.0            40.7 1          N                 
5          0.3             -74.0            40.7 1          N                 
  dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax
              <dbl>            <dbl> <chr>              <dbl> <dbl>   <dbl>
1               0                0   2                   10     0       0.5
2             -74.0             40.8 1                    4     0.5     0.5
3             -74.0             40.8 2                    2.5   0.5     0.5
4             -74.0             40.7 2                    5.5   0.5     0.5
5             -74.0             40.7 2                    4     0.5     0.5
  tip_amount tolls_amount improvement_surcharge total_amount
       <dbl>        <dbl>                 <dbl>        <dbl>
1        0              0                   0.3         10.8
2        1.5            0                   0            6.8
3        0              0                   0            3.8
4        0              0                   0            6.8
5        0              0                   0            5.3
  improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
                          <dbl>       <int> <chr>                  <int>
1                           0.3           0 Thu                        1
2                           0             0 Thu                        1
3                           0             0 Thu                        1
4                           0             0 Thu                        1
5                           0             0 Thu                        1
  pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
         <int>        <int> <chr>                    <int>         <int>
1            1            0 Thu                          1             1
2            1            0 Thu                          1             1
3            1            0 Thu                          1             1
4            1            0 Thu                          1             1
5            1            0 Thu                          1             1
  is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
              <dbl>              <dbl>               <dbl>                <dbl>
1                 0                  0                   0               -0.342
2                 0                  0                   0               -0.648
3                 0                  0                   0                1.39 
4                 0                  0                   0               -0.620
5                 0                  0                   0               -0.733
  fare_amount_scaled
               <dbl>
1             -0.227
2             -0.811
3             -0.957
4             -0.665
5             -0.811
# ℹ more rows
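
The scaled columns are z-scores. The same calculation on a small local vector, sketched in base R below, shows that the result has mean 0 and standard deviation 1 and agrees with R's built-in scale(). The values are simply the first few trip distances from the output above, so the z-scores will differ from the Spark ones, which use the mean and standard deviation of the full dataset.

```r
# Z-score standardisation: subtract the mean, divide by the standard deviation.
standardise <- function(x) {
  (x - mean(x)) / sd(x)
}

distances <- c(1.68, 0.6, 7.8, 0.7, 0.3)
z <- standardise(distances)

print(round(z, 3))
print(all.equal(z, as.numeric(scale(distances))))  # matches base R's scale()
```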

2.7 Separating the data

At this point, I separate my data into two sets: location-related data and other non-location data. I do this because the next few steps involve obtaining additional geospatial variables solely based on pickup and dropoff coordinates. Instead of working with a dataset containing 20-plus columns, I will now only need four: trip_id, latitude, longitude, and is_pickup.

The only downside is that I will double the number of rows since pickup and dropoff coordinates for the same trip will now be in separate rows. I justify this decision because the alternative—performing heavy spatial joins twice on the same dataset—is quite resource-intensive. Another alternative would be to save the pickup and dropoff locations in separate datasets. Ultimately, you can make various design decisions based on the resources available to you.
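
The reshaping described above can be illustrated on a toy local data frame before applying it in Spark. The sketch below uses base R and made-up coordinates; it shows how each trip contributes one pickup row and one dropoff row to the combined table.

```r
# Toy trips (coordinates made up for illustration)
trips <- data.frame(
  trip_id = c(400, 401),
  pickup_latitude = c(40.75, 40.76),
  pickup_longitude = c(-73.99, -73.98),
  dropoff_latitude = c(40.74, 40.78),
  dropoff_longitude = c(-73.97, -73.95)
)

# One row per pickup location
pickups <- data.frame(
  trip_id = trips$trip_id,
  latitude = trips$pickup_latitude,
  longitude = trips$pickup_longitude,
  is_pickup = 1
)

# One row per dropoff location
dropoffs <- data.frame(
  trip_id = trips$trip_id,
  latitude = trips$dropoff_latitude,
  longitude = trips$dropoff_longitude,
  is_pickup = 0
)

# Stack them: twice as many rows, but only four columns
locations <- rbind(pickups, dropoffs)
print(locations)
```

Because trip_id is carried along, the geospatial variables computed on this long table can later be joined back to the trip data.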

# Separate data into two parts: location and trip metadata
yellow_cab_sdf <- yellow_cab_sdf %>% 
  sdf_with_unique_id(id = "trip_id")  # Add unique trip ID
# Create separate DataFrames for pickup and dropoff locations
pickup_sdf <- yellow_cab_sdf %>% 
  transmute(
    trip_id,
    latitude = pickup_latitude,
    longitude = pickup_longitude,
    is_pickup = 1  # Flag for pickup locations
  )

dropoff_sdf <- yellow_cab_sdf %>% 
  transmute(
    trip_id,
    latitude = dropoff_latitude,
    longitude = dropoff_longitude,
    is_pickup = 0  # Flag for dropoff locations
  )

# Combine pickup and dropoff locations into one DataFrame
locations_sdf <- sdf_bind_rows(
  pickup_sdf,
  dropoff_sdf
)

print(locations_sdf, width = Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`sparklyr_tmp__403dd909_e019_4658_b714_341a9a08f70e`> [?? x 4]
# Database: spark_connection
   trip_id latitude longitude is_pickup
     <dbl>    <dbl>     <dbl>     <dbl>
 1     400     40.8     -74.0         1
 2     401     40.8     -74.0         1
 3     402     40.7     -74.0         1
 4     403     40.7     -74.0         1
 5     404      0         0           1
 6     405     40.8     -74.0         1
 7     400     40.8     -74.0         0
 8     401     40.8     -73.9         0
 9     402     40.7     -74.0         0
10     403     40.7     -74.0         0
# ℹ more rows

# Create another DataFrame for non-location trip data (coordinates excluded)
trip_data_sdf <- yellow_cab_sdf %>%
  select(
    -c(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)
  )

print(trip_data_sdf, width = Inf, n = 10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   SQL [?? x 30]
# Database: spark_connection
  VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
     <int> <dttm>               <dttm>                          <int>
1        2 2015-01-01 00:32:14  2015-01-01 00:40:14                 2
2        1 2015-01-01 00:30:35  2015-01-01 00:40:15                 1
3        1 2015-01-01 00:30:28  2015-01-01 00:40:22                 1
4        2 2015-01-01 00:28:51  2015-01-01 00:40:22                 1
5        1 2015-01-01 00:23:18  2015-01-01 00:40:22                 2
6        2 2015-01-01 00:24:48  2015-01-01 00:40:23                 2
  trip_distance RateCodeID store_and_fwd_flag payment_type fare_amount extra
          <dbl> <chr>      <chr>              <chr>              <dbl> <dbl>
1          1.82 1          N                  2                    8     0.5
2          2    1          N                  2                    9.5   0.5
3          4.7  1          N                  1                   14.7   0.5
4          1.46 1          N                  1                    9.5   0.5
5          2    1          N                  2                   11.5   0.5
6          2.46 1          N                  1                   12     0.5
  mta_tax tip_amount tolls_amount improvement_surcharge total_amount
    <dbl>      <dbl>        <dbl>                 <dbl>        <dbl>
1     0.5       0               0                   0.3          9.3
2     0.5       0               0                   0           10.8
3     0.5       3.2             0                   0           19.2
4     0.5       3               0                   0.3         13.8
5     0.5       0               0                   0           12.8
6     0.5       3.75            0                   0.3         17.0
  improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
                          <dbl>       <int> <chr>                  <int>
1                           0.3           0 Thu                        1
2                           0             0 Thu                        1
3                           0             0 Thu                        1
4                           0.3           0 Thu                        1
5                           0             0 Thu                        1
6                           0.3           0 Thu                        1
  pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
         <int>        <int> <chr>                    <int>         <int>
1            1            0 Thu                          1             1
2            1            0 Thu                          1             1
3            1            0 Thu                          1             1
4            1            0 Thu                          1             1
5            1            0 Thu                          1             1
6            1            0 Thu                          1             1
  is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
              <dbl>              <dbl>               <dbl>                <dbl>
1                 0                  0                   0               -0.302
2                 0                  0                   0               -0.251
3                 0                  0                   0                0.514
4                 0                  0                   0               -0.404
5                 0                  0                   0               -0.251
6                 0                  0                   0               -0.121
  fare_amount_scaled trip_id
               <dbl>   <dbl>
1            -0.421      400
2            -0.275      401
3             0.231      402
4            -0.275      403
5            -0.0806     404
6            -0.0320     405
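
Since the coordinates now live in a separate long-format table, you may later want them back alongside the trip metadata, for example after the locations have been enriched with geospatial features. The trip_id column is the key that makes this possible. Below is a minimal sketch of one way to do it, using a hypothetical helper, rejoin_locations(), and made-up toy data whose values are loosely based on the rounded output above. It is an illustration under those assumptions, not the method used in the original analysis.

```r
library(dplyr)

# Hypothetical helper: turn the long-format locations back into one row
# per trip and attach the pickup and dropoff coordinates to the trip data.
rejoin_locations <- function(trip_data, locations) {
  pickups <- locations %>%
    filter(is_pickup == 1) %>%
    select(trip_id, pickup_latitude = latitude, pickup_longitude = longitude)

  dropoffs <- locations %>%
    filter(is_pickup == 0) %>%
    select(trip_id, dropoff_latitude = latitude, dropoff_longitude = longitude)

  trip_data %>%
    inner_join(pickups, by = "trip_id") %>%
    inner_join(dropoffs, by = "trip_id")
}

# Made-up toy data for illustration only
toy_trips <- tibble(
  trip_id     = c(400, 401),
  fare_amount = c(8, 9.5)
)

toy_locations <- tibble(
  trip_id   = c(400, 401, 400, 401),
  latitude  = c(40.8, 40.8, 40.8, 40.8),
  longitude = c(-74.0, -74.0, -74.0, -73.9),
  is_pickup = c(1, 1, 0, 0)
)

wide <- rejoin_locations(toy_trips, toy_locations)
print(wide)
```

Only dplyr verbs are used here, so the same helper should work when called as rejoin_locations(trip_data_sdf, locations_sdf). Bear in mind that on the full dataset these are two large joins, so it is worth running them only on the subset of trips you actually need.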

2.8 Writing the data

We now save the preprocessed data in the Delta Lake format. While we had no choice over the format of the initial dataset, we do have a choice in how we write it. Delta Lake stores the data as Parquet files, but adds a transaction log that records which files make up each version of the table. This extra metadata makes working with many Parquet files more reliable and efficient.

The main difference between Parquet and CSV files is that Parquet is column-oriented, while CSV is row-oriented. This gives Parquet several advantages, such as faster reads when a query needs only some of the columns, and smaller files thanks to better compression. Delta Lake further enhances Parquet files by adding ACID transactions, among other features. You can find a detailed discussion of the advantages of using Delta tables over Parquet files here.

# Save the location and trip data to disk using Delta Lake format
save_file_path_locations_sdf <- file.path(getwd(), "data", "locations_sdf")
spark_write_delta(
  locations_sdf,
  save_file_path_locations_sdf,
  mode = "overwrite"  # Overwrite existing file if it exists
)

save_file_path_trip_data_sdf <- file.path(getwd(), "data", "trip_data_sdf")
spark_write_delta(
  trip_data_sdf,
  save_file_path_trip_data_sdf,
  mode = "overwrite"  # Overwrite existing file if it exists
)
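
To confirm that the write succeeded, one option is to read a table back from disk and compare its row count with the in-memory version. The sketch below assumes that sc, locations_sdf, and save_file_path_locations_sdf from the code above are still available, and that the Spark connection was started with Delta Lake support. The table name locations_check is a hypothetical name chosen for illustration.

```r
library(sparklyr)
library(dplyr)

# Read the Delta table back from disk.
# Assumes `sc` and `save_file_path_locations_sdf` exist from the steps above.
locations_check_sdf <- spark_read_delta(
  sc,
  path = save_file_path_locations_sdf,
  name = "locations_check",  # hypothetical table name, for illustration only
  memory = FALSE             # do not cache the table in Spark memory
)

# The two counts should match if the write completed successfully
sdf_nrow(locations_check_sdf)
sdf_nrow(locations_sdf)
```

spark_read_delta() also accepts a version argument for reading an earlier snapshot of the table, which may be useful if you rerun the write with mode = "overwrite" and want to compare against a previous run.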

2.9 Disconnecting Spark context

Finally, we disconnect from Spark to release the memory it is holding.

# Disconnect from Spark session
spark_disconnect(sc)