3 Processing Vector Data with Apache Sedona and Sparklyr in R
Using Vector Data to Obtain Median Household Income in Pickup and Dropoff Neighbourhoods
3.1 Introduction
In this part, we are going to use the saved locations data to determine the pickup and dropoff neighbourhoods associated with each trip. We shall then use these additional data to obtain the median household incomes of the pickup and dropoff neighbourhoods. We use income as a proxy for affluence. Humour me by assuming that there is a relationship between the duration of a taxi trip and the affluence of either or both of the pickup and dropoff locations.
We shall introduce two new datasets: NYC Neighbourhood Tabulation Areas (NTAs) boundaries based on the 2020 census, and NTA median household income based on the 2022 American Community Survey (ACS). The boundaries data is in vector format, while the income data is in CSV format (originally .xlsx, but filtered for the GeoID and MdHHIncE columns and saved as CSV). We shall use Apache Sedona (v1.7.1) to merge the NTA boundaries and income data, and then perform a spatial join of the coordinates with the boundaries to determine the pickup and dropoff neighbourhoods.
By the way, this will be a new file and not the same as the one used in Chapter 2. When working in local mode, I have found that it is more feasible to separate preprocessing into multiple stages. Running too many transformations at once is likely to result in out-of-memory errors and take a very long time to complete.
3.2 Installing and loading packages
# Install apache.sedona if you have not done so already (run once)
install.packages("apache.sedona")

# Load necessary libraries for Spark, geospatial data, and data manipulation
library(arrow)
library(sparklyr)
library(sf)
library(dplyr)
3.3 Configuring Spark
# Define the Spark directory for temporary files
spark_dir <- file.path(getwd(), "data", "spark")
The configuration will mostly be kept similar to the one in the first file. The main difference is that some of the Delta configurations are explicitly included rather than added as a package. This is because Delta and Apache Sedona clash when Delta is added as a package while creating the Spark context.
You will notice a few differences in how the Spark context is set up this time around. Be sure to use this type of setup when working with Apache Sedona and reading or writing to Delta Lake.
# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation
# Configure some Delta-specific options
config$spark.sql.extensions <- "io.delta.sql.DeltaSparkSessionExtension"
config$spark.sql.catalog.spark_catalog <- "org.apache.spark.sql.delta.catalog.DeltaCatalog"

# Use KryoSerializer for better performance
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"

# Set temporary directory for Spark and use compressed Oops for JVM performance
# (combined into one string, as both options share the same driver-java-options key)
config$`sparklyr.shell.driver-java-options` <- paste0(
  "-Djava.io.tmpdir=", spark_dir, " -XX:+UseCompressedOops"
)

# Allocate 10GB of memory for the Spark driver
config$`sparklyr.shell.driver-memory` <- '10G'

# Set fraction of heap memory used for Spark storage
config$spark.memory.fraction <- 0.7

# Set shuffle partitions (local setting based on workload)
config$spark.sql.shuffle.partitions.local <- 24

# Set extra memory for driver
config$spark.driver.extraJavaOptions <- "-Xmx1G"

# Enable off-heap memory usage
config$spark.memory.offHeap.enabled <- "true"

# Set 2GB for off-heap memory
config$spark.memory.offHeap.size <- "2g"

# Disable shuffle spill to disk
config$spark.sql.shuffle.spill <- "false"

# Periodic garbage collection interval
config$spark.cleaner.periodicGC.interval <- "60s"

# Set max partition size for shuffle files
config$spark.sql.files.maxPartitionBytes <- "200m"

# Enable adaptive query execution
config$spark.sql.adaptive.enabled <- "true"
3.4 Instantiating the Spark context
As you can see, when initiating our Spark context, we explicitly include the packages associated with Delta and Apache Sedona. By doing so, the Apache Sedona and Delta packages do not clash with each other.
# Connect to Spark with the defined configuration and additional packages for geospatial processing
sc <- spark_connect(
  master = "local[*]",
  config = config,
  packages = c(
    "io.delta:delta-spark_2.12:3.3.0",
    "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.0",
    "org.datasyslab:geotools-wrapper:1.7.0-28.5"
  )
)
3.5 Loading Apache Sedona
It is only after initializing our Spark context that we can now load the Apache Sedona library and initialize its context. We are now ready to get started!
library(apache.sedona)
invoke_static(
  sc,
  "org.apache.sedona.spark.SedonaContext",
  "create",
  spark_session(sc),
  "r"
)
<jobj[23]>
org.apache.spark.sql.SparkSession
org.apache.spark.sql.SparkSession@30f8754c
# Launch Spark web UI to monitor the Spark session
spark_web(sc)
3.6 Loading datasets
3.6.1 Locations data
We now read our location data, which is thankfully in Delta Lake format.
# Define the folder containing location data (latitude and longitude of yellow cabs)
locations_sdf_parent_folder <- file.path(getwd(), "data", "locations_sdf")

locations_sdf <- spark_read_delta(
  sc,
  path = locations_sdf_parent_folder,
  name = "locations_sdf"
) %>%
  sdf_repartition(24)
print(locations_sdf, width=Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`sparklyr_tmp_4c473781_95e4_4f97_8ceb_29e438553126`> [?? x 4]
# Database: spark_connection
trip_id latitude longitude is_pickup
<dbl> <dbl> <dbl> <dbl>
1 10626731 40.8 -74.0 1
2 8962259 40.7 -74.0 1
3 8129828 40.8 -74.0 1
4 7977508 40.8 -74.0 1
5 8775269 40.8 -73.9 1
6 7608894 40.8 -74.0 1
7 7466839 40.8 -74.0 1
8 3762150 40.8 -74.0 1
9 12333790 40.7 -74.0 1
10 562063 40.7 -74.0 1
# ℹ more rows
The data contains nearly 94 million rows!
# Print the number of rows in the locations SDF (Spark DataFrame)
sdf_nrow(locations_sdf)
[1] 93889664
And it is partitioned equally across 24 partitions, which optimises wide transformations, especially joins. Wide transformations are those that require data shuffling (an exchange) between multiple executors, such as aggregations and joins. Narrow transformations, meanwhile, do not require any exchange of data; examples include select and filter. A short illustrative sketch of the difference follows the partition table below.
locations_sdf %>% sdf_partition_sizes()
partition_index partition_size
1 0 3912070
2 1 3912069
3 2 3912070
4 3 3912070
5 4 3912070
6 5 3912071
7 6 3912070
8 7 3912070
9 8 3912070
10 9 3912070
11 10 3912070
12 11 3912070
13 12 3912069
14 13 3912069
15 14 3912069
16 15 3912069
17 16 3912069
18 17 3912068
19 18 3912068
20 19 3912068
21 20 3912068
22 21 3912069
23 22 3912069
24 23 3912069
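To make the distinction concrete, here is a small illustrative sketch (not part of the pipeline) using the locations data; the variable names pickups_only and trips_per_flag are mine and purely for demonstration.

# Narrow transformations: filter and select work partition by partition, so no shuffle is triggered
pickups_only <- locations_sdf %>%
  filter(is_pickup == 1) %>%
  select(trip_id, latitude, longitude)

# Wide transformation: aggregating by key forces a shuffle (exchange) across partitions
trips_per_flag <- locations_sdf %>%
  group_by(is_pickup) %>%
  summarise(n_points = n())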
3.6.2 Median household income by neighbourhood data
We now load the median household income by neighbourhood in NYC.
# Load income data (household income by NYC neighbourhood)
nyc_nta_hh_income_file_path <- file.path(getwd(), "data", "nyc_nta_med_inc", "nyc_nta_med_inc.csv")

nyc_nta_hh_income <- spark_read_csv(sc, path = nyc_nta_hh_income_file_path, name = "nyc_nta_hh_income")
# Display the income data
print(nyc_nta_hh_income, width = Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`nyc_nta_hh_income`> [?? x 2]
# Database: spark_connection
GeoID MdHHIncE
<chr> <int>
1 BK0101 125469
2 BK0102 129838
3 BK0103 36951
4 BK0104 71107
5 BK0201 179877
6 BK0202 149181
7 BK0203 107633
8 BK0204 115160
9 BK0301 76660
10 BK0302 69954
# ℹ more rows
3.6.3 NYC neighbourhoods data
We also load the shapefile using Apache Sedona. Note that we point Sedona to the entire folder rather than to a specific .shp file, as we would when reading shapefiles via sf.
# Load the shapefile for NYC neighbourhoods
ny_neighs_pathfile <- file.path(getwd(), "data", "shapefiles", "nynta2020_25a")

ny_neighbourhoods_shp <- spark_read_shapefile(sc, path = ny_neighs_pathfile, name = "ny_neighbourhoods_shp")
Warning: `spark_read_shapefile()` was deprecated in apache.sedona 1.7.1.
ℹ Please use `spark_read_source()` instead.
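If you want to avoid the deprecation warning, the same folder can in principle be read through the generic spark_read_source() function instead. The sketch below is an assumption on my part: it presumes that Sedona's shapefile reader is exposed to Spark under the source name "shapefile".

# Sketch of the non-deprecated reader (assumes the "shapefile" data source name)
ny_neighbourhoods_shp <- spark_read_source(
  sc,
  name = "ny_neighbourhoods_shp",
  path = ny_neighs_pathfile, # still the folder, not the individual .shp file
  source = "shapefile"
)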
# Display a quick summary of the shapefile data
ny_neighbourhoods_shp %>% glimpse()
Rows: ??
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
Columns: 12
Database: spark_connection
$ geometry <arrw_bnr> <32, 00, 00, 00, db, 01, 00, 00, 00, 00, 90, fe, 67, 9…
$ BoroCode <chr> "3", "3", "3", "3", "3", "3", "3", "3", "3", "3", "3", "3",…
$ BoroName <chr> "Brooklyn", "Brooklyn", "Brooklyn", "Brooklyn", "Brooklyn",…
$ CountyFIPS <chr> "047", "047", "047", "047", "047", "047", "047", "047", "04…
$ NTA2020 <chr> "BK0101", "BK0102", "BK0103", "BK0104", "BK0201", "BK0202",…
$ NTAName <chr> "Greenpoint", "Williamsburg", "South Williamsburg", "East W…
$ NTAAbbrev <chr> "Grnpt", "Wllmsbrg", "SWllmsbrg", "EWllmsbrg", "BkHts", "Dw…
$ NTAType <chr> "0", "0", "0", "0", "0", "0", "0", "0", "6", "0", "0", "0",…
$ CDTA2020 <chr> "BK01", "BK01", "BK01", "BK01", "BK02", "BK02", "BK02", "BK…
$ CDTAName <chr> "BK01 Williamsburg-Greenpoint (CD 1 Equivalent)", "BK01 Wil…
$ Shape_Leng <chr> "2.89195621509e+04", "2.80980268049e+04", "1.82502803058e+0…
$ Shape_Area <chr> "3.53218095620e+07", "2.88543140796e+07", "1.52089606156e+0…
3.7 Associating neighbourhood with median household income
We now join the income and boundaries data using their common ID.
# Join the neighbourhood shapefile with the income data
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  left_join(nyc_nta_hh_income, by = c("NTA2020" = "GeoID"))
Now, we need to determine the relevant CRS that our shapefile uses. If it differs from EPSG:4326, we must convert it so that we can match it with the pickup and dropoff coordinates. I have not found a way to determine the CRS using Apache Sedona, so I use sf for that.
# Read the shapefile as an SF (Simple Features) object for geospatial operations
ny_neighs_sf <- st_read(file.path(ny_neighs_pathfile, "nynta2020.shp"))
Reading layer `nynta2020' from data source
`/Users/rodgersiradukunda/Library/CloudStorage/OneDrive-TheUniversityofLiverpool/geospatial_docker/sedona-tutorial/data/shapefiles/nynta2020_25a/nynta2020.shp'
using driver `ESRI Shapefile'
Simple feature collection with 262 features and 11 fields
Geometry type: MULTIPOLYGON
Dimension: XY
Bounding box: xmin: 913175.1 ymin: 120128.4 xmax: 1067383 ymax: 272844.3
Projected CRS: NAD83 / New York Long Island (ftUS)
st_crs(ny_neighs_sf)
Coordinate Reference System:
User input: NAD83 / New York Long Island (ftUS)
wkt:
PROJCRS["NAD83 / New York Long Island (ftUS)",
BASEGEOGCRS["NAD83",
DATUM["North American Datum 1983",
ELLIPSOID["GRS 1980",6378137,298.257222101,
LENGTHUNIT["metre",1]]],
PRIMEM["Greenwich",0,
ANGLEUNIT["degree",0.0174532925199433]],
ID["EPSG",4269]],
CONVERSION["SPCS83 New York Long Island zone (US survey foot)",
METHOD["Lambert Conic Conformal (2SP)",
ID["EPSG",9802]],
PARAMETER["Latitude of false origin",40.1666666666667,
ANGLEUNIT["degree",0.0174532925199433],
ID["EPSG",8821]],
PARAMETER["Longitude of false origin",-74,
ANGLEUNIT["degree",0.0174532925199433],
ID["EPSG",8822]],
PARAMETER["Latitude of 1st standard parallel",41.0333333333333,
ANGLEUNIT["degree",0.0174532925199433],
ID["EPSG",8823]],
PARAMETER["Latitude of 2nd standard parallel",40.6666666666667,
ANGLEUNIT["degree",0.0174532925199433],
ID["EPSG",8824]],
PARAMETER["Easting at false origin",984250,
LENGTHUNIT["US survey foot",0.304800609601219],
ID["EPSG",8826]],
PARAMETER["Northing at false origin",0,
LENGTHUNIT["US survey foot",0.304800609601219],
ID["EPSG",8827]]],
CS[Cartesian,2],
AXIS["easting (X)",east,
ORDER[1],
LENGTHUNIT["US survey foot",0.304800609601219]],
AXIS["northing (Y)",north,
ORDER[2],
LENGTHUNIT["US survey foot",0.304800609601219]],
USAGE[
SCOPE["Engineering survey, topographic mapping."],
AREA["United States (USA) - New York - counties of Bronx; Kings; Nassau; New York; Queens; Richmond; Suffolk."],
BBOX[40.47,-74.26,41.3,-71.8]],
ID["EPSG",2263]]
Knowing that it is EPSG:2263, we can now convert it to EPSG:4326.
# Reproject the geometries to a different coordinate reference system (CRS) for consistency
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  mutate(
    geometry = st_transform(
      geometry,
      "epsg:2263", # Source CRS
      "epsg:4326", # Target CRS
      F
    )
  ) %>%
  select(
    -c(
      BoroCode,
      CountyFIPS,
      NTAAbbrev,
      NTAType,
      CDTA2020,
      CDTAName,
      Shape_Leng,
      Shape_Area
    )
  )
3.8 Joining locations data with neighbourhoods data
Because the boundaries data is very small (about 2 MB on disk), we can cache it in memory for faster access. Generally, you are encouraged to cache data of this size, roughly less than 10 MB. We also broadcast the neighbourhoods data to improve performance. Broadcasting means that our data is shared in its entirety with every executor, so it is not shuffled when joining. This reduces data-transfer overhead and improves performance.
Even if we did not explicitly broadcast our data, it would most likely have been broadcast automatically thanks to Adaptive Query Execution (AQE), since we enabled it at the start with config$spark.sql.adaptive.enabled <- "true". AQE finds the optimal way of conducting joins, and since our neighbourhoods data is minuscule, chances are it would have been broadcast anyway to prevent unnecessary shuffling.
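If you are curious, you can peek at the relevant session settings directly. The sketch below simply queries Spark's runtime configuration through sparklyr's invoke() interface; it is optional and not part of the pipeline.

# Check that AQE is enabled and what the automatic broadcast threshold is (in bytes)
spark_session(sc) %>%
  invoke("conf") %>%
  invoke("get", "spark.sql.adaptive.enabled")

spark_session(sc) %>%
  invoke("conf") %>%
  invoke("get", "spark.sql.autoBroadcastJoinThreshold")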
# Persist the neighbourhood shapefile in memory for faster access
ny_neighbourhoods_shp <- sdf_broadcast(ny_neighbourhoods_shp)
sdf_persist(ny_neighbourhoods_shp, storage.level = "MEMORY_ONLY")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`sparklyr_tmp_63b9982f_5ebd_4759_bf03_e16f60191923`> [?? x 5]
# Database: spark_connection
# ℹ more rows
# ℹ 5 more variables: geometry <arrw_bnr>, BoroName <chr>, NTA2020 <chr>,
# NTAName <chr>, MdHHIncE <int>
I have found that it is best to use Spatial SQL when conducting spatial joins or any other spatial analysis using Apache Sedona functions. To do this, we first need to register our dataframes as temporary SQL views. This will be our next step.
# Register the dataframes as temporary SQL views for querying
locations_sdf %>% sdf_register("locations_sdf_view")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`locations_sdf_view`> [?? x 4]
# Database: spark_connection
trip_id latitude longitude is_pickup
<dbl> <dbl> <dbl> <dbl>
1 40000037 40.7 -74.0 0
2 40000093 40.7 -74.0 0
3 40000016 40.8 -74.0 0
4 40000091 40.8 -74.0 0
5 40000100 40.8 -74.0 1
6 40000031 40.8 -74.0 1
7 40000044 40.8 -74.0 1
8 40000015 40.8 -74.0 1
9 40000005 40.7 -74.0 0
10 40000053 40.8 -74.0 0
# ℹ more rows
ny_neighbourhoods_shp %>% sdf_register("ny_neighbourhoods_shp_view")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: table<`ny_neighbourhoods_shp_view`> [?? x 5]
# Database: spark_connection
# ℹ more rows
# ℹ 5 more variables: geometry <arrw_bnr>, BoroName <chr>, NTA2020 <chr>,
# NTAName <chr>, MdHHIncE <int>
Upon registration, we can now conduct a spatial join, asking Apache Sedona to find the neighbourhood that contains each coordinate using the ST_Contains function. You can find documentation on all available Apache Sedona vector functions in the Sedona SQL API reference.
# Perform a spatial join to associate each location (latitude, longitude) with the corresponding neighbourhood
locations_sdf_updated <- sdf_sql(
  sc,
  "
  SELECT /*+ BROADCAST(b) */ a.*, b.*
  FROM locations_sdf_view a
  LEFT JOIN ny_neighbourhoods_shp_view b
  ON ST_Contains(b.geometry, ST_Point(a.longitude, a.latitude))
  "
)
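Because we used a left join, any coordinates that fall outside every NTA polygon simply carry NULL neighbourhood columns. A quick optional check (purely illustrative) counts how many such points there are:

# Count location points that were not matched to any neighbourhood
locations_sdf_updated %>%
  filter(is.na(NTA2020)) %>%
  count()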
3.9 Writing data to disk
Before saving our updated data, we remove the geometry column, as Delta Lake does not support geometry columns. Moreover, there is no need to keep it in the data, as we don't plan on mapping the data just yet. If you need to write big geometry datasets, consider using the GeoParquet format instead. You can do so with the spark_write_geoparquet function, or with the spark_write_source function with the source set to "geoparquet"; a brief sketch follows below.
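For completeness, a GeoParquet write might look roughly like the sketch below; the output path is illustrative, I assume the usual sparklyr-style mode argument, and I have not benchmarked this against the Delta route.

# Hypothetical sketch: persist the joined data *with* its geometry column as GeoParquet
locations_geo_path <- file.path(getwd(), "data", "locations_sdf_updated_geo")

spark_write_geoparquet(
  locations_sdf_updated,
  path = locations_geo_path,
  mode = "overwrite"
)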
# Remove the geometry column from the final dataset for further analysis
locations_sdf_updated_no_geom <- locations_sdf_updated %>%
  select(-c(geometry))
Our final data is as shown below. Not too bad for the few lines of code written.
# Print the updated data with all relevant fields (no geometry)
withr::with_options(
  list(pillar.sigfig = 6),
  print(locations_sdf_updated_no_geom, n = 10)
)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source: SQL [?? x 8]
# Database: spark_connection
trip_id latitude longitude is_pickup BoroName NTA2020 NTAName MdHHIncE
<dbl> <dbl> <dbl> <dbl> <chr> <chr> <chr> <int>
1 40000037 40.7113 -74.0107 0 Manhattan MN0101 Financial D… 195153
2 40000093 40.7401 -73.9860 0 Manhattan MN0602 Gramercy 155905
3 40000016 40.8148 -73.9553 0 Manhattan MN0902 Manhattanvi… 46367
4 40000091 40.7743 -73.9614 0 Manhattan MN0802 Upper East … 194910
5 40000100 40.7604 -73.9614 1 Manhattan MN0801 Upper East … 133349
6 40000031 40.7794 -73.9849 1 Manhattan MN0701 Upper West … 158165
7 40000044 40.7633 -73.9594 1 Manhattan MN0801 Upper East … 133349
8 40000015 40.7776 -73.9551 1 Manhattan MN0802 Upper East … 194910
9 40000005 40.7419 -73.9746 0 Manhattan MN0603 Murray Hill… 138337
10 40000053 40.7765 -73.9767 0 Manhattan MN0701 Upper West … 158165
# ℹ more rows
locations_sdf_updated_no_geom %>% glimpse()
Rows: ??
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
Columns: 8
Database: spark_connection
$ trip_id <dbl> 40000037, 40000093, 40000016, 40000091, 40000100, 40000031, …
$ latitude <dbl> 40.71130, 40.74007, 40.81481, 40.77428, 40.76038, 40.77941, …
$ longitude <dbl> -74.01067, -73.98603, -73.95528, -73.96144, -73.96136, -73.9…
$ is_pickup <dbl> 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, …
$ BoroName <chr> "Manhattan", "Manhattan", "Manhattan", "Manhattan", "Manhatt…
$ NTA2020 <chr> "MN0101", "MN0602", "MN0902", "MN0802", "MN0801", "MN0701", …
$ NTAName <chr> "Financial District-Battery Park City", "Gramercy", "Manhatt…
$ MdHHIncE <int> 195153, 155905, 46367, 194910, 133349, 158165, 133349, 19491…
We now write our data to Delta Lake format as usual.
# Define the file path for saving the updated dataset
save_locations_sdf_updated_one_filepath <- file.path(getwd(), "data", "locations_sdf_updated_one")

# Save the updated dataset to Delta format
spark_write_delta(
  locations_sdf_updated_no_geom,
  path = save_locations_sdf_updated_one_filepath
)
And finally, we disconnect our Spark instance.
# Disconnect from the Spark session once done
spark_disconnect(sc)