3  Processing Vector Data with Apache Sedona and Sparklyr in R

Using Vector Data to Obtain Median Household Income in Pickup and Droppoff Neighbourhoods

3.1 Introduction

In this part, we are going to use the saved locations data to determine the pickup and dropoff neighbourhoods associated with each trip. We shall then use these additional data to obtain the median household incomes of the pickup and dropoff neighbourhoods. We use income as a proxy for affluence. Humour me by assuming that there is a relationship between the duration of a taxi trip and the affluence of either or both the pickup or dropoff locations.

We shall introduce two new datasets: NYC Neighbourhood Tabulation Areas (NTAs) boundaries based on the 2020 census, and NTA household median income based on the 2022 American Community Survey (ACS). The boundaries data is in vector format, while the income data is in CSV format (originally .xlsx but filtered for GeoID and MdHHIncE columns, then saved as CSV). To do this, we shall use Apache Sedona (v1.7.1) to merge the NTA boundaries and income data. We will then perform a spatial join on the coordinates with the boundaries, determining the pickup and dropoff neighbourhoods.

By the way, this will be a new file and not the same as the one used in Chapter 2. When working in local mode, I have found that it is more feasible to separate preprocessing into multiple stages. Running too many transformations at once is likely to result in out-of-memory errors and take a very long time to complete.

3.2 Installing and loading packages

install.packages("apache.sedona")
# Load necessary libraries for Spark, geospatial data, and data manipulation
library(arrow)
library(sparklyr)
library(sf)
library(dplyr)

3.3 Configuring Spark

# Define the Spark directory for temporary files
spark_dir <- file.path(getwd(), "data", "spark")

The configuration will mostly be kept similar to the one in the first file. The main difference is that some of the Delta configurations are explicitly included and not added as a package. This is because Delta and Apache Sedona clash when Delta is installed as a package when creating the Spark context.

You will notice a few differences in how the Spark context is set up this time around. Be sure to use this type of setup when working with Apache Sedona and reading or writing to Delta Lake.

# Create an empty list for Spark configuration settings
config <- list()

# Set Spark configurations for memory and performance optimisation

# Configure some delta specific options
config$spark.sql.extensions <- "io.delta.sql.DeltaSparkSessionExtension"
config$spark.sql.catalog.spark_catalog <- "org.apache.spark.sql.delta.catalog.DeltaCatalog"

# Use KryoSerializer for better performance
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"  

# Set temporary directory for Spark
config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", spark_dir)  

# Use compressed Oops for JVM performance
config$`sparklyr.shell.driver-java-options` <- "-XX:+UseCompressedOops"  

# Allocate 8GB of memory for the Spark driver
config$`sparklyr.shell.driver-memory` <- '10G'  

# Set fraction of heap memory used for Spark storage
config$spark.memory.fraction <- 0.7  

# Set shuffle partitions (local setting based on workload)
config$spark.sql.shuffle.partitions.local <- 24  

# Set extra memory for driver
config$spark.driver.extraJavaOptions <- "-Xmx1G"  

# Enable off-heap memory usage
config$spark.memory.offHeap.enabled <- "true" 

# Set 4GB for off-heap memory
config$spark.memory.offHeap.size <- "2g"  

# Disable shuffle spill to disk
config$spark.sql.shuffle.spill <- "false"  

# Periodic garbage collection interval
config$spark.cleaner.periodicGC.interval <- "60s"  

# Set max partition size for shuffle files
config$spark.sql.files.maxPartitionBytes <- "200m"  

# Enable adaptive query execution
config$spark.sql.adaptive.enabled <- "true" 

3.4 Instantiating spark context

As you can see, when initiating our Spark context, we explicitly include files associated with Delta and Apache Sedona. By doing so, Apache Sedona and Delta packages do not clash with each other.

# Connect to Spark with the defined configuration and additional packages for geospatial processing
sc <- spark_connect(
  master = "local[*]",
  config = config,
  packages = c(
    "io.delta:delta-spark_2.12:3.3.0",
    "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.0",
    "org.datasyslab:geotools-wrapper:1.7.0-28.5"
  )
)

3.5 Loading Apache Sedona

It is only after initializing our Spark context that we can now load the Apache Sedona library and initialize its context. We are now ready to get started!

library(apache.sedona)
invoke_static(
  sc,
  "org.apache.sedona.spark.SedonaContext",
  "create",
  spark_session(sc),
  "r"
)
<jobj[23]>
  org.apache.spark.sql.SparkSession
  org.apache.spark.sql.SparkSession@30f8754c
# Launch Spark web UI to monitor the Spark session
spark_web(sc)

3.6 Loading datasets

3.6.1 Locations data

We now read our location data, which is thankfully in Delta Lake format.

# Define the folder containing location data (latitude and longitude of yellow cabs)
locations_sdf_parent_folder <- file.path(getwd(), "data", "locations_sdf")
locations_sdf <- spark_read_delta(
  sc, 
  path = locations_sdf_parent_folder, 
  name = "locations_sdf"
  ) %>% 
    sdf_repartition(24)

print(locations_sdf, width=Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`sparklyr_tmp_4c473781_95e4_4f97_8ceb_29e438553126`> [?? x 4]
# Database: spark_connection
    trip_id latitude longitude is_pickup
      <dbl>    <dbl>     <dbl>     <dbl>
 1 10626731     40.8     -74.0         1
 2  8962259     40.7     -74.0         1
 3  8129828     40.8     -74.0         1
 4  7977508     40.8     -74.0         1
 5  8775269     40.8     -73.9         1
 6  7608894     40.8     -74.0         1
 7  7466839     40.8     -74.0         1
 8  3762150     40.8     -74.0         1
 9 12333790     40.7     -74.0         1
10   562063     40.7     -74.0         1
# ℹ more rows

The data contains nearly 94 million rows!

# Print the number of rows in the locations SDF (Spark DataFrame)
sdf_nrow(locations_sdf)
[1] 93889664

And is partitioned equally for optimised wide transformations, especially joins. Wide transformations are those that require data shuffling (exchange) between multiple executors such as aggregations and joins. Meanwhile, narrow transformations do not require any exchange of data. Examples of narrow transformations include select and filter.

locations_sdf %>% sdf_partition_sizes()
   partition_index partition_size
1                0        3912070
2                1        3912069
3                2        3912070
4                3        3912070
5                4        3912070
6                5        3912071
7                6        3912070
8                7        3912070
9                8        3912070
10               9        3912070
11              10        3912070
12              11        3912070
13              12        3912069
14              13        3912069
15              14        3912069
16              15        3912069
17              16        3912069
18              17        3912068
19              18        3912068
20              19        3912068
21              20        3912068
22              21        3912069
23              22        3912069
24              23        3912069

3.6.2 Median household income by neighbourhood data

We now load the average household income by neighbourhood in NYC.

# Load income data (household income by NYC neighbourhood)
nyc_nta_hh_income_file_path <- file.path(getwd(), "data", "nyc_nta_med_inc", "nyc_nta_med_inc.csv")
nyc_nta_hh_income <- spark_read_csv(sc, path = nyc_nta_hh_income_file_path, name = "nyc_nta_hh_income")
# Display the income data
print(nyc_nta_hh_income, width = Inf, n=10)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`nyc_nta_hh_income`> [?? x 2]
# Database: spark_connection
   GeoID  MdHHIncE
   <chr>     <int>
 1 BK0101   125469
 2 BK0102   129838
 3 BK0103    36951
 4 BK0104    71107
 5 BK0201   179877
 6 BK0202   149181
 7 BK0203   107633
 8 BK0204   115160
 9 BK0301    76660
10 BK0302    69954
# ℹ more rows

3.6.3 NYC neighbourhoods data

We also load the shapefile using Apache Sedona. Note that we point Sedona to the entire folder and not just the specific .shp file, as is the case when reading shapefiles via sf.

# Load the shapefile for NYC neighbourhoods
ny_neighs_pathfile <- file.path(getwd(), "data", "shapefiles", "nynta2020_25a")
ny_neighbourhoods_shp <- spark_read_shapefile(sc, path = ny_neighs_pathfile, name = "ny_neighbourhoods_shp")
Warning: `spark_read_shapefile()` was deprecated in apache.sedona 1.7.1.
ℹ Please use `spark_read_source()` instead.
# Display a quick summary of the shapefile data
ny_neighbourhoods_shp %>% glimpse()
Rows: ??
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
Columns: 12
Database: spark_connection
$ geometry   <arrw_bnr> <32, 00, 00, 00, db, 01, 00, 00, 00, 00, 90, fe, 67, 9…
$ BoroCode   <chr> "3", "3", "3", "3", "3", "3", "3", "3", "3", "3", "3", "3",…
$ BoroName   <chr> "Brooklyn", "Brooklyn", "Brooklyn", "Brooklyn", "Brooklyn",…
$ CountyFIPS <chr> "047", "047", "047", "047", "047", "047", "047", "047", "04…
$ NTA2020    <chr> "BK0101", "BK0102", "BK0103", "BK0104", "BK0201", "BK0202",…
$ NTAName    <chr> "Greenpoint", "Williamsburg", "South Williamsburg", "East W…
$ NTAAbbrev  <chr> "Grnpt", "Wllmsbrg", "SWllmsbrg", "EWllmsbrg", "BkHts", "Dw…
$ NTAType    <chr> "0", "0", "0", "0", "0", "0", "0", "0", "6", "0", "0", "0",…
$ CDTA2020   <chr> "BK01", "BK01", "BK01", "BK01", "BK02", "BK02", "BK02", "BK…
$ CDTAName   <chr> "BK01 Williamsburg-Greenpoint (CD 1 Equivalent)", "BK01 Wil…
$ Shape_Leng <chr> "2.89195621509e+04", "2.80980268049e+04", "1.82502803058e+0…
$ Shape_Area <chr> "3.53218095620e+07", "2.88543140796e+07", "1.52089606156e+0…

3.7 Associating neighbourhood with median household income

We now join the income and boundaries data using their common ID.

# Join the neighbourhood shapefile with the income data
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  left_join(nyc_nta_hh_income, by = c("NTA2020" = "GeoID"))

Now, we need to determine the relevant CRS that our shapefile uses. If it differs from EPSG:4326, we must convert it so that we can match it with the pickup and dropoff coordinates. I have not found a way to determine the CRS using Apache Sedona, so I use sf for that.

# Read the shapefile as an SF (Simple Features) object for geospatial operations
ny_neighs_sf <- st_read(file.path(ny_neighs_pathfile, "nynta2020.shp"))
Reading layer `nynta2020' from data source 
  `/Users/rodgersiradukunda/Library/CloudStorage/OneDrive-TheUniversityofLiverpool/geospatial_docker/sedona-tutorial/data/shapefiles/nynta2020_25a/nynta2020.shp' 
  using driver `ESRI Shapefile'
Simple feature collection with 262 features and 11 fields
Geometry type: MULTIPOLYGON
Dimension:     XY
Bounding box:  xmin: 913175.1 ymin: 120128.4 xmax: 1067383 ymax: 272844.3
Projected CRS: NAD83 / New York Long Island (ftUS)
st_crs(ny_neighs_sf)
Coordinate Reference System:
  User input: NAD83 / New York Long Island (ftUS) 
  wkt:
PROJCRS["NAD83 / New York Long Island (ftUS)",
    BASEGEOGCRS["NAD83",
        DATUM["North American Datum 1983",
            ELLIPSOID["GRS 1980",6378137,298.257222101,
                LENGTHUNIT["metre",1]]],
        PRIMEM["Greenwich",0,
            ANGLEUNIT["degree",0.0174532925199433]],
        ID["EPSG",4269]],
    CONVERSION["SPCS83 New York Long Island zone (US survey foot)",
        METHOD["Lambert Conic Conformal (2SP)",
            ID["EPSG",9802]],
        PARAMETER["Latitude of false origin",40.1666666666667,
            ANGLEUNIT["degree",0.0174532925199433],
            ID["EPSG",8821]],
        PARAMETER["Longitude of false origin",-74,
            ANGLEUNIT["degree",0.0174532925199433],
            ID["EPSG",8822]],
        PARAMETER["Latitude of 1st standard parallel",41.0333333333333,
            ANGLEUNIT["degree",0.0174532925199433],
            ID["EPSG",8823]],
        PARAMETER["Latitude of 2nd standard parallel",40.6666666666667,
            ANGLEUNIT["degree",0.0174532925199433],
            ID["EPSG",8824]],
        PARAMETER["Easting at false origin",984250,
            LENGTHUNIT["US survey foot",0.304800609601219],
            ID["EPSG",8826]],
        PARAMETER["Northing at false origin",0,
            LENGTHUNIT["US survey foot",0.304800609601219],
            ID["EPSG",8827]]],
    CS[Cartesian,2],
        AXIS["easting (X)",east,
            ORDER[1],
            LENGTHUNIT["US survey foot",0.304800609601219]],
        AXIS["northing (Y)",north,
            ORDER[2],
            LENGTHUNIT["US survey foot",0.304800609601219]],
    USAGE[
        SCOPE["Engineering survey, topographic mapping."],
        AREA["United States (USA) - New York - counties of Bronx; Kings; Nassau; New York; Queens; Richmond; Suffolk."],
        BBOX[40.47,-74.26,41.3,-71.8]],
    ID["EPSG",2263]]

Knowing that it is EPSG:2263, we can now convert it to EPSG:4326.

# Reproject the geometries to a different coordinate reference system (CRS) for consistency
ny_neighbourhoods_shp <- ny_neighbourhoods_shp %>%
  mutate(
    geometry = st_transform(
      geometry,
      "epsg:2263",  # Source CRS
      "epsg:4326",  # Target CRS
      F
    )
  ) %>%
  select(
    -c(
      BoroCode,
      CountyFIPS,
      NTAAbbrev,
      NTAType,
      CDTA2020,
      CDTAName,
      Shape_Leng,
      Shape_Area
    )
  )

3.8 Joining locations data with neighbourhoods data

Because the boundaries data is very small (about 2 MB on disk), we can cache it in memory for faster access. Generally, you are encouraged to cache data that is less than 10 MB. We are also broadcasting the neighbourhoods data to improve performance. Broadcasting means that our data is shared in its entirety with every executor so it is not shuffled when joining. This reduces data transfer overhead and improves performance.

Even if we did not explicitly broadcast our data, it most likely would have been broadcasted automatically due to Adaptive Query Execution (AQE) since we enabled it at the start using the option config$spark.sql.adaptive.enabled <- "true". AQE finds the optimal way of conducting joins, and since our neighbourhoods data is minuscule, chances are that it would have been broadcasted to prevent unnecessary shuffling.

# Persist the neighbourhood shapefile in memory for faster access
ny_neighbourhoods_shp <- sdf_broadcast(ny_neighbourhoods_shp)
sdf_persist(ny_neighbourhoods_shp, storage.level = "MEMORY_ONLY")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`sparklyr_tmp_63b9982f_5ebd_4759_bf03_e16f60191923`> [?? x 5]
# Database: spark_connection
# ℹ more rows
# ℹ 5 more variables: geometry <arrw_bnr>, BoroName <chr>, NTA2020 <chr>,
#   NTAName <chr>, MdHHIncE <int>

I have found that it is best to use Spatial SQL when conducting spatial joins or any other spatial analysis using Apache Sedona functions. To do this, we first need to register our dataframes as temporary SQL views. This will be our next step.

# Register the dataframes as temporary SQL views for querying
locations_sdf %>% sdf_register("locations_sdf_view")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`locations_sdf_view`> [?? x 4]
# Database: spark_connection
    trip_id latitude longitude is_pickup
      <dbl>    <dbl>     <dbl>     <dbl>
 1 40000037     40.7     -74.0         0
 2 40000093     40.7     -74.0         0
 3 40000016     40.8     -74.0         0
 4 40000091     40.8     -74.0         0
 5 40000100     40.8     -74.0         1
 6 40000031     40.8     -74.0         1
 7 40000044     40.8     -74.0         1
 8 40000015     40.8     -74.0         1
 9 40000005     40.7     -74.0         0
10 40000053     40.8     -74.0         0
# ℹ more rows
ny_neighbourhoods_shp %>% sdf_register("ny_neighbourhoods_shp_view")
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   table<`ny_neighbourhoods_shp_view`> [?? x 5]
# Database: spark_connection
# ℹ more rows
# ℹ 5 more variables: geometry <arrw_bnr>, BoroName <chr>, NTA2020 <chr>,
#   NTAName <chr>, MdHHIncE <int>

Upon registration, we can now conduct a spatial join, asking Apache Sedona to find neighbourhoods that contain specific coordinates using the ST_Contains function. You can find documentation on all available Apache Sedona vector functions here.

# Perform a spatial join to associate each location (latitude, longitude) with the corresponding neighbourhood
locations_sdf_updated <- sdf_sql(
  sc,
  "
  SELECT /*+ BROADCAST(b) */ a.*, b.*
  FROM locations_sdf_view a
  LEFT JOIN ny_neighbourhoods_shp_view b
    ON ST_Contains(b.geometry, ST_Point(a.longitude, a.latitude))
  "
)

3.9 Writing data to disk

Before saving our updated data, we remove the geometry column, as Delta Lake does not support geometry columns. Moreover, there is no need to keep it in the data, as we don’t plan on mapping the data just yet. If you need to write big data geometry files, consider using the GeoParquet format. You can do so using Spark’s spark_write_geoparquet function or the spark_write_source function with the mode set to “geoparquet”.

# Remove the geometry column from the final dataset for further analysis
locations_sdf_updated_no_geom <- locations_sdf_updated %>%
  select(-c(geometry))

Our final data is as shown below. Not too bad for the few lines of code written.

# Print the updated data with all relevant fields (no geometry)
withr::with_options(
  list(pillar.sigfig = 6),
  print(locations_sdf_updated_no_geom, n=10)
)
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
# Source:   SQL [?? x 8]
# Database: spark_connection
    trip_id latitude longitude is_pickup BoroName  NTA2020 NTAName      MdHHIncE
      <dbl>    <dbl>     <dbl>     <dbl> <chr>     <chr>   <chr>           <int>
 1 40000037  40.7113  -74.0107         0 Manhattan MN0101  Financial D…   195153
 2 40000093  40.7401  -73.9860         0 Manhattan MN0602  Gramercy       155905
 3 40000016  40.8148  -73.9553         0 Manhattan MN0902  Manhattanvi…    46367
 4 40000091  40.7743  -73.9614         0 Manhattan MN0802  Upper East …   194910
 5 40000100  40.7604  -73.9614         1 Manhattan MN0801  Upper East …   133349
 6 40000031  40.7794  -73.9849         1 Manhattan MN0701  Upper West …   158165
 7 40000044  40.7633  -73.9594         1 Manhattan MN0801  Upper East …   133349
 8 40000015  40.7776  -73.9551         1 Manhattan MN0802  Upper East …   194910
 9 40000005  40.7419  -73.9746         0 Manhattan MN0603  Murray Hill…   138337
10 40000053  40.7765  -73.9767         0 Manhattan MN0701  Upper West …   158165
# ℹ more rows
locations_sdf_updated_no_geom %>% glimpse()
Rows: ??
Warning in arrow_collect(object, ...): NAs introduced by coercion to integer
range
Columns: 8
Database: spark_connection
$ trip_id   <dbl> 40000037, 40000093, 40000016, 40000091, 40000100, 40000031, …
$ latitude  <dbl> 40.71130, 40.74007, 40.81481, 40.77428, 40.76038, 40.77941, …
$ longitude <dbl> -74.01067, -73.98603, -73.95528, -73.96144, -73.96136, -73.9…
$ is_pickup <dbl> 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, …
$ BoroName  <chr> "Manhattan", "Manhattan", "Manhattan", "Manhattan", "Manhatt…
$ NTA2020   <chr> "MN0101", "MN0602", "MN0902", "MN0802", "MN0801", "MN0701", …
$ NTAName   <chr> "Financial District-Battery Park City", "Gramercy", "Manhatt…
$ MdHHIncE  <int> 195153, 155905, 46367, 194910, 133349, 158165, 133349, 19491…

We now write our data to Delta Lake format as usual.

# Define the file path for saving the updated dataset
save_locations_sdf_updated_one_filepath <- file.path(getwd(), "data", "locations_sdf_updated_one")

# Save the updated dataset to Delta format
spark_write_delta(
  locations_sdf_updated_no_geom,
  path = save_locations_sdf_updated_one_filepath
)

And disconnect our spark instance.

# Disconnect from the Spark session once done
spark_disconnect(sc)