install.packages("arrow")
install.packages("sparklyr")
install.packages("rJavaEnv")
2 Preprocessing and Feature Engineering for Yellow Cab Trip Data
2.1 Introduction
In this chapter, we shall demonstrate how to perform basic data cleaning and feature engineering using sparklyr, and how to save the data in the Delta Lake format.
The primary dataset was downloaded from Kaggle here. It provides information about taxi trips, including the pickup and dropoff times and locations. Our goal is to enrich it with additional data obtained from geospatial sources, leaving it to you to visualise the result and build analyses that predict taxi trip durations.
The overarching goal is to show you how to go about using Delta Lake, sparklyr, Apache Sedona, and R for big data geospatial analysis when you only have an ordinary computer at your disposal.
For my actual analysis, I used the entire 7.4 GB dataset provided, containing about 47 million rows. However, for this published tutorial, I sometimes use less data so that the website can be published and updated in a timely manner. For reference, I am using an M1 MacBook with 16 GB of RAM and 500 GB of disk space.
If you have 8 GB of RAM, I would suggest using just one of the four datasets provided, as each is still relatively massive at about 12 million rows!
Anyhow, enough talking — let us get to work.
2.2 Installing and loading packages
We shall start by installing and loading the necessary libraries: arrow, sparklyr, rJavaEnv, and dplyr.
We use sparklyr to interface with Apache Spark from R, allowing us to work efficiently with large datasets using distributed computing. Spark operates on a cluster-based architecture, where a driver program coordinates tasks and executors perform computations across multiple nodes in parallel. However, when clusters are unavailable, Spark can also run in local mode, using a single machine while still leveraging parallelism to speed up computations. This makes it accessible for development and smaller-scale analyses. The dplyr package provides powerful data manipulation functions that integrate seamlessly with Spark, making it easier to transform and summarise data. Meanwhile, rJavaEnv allows us to automatically install Java and set the JAVA_HOME environment variable, which is required because Spark runs on a Java Virtual Machine (JVM). Finally, we load arrow, which enhances Spark's performance when copying, collecting, and transforming data, thereby improving the overall efficiency of our analysis.
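If any of these packages are not yet installed, they can be installed from CRAN first:
# Install the required packages (only needed once)
install.packages("arrow")
install.packages("sparklyr")
install.packages("rJavaEnv")
install.packages("dplyr")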
# Load required libraries
library(arrow) # Handle efficient data exchange between R and Spark
library(sparklyr) # Spark connection and data manipulation
library(dplyr) # Data manipulation functions
library(rJavaEnv) # Installs Java and sets JAVA_HOME environment
We can now use sparklyr to download and install Spark. In this chapter, we shall install Spark version 3.5.5 and Java version 17, and set the JAVA_HOME and SPARK_HOME environment variables. Although you can initialise these variables system-wide, it is often easier to set them within your working file, especially if you have multiple installations of Spark and Java on your system.
Whilst Spark 3.5 is compiled for Java 8, 11, and 17, I previously used Java 11, as I found it to be more stable than 8 and 17. However, support for Java versions 8 and 11 will be discontinued in the next major Spark release (source). Therefore, I will use version 17 for this tutorial, and so should you. If you encounter problems while using version 17, switch to 11. Thankfully, the rJavaEnv package will automatically install Java and set the JAVA_HOME and PATH environment variables.
2.3 Installing Spark and setting environment variables
# Install and set up Spark environment
spark_install("3.5.5") # Install the specific version of Spark (3.5.5)
# Installs Java 17 and sets JAVA_HOME environment variable
java_quick_install(version = 17)
# Set Spark home directory path (not obligatory as it is done implicitly upon installing Spark)
Sys.setenv("SPARK_HOME"=spark_home_dir(version = "3.5.5")) # Set Spark home directory
2.4 Configuring Spark
We shall now create a folder where Spark will store temporary files. By default, Spark keeps data in memory and writes any temporary files to the system's default temporary directory; in our case, we want those temporary files stored in a folder we control on disk. This is why we specify a spark_dir path, which we pass to the driver in the configuration below so that Spark uses it for its temporary file storage.
# Define path for Spark data
spark_dir <- file.path(getwd(), "data", "spark")
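The directory itself is not created automatically, so it can help to create it up front; a small optional step that is not part of the original code:
# Create the temporary-file directory if it does not already exist
dir.create(spark_dir, recursive = TRUE, showWarnings = FALSE)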
We now initialise a list and provide configuration settings for Spark. This is arguably one of the most important steps, as it determines both how fast your data is processed and whether it is successfully processed. A typical Spark process involves reading data from files (for instance), processing it, transmitting it between executors (cores), and then writing it back to files. All of this is made possible by serialising and deserialising the data into bytes. Naturally, your choice of serializer will heavily influence the performance of your application. Here, we use Kryo serialisation, as it is “significantly faster and more compact than Java serialisation” (source).
Spark runs on the Java Virtual Machine (JVM), and Java heap space refers to the memory allocated to the JVM during runtime for storing objects and data. The heap memory is divided into Spark memory (M), reserved memory, and user memory. Spark memory itself is divided into two parts: execution and storage (R). Execution memory is used for computations such as shuffles, joins, sorts, and aggregations. Storage memory, on the other hand, is used for caching and propagating internal data across the cluster (when running in cluster mode). Read more about this here.
In our case, since we are running our code in local mode, we set the JVM heap space to 10GB using sparklyr.shell.driver-memory. We then allocate 70% of the JVM heap space to Spark memory (M) using the spark.memory.fraction option. This means 7GB is reserved for both storage and execution. By default, 50% of M (i.e., 3.5GB) is reserved for storage (R). Although this can be adjusted using spark.memory.storageFraction, we leave it at the default here. Importantly, when no execution memory is needed, R can make use of the entire 7GB.
Other configuration choices we make include enabling the storage of 2GB of data off-heap (i.e., outside the JVM) using the settings spark.memory.offHeap.enabled = "true" and spark.memory.offHeap.size = "2g". We also instruct Spark not to write intermediate shuffle data to disk, to avoid I/O bottlenecks, by setting spark.sql.shuffle.spill = "false".
To manage memory efficiently, we enable periodic garbage collection every 60 seconds with spark.cleaner.periodicGC.interval = "60s", which helps reclaim unused space. Additionally, we set our maximum partition file size to 200MB. It is recommended to keep this between 128MB and 200MB, depending on your dataset size and cluster resources (source).
Finally, we enable Adaptive Query Execution (AQE), which allows Spark to automatically optimise query plans at runtime, for instance when performing joins, thereby improving performance without manual intervention (source).
Please update the configuration settings based on your available RAM.
# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation

# Use KryoSerializer for better performance
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"

# Set temporary directory for Spark and use compressed Oops for JVM performance
config$`sparklyr.shell.driver-java-options` <- paste0(
  "-XX:+UseCompressedOops -Djava.io.tmpdir=", spark_dir
)

# Allocate 10GB of memory for the Spark driver
config$`sparklyr.shell.driver-memory` <- '10G'

# Set fraction of heap memory used for Spark execution and storage
config$spark.memory.fraction <- 0.7

# Set shuffle partitions (local setting based on workload)
config$spark.sql.shuffle.partitions.local <- 24

# Set extra Java options for the driver
config$spark.driver.extraJavaOptions <- "-Xmx1G"

# Enable off-heap memory usage
config$spark.memory.offHeap.enabled <- "true"

# Set 2GB for off-heap memory
config$spark.memory.offHeap.size <- "2g"

# Disable shuffle spill to disk
config$spark.sql.shuffle.spill <- "false"

# Periodic garbage collection interval
config$spark.cleaner.periodicGC.interval <- "60s"

# Set max partition size when reading files
config$spark.sql.files.maxPartitionBytes <- "200m"

# Enable adaptive query execution
config$spark.sql.adaptive.enabled <- "true"
After configuring our setup, we now connect to Spark. Note that we have also instructed Spark to install the Delta package. This is a necessary step if you want to read from or write to Delta tables, which are commonly used for managing large-scale data with ACID transaction support, among many other advantages. By including local[*] in our Spark connection, we tell Spark to use all available cores on our computer. If, for instance, you only wanted to use 4, you would change this to local[4].
# Connect to Spark with the specified configurations
sc <- spark_connect(
  master = "local[*]",  # Use all available cores for local execution
  config = config,      # Use the specified configurations
  packages = "delta"    # Install the Delta Lake package for optimised storage
)
I recommend using the Spark Web User Interface (UI) to track metrics associated with your Spark application. You can access it as shown below.
# Open Spark web UI for monitoring the connection
spark_web(sc)
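You can also confirm from R that the configuration values were applied to the running application. The two helpers below are part of sparklyr; this is an optional check, not a step in the pipeline:
# Inspect the settings applied to the running Spark context and SQL session
spark_context_config(sc)   # e.g. serializer, driver memory, off-heap settings
spark_session_config(sc)   # e.g. shuffle partitions, adaptive query execution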
After successfully setting up a Spark context, we now turn to loading our data. We start by specifying the path where the files are located. Note that we are instructing Spark to read all CSV files within the yellow_tripdata subfolder.
Additionally, we organise our data into 24 partitions. We chose 24 because it is three times the number of our total cores (8). This approach helps ensure parallelism during processing and prevents data skew, which could otherwise slow down our computations.
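If your machine has a different number of cores, you can compute the same heuristic instead of hard-coding it. A minimal sketch using base R's parallel package; n_cores and n_partitions are illustrative names, not part of the original code:
# Derive the partition count from the available cores (3x cores is the heuristic used here)
n_cores <- parallel::detectCores()
n_partitions <- 3 * n_cores   # 24 on an 8-core machine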
2.5 Loading the data
# Define the path for the yellow cab data
yellow_cab_parent_folder <- file.path(getwd(), "data", "yellow_tripdata")
yellow_cab_filepattern <- file.path(yellow_cab_parent_folder, "*csv")

# Read the yellow cab data from CSV files into a Spark DataFrame
yellow_cab_sdf <- spark_read_csv(
  sc,
  path = yellow_cab_filepattern,
  name = "yellow_cab_sdf"
) %>%
  sdf_repartition(24)

# Print the structure of the DataFrame for inspection
print(yellow_cab_sdf, width = Inf)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`sparklyr_tmp_bb0e7e12_3f41_4c55_a042_c6215cf965a0`> [?? x 19]
# Database: spark_connection
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<chr> <chr> <chr> <chr>
1 1 2015-01-26 12:25:20 2015-01-26 12:39:02 1
2 2 2015-01-15 22:08:32 2015-01-15 22:23:12 1
3 1 2015-01-27 11:30:29 2015-01-27 11:34:35 1
4 1 2015-01-06 22:48:59 2015-01-06 22:59:12 1
5 1 2015-01-16 21:28:01 2015-01-16 21:58:08 2
6 2 2015-01-09 10:10:06 2015-01-09 10:17:24 2
7 2 2015-01-24 00:43:49 2015-01-24 00:45:52 1
8 1 2015-01-18 16:27:09 2015-01-18 16:35:31 2
9 1 2015-01-14 22:57:21 2015-01-14 23:10:52 1
10 2 2015-01-02 02:09:53 2015-01-02 02:13:07 1
trip_distance pickup_longitude pickup_latitude RateCodeID
<chr> <chr> <chr> <chr>
1 1.60 -73.988983154296875 40.748603820800781 1
2 1.51 -73.994148254394531 40.751190185546875 1
3 .60 -73.902961730957031 40.770702362060547 1
4 1.80 -73.9700927734375 40.757877349853516 1
5 7.40 -73.99237060546875 40.742774963378906 1
6 1.15 -73.978851318359375 40.766895294189453 1
7 .60 -73.987869262695312 40.749561309814453 1
8 1.00 -73.991905212402344 40.755977630615234 1
9 1.50 -73.9891357421875 40.758594512939453 1
10 .72 -73.978500366210938 40.783191680908203 1
store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
<chr> <chr> <chr> <chr>
1 N -73.98883056640625 40.740432739257813 1
2 N -73.972984313964844 40.750438690185547 2
3 N -73.911933898925781 40.775138854980469 1
4 N -73.994132995605469 40.751636505126953 1
5 N -73.908111572265625 40.708057403564453 1
6 N -73.979690551757812 40.781406402587891 1
7 N -73.985969543457031 40.756549835205078 2
8 N -73.98345947265625 40.757297515869141 2
9 N -73.973533630371094 40.747013092041016 1
10 N -73.971961975097656 40.783878326416016 2
fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
<chr> <chr> <chr> <chr> <chr> <chr>
1 9.5 0 0.5 2.05 0 0.3
2 11 0.5 0.5 0 0 0.3
3 4.5 0 0.5 1.55 0 0.3
4 8.5 0.5 0.5 1.95 0 0.3
5 26.5 0.5 0.5 5.55 0 0.3
6 7 0 0.5 1.4 0 0.3
7 4 0.5 0.5 0 0 0.3
8 7 0 0.5 0 0 0.3
9 10 0.5 0.5 1 0 0.3
10 4.5 0.5 0.5 0 0 0.3
total_amount
<chr>
1 12.35
2 12.3
3 6.85
4 11.75
5 33.35
6 9.2
7 5.3
8 7.8
9 12.3
10 5.8
# ℹ more rows
Below we can see how many columns and rows our data has.
# Get the number of rows and columns in the DataFrame
sdf_ncol(yellow_cab_sdf)
[1] 19
sdf_nrow(yellow_cab_sdf)
[1] 47248845
Looking at the partition sizes below, we see that each core will be responsible for an approximately equal number of rows for each task. This ensures that all cores do an equal amount of work, without any being overworked.
# Number of rows per partition
yellow_cab_sdf %>%
  sdf_partition_sizes()
partition_index partition_size
1 0 1968700
2 1 1968700
3 2 1968701
4 3 1968700
5 4 1968701
6 5 1968703
7 6 1968702
8 7 1968702
9 8 1968701
10 9 1968703
11 10 1968706
12 11 1968704
13 12 1968703
14 13 1968703
15 14 1968701
16 15 1968700
17 16 1968701
18 17 1968702
19 18 1968701
20 19 1968701
21 20 1968700
22 21 1968703
23 22 1968705
24 23 1968702
2.6 Preprocessing
2.6.1 Updating the schema
Depending on how much data you loaded, you may find that all the variables are in character format. This is not ideal, both for processing and memory allocation, as strings take up a significant amount of space.
# Print the schema (column types) of the DataFrame
sdf_schema(yellow_cab_sdf)
$VendorID
$VendorID$name
[1] "VendorID"
$VendorID$type
[1] "StringType"
$tpep_pickup_datetime
$tpep_pickup_datetime$name
[1] "tpep_pickup_datetime"
$tpep_pickup_datetime$type
[1] "StringType"
$tpep_dropoff_datetime
$tpep_dropoff_datetime$name
[1] "tpep_dropoff_datetime"
$tpep_dropoff_datetime$type
[1] "StringType"
$passenger_count
$passenger_count$name
[1] "passenger_count"
$passenger_count$type
[1] "StringType"
$trip_distance
$trip_distance$name
[1] "trip_distance"
$trip_distance$type
[1] "StringType"
$pickup_longitude
$pickup_longitude$name
[1] "pickup_longitude"
$pickup_longitude$type
[1] "StringType"
$pickup_latitude
$pickup_latitude$name
[1] "pickup_latitude"
$pickup_latitude$type
[1] "StringType"
$RateCodeID
$RateCodeID$name
[1] "RateCodeID"
$RateCodeID$type
[1] "StringType"
$store_and_fwd_flag
$store_and_fwd_flag$name
[1] "store_and_fwd_flag"
$store_and_fwd_flag$type
[1] "StringType"
$dropoff_longitude
$dropoff_longitude$name
[1] "dropoff_longitude"
$dropoff_longitude$type
[1] "StringType"
$dropoff_latitude
$dropoff_latitude$name
[1] "dropoff_latitude"
$dropoff_latitude$type
[1] "StringType"
$payment_type
$payment_type$name
[1] "payment_type"
$payment_type$type
[1] "StringType"
$fare_amount
$fare_amount$name
[1] "fare_amount"
$fare_amount$type
[1] "StringType"
$extra
$extra$name
[1] "extra"
$extra$type
[1] "StringType"
$mta_tax
$mta_tax$name
[1] "mta_tax"
$mta_tax$type
[1] "StringType"
$tip_amount
$tip_amount$name
[1] "tip_amount"
$tip_amount$type
[1] "StringType"
$tolls_amount
$tolls_amount$name
[1] "tolls_amount"
$tolls_amount$type
[1] "StringType"
$improvement_surcharge
$improvement_surcharge$name
[1] "improvement_surcharge"
$improvement_surcharge$type
[1] "StringType"
$total_amount
$total_amount$name
[1] "total_amount"
$total_amount$type
[1] "StringType"
We shall, therefore, update the schema accordingly.
# Data cleaning: Convert columns to appropriate types
yellow_cab_sdf <- yellow_cab_sdf |>
  mutate(
VendorID = as.integer(VendorID), # Convert VendorID to integer
tpep_pickup_datetime = to_timestamp(tpep_pickup_datetime), # Convert to timestamp
tpep_dropoff_datetime = to_timestamp(tpep_dropoff_datetime), # Convert to timestamp
passenger_count = as.integer(passenger_count), # Convert to integer
trip_distance = as.numeric(trip_distance), # Convert to numeric
pickup_longitude = as.numeric(pickup_longitude), # Convert to numeric
pickup_latitude = as.numeric(pickup_latitude), # Convert to numeric
RateCodeID = as.character(RateCodeID), # Convert to character
store_and_fwd_flag = as.character(store_and_fwd_flag), # Convert to character
dropoff_longitude = as.numeric(dropoff_longitude), # Convert to numeric
dropoff_latitude = as.numeric(dropoff_latitude), # Convert to numeric
payment_type = as.character(payment_type), # Convert to character
fare_amount = as.numeric(fare_amount), # Convert to numeric
extra = as.numeric(extra), # Convert to numeric
mta_tax = as.numeric(mta_tax), # Convert to numeric
tip_amount = as.numeric(tip_amount), # Convert to numeric
tolls_amount = as.numeric(tolls_amount), # Convert to numeric
improvement_surcharge = as.numeric(improvement_surcharge), # Convert to numeric
total_amount = as.numeric(total_amount) # Convert to numeric
)
2.6.2 Missing values
We now want to check if we have any missing values. By calling collect(), we are triggering an action. By default, Spark performs lazy evaluation, meaning it does not execute every line of code immediately. The code is only executed when actions are performed, such as collect() and count(). Learn more about this here.
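To make the laziness concrete, here is a small illustration; long_trips is just an illustrative name and this snippet is not part of the preprocessing pipeline:
# Nothing is computed here: Spark only records the query plan
long_trips <- yellow_cab_sdf |>
  filter(trip_distance > 10)

# The plan only runs when an action is triggered, e.g. counting the matching rows
sdf_nrow(long_trips)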
By calling collect(), we also change the class of the resulting object into an R data frame rather than a Spark DataFrame.
# Handle missing values: Summarise the missing values in each column
missing_values_by_col <- yellow_cab_sdf |>
  summarise_all(~ sum(as.integer(is.na(.)))) |>
  collect()
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
# Print missing values summary
print(missing_values_by_col, width = Inf)
# A tibble: 1 × 19
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<int> <int> <int> <int>
1 0 0 0 0
trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag
<int> <int> <int> <int> <int>
1 0 0 0 0 0
dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax
<int> <int> <int> <int> <int> <int>
1 0 0 0 0 0 0
tip_amount tolls_amount improvement_surcharge total_amount
<int> <int> <int> <int>
1 0 0 3 0
# print classes of yellow_cab_sdf and missing_values_by_col
print(yellow_cab_sdf %>% class())
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
print(missing_values_by_col %>% class())
[1] "tbl_df" "tbl" "data.frame"
We can see that the only column with missing values is improvement_surcharge. We shall impute the missing data using the median value of the column and create a new column called improvement_surcharge_imputed.
# Impute missing values for specific columns (e.g., "improvement_surcharge")
input_cols <- c("improvement_surcharge")
output_cols <- paste0(input_cols, "_imputed")

yellow_cab_sdf <- yellow_cab_sdf |>
  ft_imputer(input_cols = input_cols,    # Specify input columns
             output_cols = output_cols,  # Specify output columns
             strategy = "median")        # Use median strategy for imputation
2.6.3 Duplicates
We shall now remove duplicates based on specific columns.
# Remove duplicate rows based on specific columns
yellow_cab_sdf <- sdf_drop_duplicates(
  yellow_cab_sdf,
  cols = c(
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude"
  )
)
2.6.4 Outliers
We shall also handle outliers by filtering out unreasonable values in our dataset.
# Compute summary statistics to spot unreasonable values in key columns
summary_stats <- sdf_describe(
  yellow_cab_sdf,
  cols = c(
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount"
  )
) |>
  collect()

print(summary_stats, width = Inf)
# A tibble: 5 × 5
summary passenger_count trip_distance fare_amount
<chr> <chr> <chr> <chr>
1 count 47231381 47231381 47231381
2 mean 1.6669565304474159 7.511100940283872 12.396603992375454
3 stddev 1.3220203537930795 6488.8576466189 78.63046129746266
4 min 0 -3390583.8 -450.0
5 max 9 1.90726288E7 429496.72
total_amount
<chr>
1 47231381
2 15.598314385358897
3 580.2462379599986
4 -450.3
5 3950611.6
# Filter out outliers based on summary statistics
yellow_cab_sdf <- yellow_cab_sdf |>
  filter(fare_amount > 0 & fare_amount <= 1000,
         trip_distance > 0 & trip_distance < 100)
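If you want to see how many rows survive the filter, you can trigger a count. This is an optional check that is not in the original code, and it is an action, so it scans the full dataset:
# Optional: count the remaining rows after removing outliers
sdf_nrow(yellow_cab_sdf)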
2.6.5 Feature Engineering
Next, we perform feature engineering, deriving columns such as the hour, day, week, and month of pickup and dropoff. We also derive variables indicating whether the pickup and dropoff occurred on a weekend and whether the pickup was during rush hour.
# Feature Engineering: Create new time-based features (pickup and dropoff times)
yellow_cab_sdf <- yellow_cab_sdf |>
  mutate(
    pickup_hour = hour(tpep_pickup_datetime),                                  # Hour of the pickup
    pickup_dayofweek = date_format(tpep_pickup_datetime, "E"),                 # Day of the week for pickup
    pickup_week = weekofyear(tpep_pickup_datetime),                            # Week of the year for pickup
    pickup_month = month(tpep_pickup_datetime),                                # Month of pickup
    dropoff_hour = hour(tpep_dropoff_datetime),                                # Hour of the dropoff
    dropoff_dayofweek = date_format(tpep_dropoff_datetime, "E"),               # Day of the week for dropoff
    dropoff_week = weekofyear(tpep_dropoff_datetime),                          # Week of the year for dropoff
    dropoff_month = month(tpep_dropoff_datetime),                              # Month of dropoff
    is_weekend_pickup = ifelse(pickup_dayofweek %in% c("Sat", "Sun"), 1, 0),   # Weekend pickup flag
    is_weekend_dropoff = ifelse(dropoff_dayofweek %in% c("Sat", "Sun"), 1, 0), # Weekend dropoff flag
    is_rush_hour_pickup = ifelse(pickup_hour %in% c(7:9, 16:19), 1, 0)         # Rush hour pickup flag
  )
2.6.6 Standardisation
We now normalise trip_distance and fare_amount to standardise our data for modelling.
# Normalise features to standardise data for machine learning
yellow_cab_sdf <- yellow_cab_sdf %>%
  mutate(
    trip_distance_scaled = (trip_distance - mean(trip_distance)) / sd(trip_distance),  # Standardise trip distance
    fare_amount_scaled = (fare_amount - mean(fare_amount)) / sd(fare_amount)           # Standardise fare amount
  )
# Print the first 5 rows of the updated data
print(yellow_cab_sdf, n=5, width = Inf)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: SQL [?? x 33]
# Database: spark_connection
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<int> <dttm> <dttm> <int>
1 2 2015-01-01 00:00:00 2015-01-01 00:00:00 1
2 1 2015-01-01 00:02:57 2015-01-01 00:05:36 1
3 1 2015-01-01 00:06:44 2015-01-01 00:07:06 2
4 1 2015-01-01 00:01:51 2015-01-01 00:07:07 3
5 1 2015-01-01 00:03:58 2015-01-01 00:07:23 1
trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag
<dbl> <dbl> <dbl> <chr> <chr>
1 1.68 -74.0 40.8 1 N
2 0.6 -74.0 40.8 1 N
3 7.8 -74.0 40.8 1 N
4 0.7 -74.0 40.7 1 N
5 0.3 -74.0 40.7 1 N
dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax
<dbl> <dbl> <chr> <dbl> <dbl> <dbl>
1 0 0 2 10 0 0.5
2 -74.0 40.8 1 4 0.5 0.5
3 -74.0 40.8 2 2.5 0.5 0.5
4 -74.0 40.7 2 5.5 0.5 0.5
5 -74.0 40.7 2 4 0.5 0.5
tip_amount tolls_amount improvement_surcharge total_amount
<dbl> <dbl> <dbl> <dbl>
1 0 0 0.3 10.8
2 1.5 0 0 6.8
3 0 0 0 3.8
4 0 0 0 6.8
5 0 0 0 5.3
improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
<dbl> <int> <chr> <int>
1 0.3 0 Thu 1
2 0 0 Thu 1
3 0 0 Thu 1
4 0 0 Thu 1
5 0 0 Thu 1
pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
<int> <int> <chr> <int> <int>
1 1 0 Thu 1 1
2 1 0 Thu 1 1
3 1 0 Thu 1 1
4 1 0 Thu 1 1
5 1 0 Thu 1 1
is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
<dbl> <dbl> <dbl> <dbl>
1 0 0 0 -0.342
2 0 0 0 -0.648
3 0 0 0 1.39
4 0 0 0 -0.620
5 0 0 0 -0.733
fare_amount_scaled
<dbl>
1 -0.227
2 -0.811
3 -0.957
4 -0.665
5 -0.811
# ℹ more rows
2.7 Separating the data
At this point, I separate my data into two sets: location-related data and other non-location data. I do this because the next few steps involve obtaining additional geospatial variables based solely on pickup and dropoff coordinates. Instead of working with a dataset containing 20-plus columns, I will now only need four: trip_id, latitude, longitude, and is_pickup.
The only downside is that I will double the number of rows since pickup and dropoff coordinates for the same trip will now be in separate rows. I justify this decision because the alternative—performing heavy spatial joins twice on the same dataset—is quite resource-intensive. Another alternative would be to save the pickup and dropoff locations in separate datasets. Ultimately, you can make various design decisions based on the resources available to you.
# Separate data into two parts: location and trip metadata
yellow_cab_sdf <- yellow_cab_sdf %>%
  sdf_with_unique_id(id = "trip_id")  # Add unique trip ID

# Create separate DataFrames for pickup and dropoff locations
pickup_sdf <- yellow_cab_sdf %>%
  transmute(
    trip_id,
    latitude = pickup_latitude,
    longitude = pickup_longitude,
    is_pickup = 1  # Flag for pickup locations
  )

dropoff_sdf <- yellow_cab_sdf %>%
  transmute(
    trip_id,
    latitude = dropoff_latitude,
    longitude = dropoff_longitude,
    is_pickup = 0  # Flag for dropoff locations
  )

# Combine pickup and dropoff locations into one DataFrame
locations_sdf <- sdf_bind_rows(
  pickup_sdf,
  dropoff_sdf
)
print(locations_sdf, width = Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`sparklyr_tmp__403dd909_e019_4658_b714_341a9a08f70e`> [?? x 4]
# Database: spark_connection
trip_id latitude longitude is_pickup
<dbl> <dbl> <dbl> <dbl>
1 400 40.8 -74.0 1
2 401 40.8 -74.0 1
3 402 40.7 -74.0 1
4 403 40.7 -74.0 1
5 404 0 0 1
6 405 40.8 -74.0 1
7 400 40.8 -74.0 0
8 401 40.8 -73.9 0
9 402 40.7 -74.0 0
10 403 40.7 -74.0 0
# ℹ more rows
# Create another DataFrame for non-location trip data (excluding coordinates)
trip_data_sdf <- yellow_cab_sdf %>%
  select(
    -c(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)  # Exclude latitude and longitude
  )
print(trip_data_sdf, width = Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: SQL [?? x 30]
# Database: spark_connection
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
<int> <dttm> <dttm> <int>
1 2 2015-01-01 00:32:14 2015-01-01 00:40:14 2
2 1 2015-01-01 00:30:35 2015-01-01 00:40:15 1
3 1 2015-01-01 00:30:28 2015-01-01 00:40:22 1
4 2 2015-01-01 00:28:51 2015-01-01 00:40:22 1
5 1 2015-01-01 00:23:18 2015-01-01 00:40:22 2
6 2 2015-01-01 00:24:48 2015-01-01 00:40:23 2
trip_distance RateCodeID store_and_fwd_flag payment_type fare_amount extra
<dbl> <chr> <chr> <chr> <dbl> <dbl>
1 1.82 1 N 2 8 0.5
2 2 1 N 2 9.5 0.5
3 4.7 1 N 1 14.7 0.5
4 1.46 1 N 1 9.5 0.5
5 2 1 N 2 11.5 0.5
6 2.46 1 N 1 12 0.5
mta_tax tip_amount tolls_amount improvement_surcharge total_amount
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.5 0 0 0.3 9.3
2 0.5 0 0 0 10.8
3 0.5 3.2 0 0 19.2
4 0.5 3 0 0.3 13.8
5 0.5 0 0 0 12.8
6 0.5 3.75 0 0.3 17.0
improvement_surcharge_imputed pickup_hour pickup_dayofweek pickup_week
<dbl> <int> <chr> <int>
1 0.3 0 Thu 1
2 0 0 Thu 1
3 0 0 Thu 1
4 0.3 0 Thu 1
5 0 0 Thu 1
6 0.3 0 Thu 1
pickup_month dropoff_hour dropoff_dayofweek dropoff_week dropoff_month
<int> <int> <chr> <int> <int>
1 1 0 Thu 1 1
2 1 0 Thu 1 1
3 1 0 Thu 1 1
4 1 0 Thu 1 1
5 1 0 Thu 1 1
6 1 0 Thu 1 1
is_weekend_pickup is_weekend_dropoff is_rush_hour_pickup trip_distance_scaled
<dbl> <dbl> <dbl> <dbl>
1 0 0 0 -0.302
2 0 0 0 -0.251
3 0 0 0 0.514
4 0 0 0 -0.404
5 0 0 0 -0.251
6 0 0 0 -0.121
fare_amount_scaled trip_id
<dbl> <dbl>
1 -0.421 400
2 -0.275 401
3 0.231 402
4 -0.275 403
5 -0.0806 404
6 -0.0320 405
2.8 Writing the data
Finally, we save the preprocessed data into Delta Lake. While we had no choice in the format of the initial dataset, we do have a choice in how we write it out. Delta Lake is based on Parquet files, but incorporates additional metadata that makes working with multiple Parquet files more efficient.
The main difference between Parquet files and CSV files is that Parquet is columnar-based, while CSV is row-based. This offers several advantages to Parquet files, such as faster reading and smaller file sizes. Delta Lake further enhances Parquet files by adding ACID capabilities, among other features. You can find a detailed discussion of the advantages of using Delta tables over Parquet files here.
# Save the location and trip data to disk using the Delta Lake format
save_file_path_locations_sdf <- file.path(getwd(), "data", "locations_sdf")
spark_write_delta(
  locations_sdf,
  save_file_path_locations_sdf,
  mode = "overwrite"  # Overwrite existing file if it exists
)

save_file_path_trip_data_sdf <- file.path(getwd(), "data", "trip_data_sdf")
spark_write_delta(
  trip_data_sdf,
  save_file_path_trip_data_sdf,
  mode = "overwrite"  # Overwrite existing file if it exists
)
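As a quick check that the write succeeded, you can read one of the Delta tables back; spark_read_delta() is the sparklyr counterpart of spark_write_delta(), and locations_check_sdf is just an illustrative name:
# Optional: read the locations Delta table back and verify the row count
locations_check_sdf <- spark_read_delta(
  sc,
  path = save_file_path_locations_sdf,
  name = "locations_check_sdf"
)
sdf_nrow(locations_check_sdf)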
2.9 Disconnecting Spark context
Finally, we disconnect from our Spark context to release the memory being held by Spark.
# Disconnect from Spark session
spark_disconnect(sc)