Introduction

This project was a collaboration between Master’s students in the Harvard Institute for Applied Computational Science and Zoba, Inc., a startup operating out of the Harvard Innovation Lab that uses machine learning to predict criminal risk, public health outcomes, and other events in American cities and around the globe.

Students:

Learn more about Zoba:

Project Guidelines: http://iacs-courses.seas.harvard.edu/courses/cs205/projects.html


The Project

Business Situation

Zoba is a startup currently within the Harvard Innovation Lab. Its goal is to model distributions of events in a city, conditioned on the time of year and the presence of geospatial features such as restaurants, police stations, places of worship, hospitals, and more. These events could be related to various crimes, matters of public health, and other applications. To achieve its client and revenue growth goals, Zoba wanted to expand both the number of cities and features per city that it used to train its machine learning models.

The Problem

Zoba’s product faces a combination of compute- and data-intensive tasks. The company’s machine learning models, currently at the city scale, rely on a substantial amount of data drawn from a variety of sources, including geospatial databases and city government databases on crime. For instance, a dataset of crimes in New York City over a four-year period reaches 1.2 GB, while a New York City dataset on the locations of businesses, restaurants, police stations, and other amenities amounts to only 1.3 MB; yet a crucial operation on the latter (Kernel Density Estimation, detailed later) takes more time than any other phase of the preprocessing.

Preprocessing the data Zoba relies on already takes a significant amount of time (more than 25 minutes on a typical node), and increasing the amount of data used downstream could make model experimentation prohibitively costly. As such, Zoba is concerned about its capacity to handle growing data volumes and more complex client requests.

After speaking with founders of the company at a career event at the Innovation Lab, our team proposed the following project to Zoba: could we deliver material performance improvements in Zoba’s existing, entirely serial code by finding opportunities for parallelism, based on techniques and technologies discussed in CS 205? This website details our collaboration on this project.

Project Goals

We had two goals going into this project:

1) Examine Zoba’s existing data pipeline and use parallelization tools to enable performance improvements

2) Prototype new tools that enable Zoba to process more data than before


Existing Model and Data

Data

Zoba relies on two main sources of data:

1) Public crime data provided by city governments. This is either retrieved online via an API, or saved to disk in a one-time request. For simplicity, we assumed the latter situation. We imagine that as Zoba’s infrastructure improves, the former solution will become more viable.

2) Data on the locations of roads, shops, public transit, and other amenities, queried from the public OpenStreetMap (OSM) API and downloaded as XML files. The data is processed to reduce it to a list of coordinates with an associated amenity type, as sketched below.
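As an illustration, a minimal sketch of that reduction could look like the following. This is a hypothetical helper built on Python's standard xml module, not Zoba's actual code; it assumes a downloaded OSM XML extract and keeps only nodes carrying an amenity tag.

import xml.etree.ElementTree as ET

def parse_osm_amenities(path):
    """Reduce an OSM XML extract to (lat, lon, amenity) tuples for tagged nodes."""
    rows = []
    for node in ET.parse(path).getroot().iter("node"):
        tags = {tag.get("k"): tag.get("v") for tag in node.iter("tag")}
        if "amenity" in tags:
            rows.append((float(node.get("lat")), float(node.get("lon")), tags["amenity"]))
    return rows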

Code Base & Architecture Characteristics

Thanks to our colleagues at Zoba, we were given access to the portion of their existing codebase that handles all data preprocessing. This code was serial and written entirely in Python. Its rough structure, for a given city, consists of the following phases:

Phase 1: Create a grid of latitude-longitude cells over a predefined boundary for the city, keeping track of the bounding box and center point for each cell, and save.

Phase 2: Construct geospatial histograms of crime occurrences over the cells, and save.

Phase 3: Construct KDEs for each type of amenity in the OSM data, and save.

Phase 4: Consolidate the results of (2) and (3) above into a single file.

The code relies on reliable and popular packages such as Pandas and SciPy for data manipulation and statistical functions. We hoped to explore alternative technologies that might be more amenable to much larger scales of data.


Finding Parallelism Candidates

Methodology

  1. Identify bottlenecks: in which sections of the application is the most time spent, and which functions are involved?
  2. Determine whether they can be parallelized.
  3. Find appropriate tools to parallelize them.

An underlying theme of this methodology is partitioning (decomposing) Zoba’s preprocessing routine: how can we segment it into units of work that can be executed independently?

How to Identify Bottlenecks?
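For reference, here is a minimal sketch of how a profile like the one below can be produced with cProfile; the entry point setup_city_pipeline follows the snippets later in this writeup, and the line-level tables further down come from decorating functions with @profile and running them under line_profiler (kernprof).

import cProfile
import pstats

# Profile the whole preprocessing run and write the raw stats to disk.
cProfile.run("setup_city_pipeline()", "pipeline.prof")

# Print the 20 most expensive calls, sorted by cumulative time.
pstats.Stats("pipeline.prof").sort_stats("cumulative").print_stats(20)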


cProfile Results

[Images: cProfile output for the serial preprocessing run]

Where Did all the Time Go?

These results give us a good overview of methods taking a lot of time, but where exactly in the codebase are these methods being called?


line_profiler Results

    Total time: 690.049 s
    File: /home/ubuntu/Pipeline/zoba_util_ns_v2.py
    Function: build_kde_osm_data at line 412
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
       412                                           @profile
       413                                           def build_kde_osm_data(new_df, osm_df, kde_cutoff):
       414         3      62888.0  20962.7      0.0      lat_step, lon_step = get_lat_lon_step(new_df)
       415         3       1278.0    426.0      0.0      feature_cols = new_df.columns.drop(['lat','lon','grid_id'])
       416         3          6.0      2.0      0.0      kde_cols = []
       417       221        441.0      2.0      0.0      for col in feature_cols: # perform KDE on substantive features
       418       218    1999926.0   9174.0      0.3          sub_df = osm_df[osm_df['indicator'] == col]
       419       218       1318.0      6.0      0.0          if len(sub_df) >= kde_cutoff:
       420        65       2976.0     45.8      0.0              print('calculating...',col,'KDE')
       421        65      19166.0    294.9      0.0              values = np.array(sub_df['lat']), np.array(sub_df['lon'])
       422        65      53273.0    819.6      0.0              kernel = sp.gaussian_kde(values)
       423        65  687899489.0 10583069.1     99.7              new_df[col] = generate_kde(new_df, kernel, lat_step, lon_step)
       424        65        192.0      3.0      0.0              kde_cols.append(col)
       425         3       8082.0   2694.0      0.0      return new_df[['lat','lon','grid_id'] + kde_cols]

Now let’s go one step further:

  Total time: 678.369 s
  File: /home/ubuntu/Pipeline/zoba_util_ns_v2.py
  Function: generate_kde at line 428

  Line #      Hits         Time  Per Hit   % Time  Line Contents
  ==============================================================
   428                                           @profile
   429                                           def generate_kde(df, kernel, lat_step, lon_step):
   430        65         88.0      1.4      0.0      num_samples = 10
   431        65        228.0      3.5      0.0      grid_densities, random_samples = [], np.zeros([num_samples, 2]) # set placeholder
   432   1688270  153286625.0     90.8     22.6      for i, row in df.iterrows():
   433   1688205   94257454.0     55.8     13.9          lat, lon = row['lat'], row['lon']
   435   1688205  427225616.0    253.1     63.0          val = kernel.evaluate([lat, lon])[0]
   436   1688205    3598949.0      2.1      0.5          grid_densities.append(val)
   437        65         80.0      1.2      0.0      return grid_densities

Highlights

line_profiler lets us keep drilling down until we find the exact lines impairing solution time. In the example above, we first see that generating the KDEs accounts for nearly all (99.7%) of the time spent in build_kde_osm_data:

new_df[col] = generate_kde(new_df, kernel, lat_step, lon_step)

We continue stepping into the function in order to understand where the time is going:

val = kernel.evaluate([lat, lon])[0]

Parallelization Techniques

From the profiling results obtained using cProfile and line_profiler, we identified two primary targets for parallelization, described below.

Loops (via multiprocessing module)

A significant amount of time was spent in various preprocessing loops, which raised a key question:

Question: Is there a dependence between different iterations of the loop?

Answer: No. These loops lent themselves well to worksharing constructs: splitting up the existing problem into multiple parallel units of execution and assigning the work to individual processors.

Process-based parallelism with multiprocessing module

In Python, running multiple threads in parallel is effectively not possible because the global interpreter lock (GIL) allows only one thread to execute Python bytecode at a time in the most common Python implementation, CPython. Python processes, however, have separate memory spaces and can each be assigned their own CPU core, allowing us to work around the GIL with multiprocessing.

[Image: process-based parallelism with the multiprocessing module]
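As a minimal illustration of this point (not part of Zoba's code), a CPU-bound task gains little from a thread pool but can use multiple cores through a process pool:

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

def work(n):
    """A CPU-bound task: sum of squares up to n."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    args = [5000000] * 8

    # Threads share one interpreter and take turns holding the GIL, so this stays roughly serial.
    with ThreadPool(4) as thread_pool:
        thread_pool.map(work, args)

    # Each process gets its own interpreter and GIL, so this can run on four cores at once.
    with Pool(4) as process_pool:
        process_pool.map(work, args)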

Stage 1: Preprocessing Each City as its Own Process

For example, in the following code snippet we initialize a process pool, assign the preprocessing of each individual city to its own worker process, and then continue with the serial code once the parallel city preprocessing is complete.

import multiprocessing as mp

def setup_city_pipeline():
    # Initialize a Pool with one worker per city (capped at the number of CPUs).
    # NonDaemonPool is our Pool subclass, defined under "Circumventing the Daemon" below.
    num_processors = mp.cpu_count()
    pool = NonDaemonPool(processes=min(num_processors, len(CITIES)))

    # Run city_pipeline for each city in parallel.
    pool.starmap(city_pipeline, [(city,) for city in CITIES])

    core_benchmark_phase()  # needs to be serial

def city_pipeline(city):
    phase_1(city)
    phase_2(city)
    phase_3(city)
    ...

Pool.starmap (and Pool.map) block the calling process until all submitted tasks are finished. This works well for us, because every city must be preprocessed before we continue to the next stage.

Stage 2: Parallelizing Phases within Cities

Next, we introduced an additional layer of process separation by creating a separate Process for each of phases 2 and 3 within city_pipeline(), since these phases do not depend on each other:

def city_pipeline(city):
    phase_1(city)

    # Now run phases 2 and 3 of the pipeline, each as its own process.
    proc_1 = NonDaemonProcess(target=phase_2, args=(city,))
    proc_2 = NonDaemonProcess(target=phase_3, args=(city,))

    proc_1.start()
    proc_2.start()

    # Block until both phase processes are finished.
    proc_1.join()
    proc_2.join()
    ...

Additional Considerations

Circumventing the Daemon

When a pool is instantiated via the multiprocessing.Pool wrapper, the underlying Process objects are created as daemonic (Process.daemon = True). A daemonic process cannot create child processes of its own, which would prevent us from parallelizing phases within cities. To bypass this, we create our own Pool subclass whose worker Process objects are non-daemonic, by ensuring their daemon attribute always returns False.

from multiprocessing import Process
from multiprocessing.pool import Pool   # subclass multiprocessing.pool.Pool, not the multiprocessing.Pool factory

class NonDaemonProcess(Process):
    """Subclass of `Process` whose `daemon` attribute is always False."""

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NonDaemonPool(Pool):
    """Pool whose worker processes are non-daemonic, so they may spawn children."""
    Process = NonDaemonProcess

Scaling Efficiency

Our solution exhibits strong scaling, but only to a limited degree: solution time improves with a growing number of CPUs up to the number of cities whose data we need to preprocess, but not beyond that. Once Zoba incorporates more cities into its models than there are available CPUs, it can again improve solution time by adding CPUs.

Maximum Speedup

To determine the maximum speedup achievable by parallelizing between cities, we first identified the functions that loop over cities, since these loops can be made parallel:

Function        Seconds
geo_phase           692
crime_phase         181

Together, these account for approximately 14.6 minutes (873 s), or roughly 58% of the 25-minute serial run. Using Amdahl’s Law, we can calculate a (highly) theoretical maximum speedup S achievable using Zoba’s 16 processors:

S = 1 / ((1 - p) + p/N) = 1 / ((1 - 0.58) + 0.58/16) ≈ 2.2

where p ≈ 0.58 is the parallelizable fraction of the run and N = 16 is the number of processors.

However, Zoba’s preprocessing routine is not one you can partition into arbitrary chunks. The data is most logically and computationally efficiently partitioned by city, and an N in excess of the number of cities is not very useful. Nonetheless, the fact that the majority (58%) of Zoba’s preprocessing routine is parallelizable bodes well for these and future parallelization efforts.
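As a sanity check, a few lines of Python reproduce this bound from the measurements above (873 s of parallelizable work out of the roughly 1506 s serial run, on 16 processors):

def amdahl_speedup(parallel_fraction, num_processors):
    """Theoretical maximum speedup under Amdahl's Law."""
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / num_processors)

p = (692.0 + 181.0) / 1506.0   # ~0.58: fraction of the 25m6s serial run spent in geo_phase and crime_phase
print(amdahl_speedup(p, 16))   # ~2.2x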

Limits and Overhead

The theoretical speedup analysis above is naive in that it assumes the workload is roughly equal across cities. This is not the case: larger cities (or cities with more crime) require longer preprocessing times. Philadelphia, for example, preprocesses much faster than Chicago. Our parallelize-by-city solution is therefore limited by the time it takes to preprocess the most computationally intensive city, i.e., it suffers from load imbalance.

The overhead of parallelizing the pipeline across cities is minimal, because each city’s Process can preprocess that city’s data without communicating with the other Processes. What overhead there is comes predominantly from spawning the Process objects, which in our solution happens only once and has (presumably) little impact on overall execution time. It may matter more when we parallelize between phases in addition to cities.

KDEs (via pyspark)

Kernel Density Estimation (KDE) is a common method of estimating a (usually unknown) probability distribution, given a sample of n points drawn from that distribution. Zoba uses the SciPy implementation of the Gaussian KDE to approximate the geographic (latitude-longitude) distribution of various kinds of city features, such as schools or restaurants. This data is then used to make predictions about events related to crime, public health, and more. Computing KDEs over many points at a time can be time-intensive; indeed, we observed that of all the phases described in Existing Model and Data above, Phase 3 (the KDE phase) consistently took the longest. However, as we will see below, the problem is embarrassingly parallel.

The general formula to evaluate the kernel density function at a given point x is
f̂(x) = (1/n) Σ_{i=1}^{n} K_H(x - x_i)

where the summation runs over all points x_i in the sample drawn from the unknown distribution, and n is the number of sample points. As we will discuss later, this structure of the KDE computation allows us to parallelize our code very conveniently.

The function K_H accepts the vector specified in the earlier summation and has the form

K_H(x - x_i) = |H|^(-1/2) K(H^(-1/2) (x - x_i))

Here, the function K is known as the kernel function and must be a symmetric probability density function. H is the so-called “bandwidth” matrix, of dimension d × d, where d is the dimensionality of the data. The choice of H depends on the desired properties of the KDE, such as smoothness.

The choice of K is usually not crucial to the quality of the estimate, so one common choice for K is the multivariate Gaussian density. Applying this to K_H, we obtain the following:

K_H(x - x_i) = (2π)^(-d/2) |H|^(-1/2) exp( -(1/2) (x - x_i)^T H^(-1) (x - x_i) )

In this context, H is the covariance matrix of the data (computed here from the latitudes and longitudes), scaled by the square of a “bandwidth” parameter h. Following the default option in the SciPy implementation of the Gaussian KDE, we use Scott’s rule: h = n^(-1/(d+4)), where n is the number of data points and d is the dimensionality of the data.
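A minimal sketch of this bandwidth computation, assuming the sample is stored as an (n, d) NumPy array; it mirrors what scipy.stats.gaussian_kde does internally with Scott's rule, but is not Zoba's code:

import numpy as np

def scotts_bandwidth_matrix(points):
    """Return H = h^2 * Cov(points), with h given by Scott's rule."""
    n, d = points.shape
    h = n ** (-1.0 / (d + 4))                      # Scott's rule bandwidth factor
    return h ** 2 * np.cov(points, rowvar=False)   # d x d scaled covariance matrix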

Having summarized the computation of the KDE, we now discuss its usage by Zoba and how it can be parallelized. The KDE computation can be summarized by the following pseudocode:

from collections import defaultdict

kde_values = defaultdict(list)
for feature in features:
    data = OSM_DATA[feature]                 # sample (lat, lon) points for this feature
    kernel = sp.gaussian_kde(data)           # fit the kernel once per feature
    for grid_point in city_grid:
        kde_values[feature].append(kernel.evaluate(grid_point))

In short, the code (1) subsets the OSM data (essentially a list of features such as schools, restaurants, and bars, along with their coordinates) feature by feature, and (2) evaluates that feature’s kernel at each point of the latitude-longitude grid constructed in Phase 1 of Existing Model and Data; the basic idea is illustrated below for a portion of New York. Since Zoba specifies each cell to correspond to a 250 m × 250 m area on the ground, this grid can reach several tens of thousands of points in size.

[Image: the latitude-longitude grid over a portion of New York City]
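For concreteness, here is a hedged sketch of how such a grid of cell centers could be generated over a bounding box; this is a hypothetical helper, and Zoba's Phase 1 code additionally tracks each cell's bounding box and grid_id:

import numpy as np

def make_grid(lat_min, lat_max, lon_min, lon_max, cell_m=250.0):
    """Return (lat, lon) centers of roughly cell_m x cell_m cells covering a bounding box."""
    lat_step = cell_m / 111000.0                                                   # ~111 km per degree of latitude
    lon_step = cell_m / (111000.0 * np.cos(np.radians((lat_min + lat_max) / 2)))   # degrees of longitude shrink with latitude
    lats = np.arange(lat_min + lat_step / 2, lat_max, lat_step)
    lons = np.arange(lon_min + lon_step / 2, lon_max, lon_step)
    return [(lat, lon) for lat in lats for lon in lons]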

We refer the reader to the earlier-mentioned SciPy documentation and a helpful guide here for more detailed theoretical information on the KDE.

PySpark Code

These two factors (the independence of KDE evaluations across grid points and the sheer number of grid points) led us to develop a multivariate Gaussian KDE function using the pyspark module, the Python API for Spark. Spark is useful here because we are applying a relatively straightforward operation, with no interprocess dependencies, to a collection of many data points (nearly 40k grid points for New York), and its data structures allow for fast and easy statistical computations, such as the covariance matrix on which the matrix H above depends.

We will now discuss the actual implementation of the KDE function in PySpark. It is evident from the earlier discussion that every evaluation of the KDE function at a grid point is independent of the others. These evaluations can therefore be done in parallel, and Spark’s map() makes it convenient to operate on different chunks of our grid at once. A basic pseudocode version of the PySpark KDE function is below:

osm_data = sqlContext.read.csv("osm_data.csv")    # OSM data as a Spark DataFrame
osm_pandas = pandas.read_csv("osm_data.csv")      # Pandas copy, referenced inside kde_function
city_grid = sqlContext.read.csv("grid.csv")       # city grid points

results = []
for feature in features:
    data = osm_data.filter(osm_data.feature == feature)
    n, d = data.count(), 2                        # sample size and dimensionality (lat, lon)
    h = n ** (-1. / (d + 4))                      # Scott's rule bandwidth factor
    H = h ** 2 * covariance_matrix(data)          # scaled covariance (bandwidth) matrix
    results.append(city_grid.rdd.map(lambda x: kde_function(osm_pandas, x, H)).collect())

We note that we used two copies of our OSM data: one read in as a Spark dataframe, and the other as a Pandas dataframe. The reason is that Spark does not allow a (by default, distributed) dataframe to be referenced from inside a transformation over another dataframe without some form of serialization; this would be the case if we mapped over the city_grid dataframe and then referenced the osm_data variable from kde_function. We made the design decision to use a Pandas dataframe for this portion, but kept the Spark dataframe for fast computation of the covariance matrix, which can get expensive; the New York City OSM data we had access to had 36,113 rows across all types of city features. This also allowed us to use some useful and mature Pandas features (such as the apply function) that would be more tedious to use or reimplement with Spark dataframes.
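To make this design concrete, here is a hedged sketch of what kde_function could look like under these assumptions. The name and arguments follow the pseudocode above rather than Zoba's actual code, and osm_pandas is assumed to have already been restricted to the current feature's lat/lon sample points:

import numpy as np

def kde_function(osm_pandas, grid_point, H):
    """Evaluate the Gaussian KDE at one grid point using the Pandas copy of the sample."""
    points = np.asarray(osm_pandas[["lat", "lon"]], dtype=float)   # (n, d) sample points x_i
    x = np.asarray(grid_point, dtype=float)                        # (d,) evaluation point (lat, lon)
    d = points.shape[1]

    H_inv = np.linalg.inv(H)
    norm = (2 * np.pi) ** (-d / 2) / np.sqrt(np.linalg.det(H))     # (2*pi)^(-d/2) * |H|^(-1/2)

    diffs = x - points                                             # x - x_i for every sample point
    quad = np.einsum("ij,jk,ik->i", diffs, H_inv, diffs)           # (x - x_i)^T H^{-1} (x - x_i)
    return norm * np.exp(-0.5 * quad).mean()                       # (1/n) * sum_i K_H(x - x_i)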

One consideration of note is that the grid file is not particularly large; as a result, Spark distributed it among fewer partitions than we had cores available. This caused some initial confusion, as we were not observing speedups proportional to the added resources. We realized that the RDD had to be repartitioned in order to use all available cores and thereby maximize the number of operations done in parallel. This was crucial because the KDE is compute-intensive: the more sections of the grid we can work on at once, the more time we save.
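In our case this came down to a change along the following lines (names follow the pseudocode above; 16 matches the total number of executor cores on the largest cluster we tested):

# Spread the small grid RDD across all available cores before mapping the KDE over it.
city_grid_rdd = city_grid.rdd.repartition(16)
results.append(city_grid_rdd.map(lambda x: kde_function(osm_pandas, x, H)).collect())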

Finally, we wrote a script to validate the results of our Spark KDE against the SciPy Gaussian KDE; this script was provided to the instructors. On a toy dataset, the Spark KDE consistently agreed with the SciPy results at the same randomly generated points to within roughly 10e-15.
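The validation amounted to a comparison of the following flavor; this is a hedged sketch, and spark_kde_at_point is a hypothetical stand-in for evaluating our Spark KDE at a single point:

import numpy as np
from scipy import stats

rng = np.random.RandomState(0)
sample = rng.normal(size=(2, 500))               # toy 2-D dataset, shape (d, n) as SciPy expects
test_points = rng.uniform(-2, 2, size=(20, 2))   # randomly generated evaluation points

reference = stats.gaussian_kde(sample)
for point in test_points:
    ours = spark_kde_at_point(sample, point)     # hypothetical wrapper around our Spark KDE
    assert abs(ours - reference.evaluate(point)[0]) < 1e-12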

The following section compares the performance of the original SciPy-based KDE code with that of the PySpark-based version.


Infrastructure and Code Dependencies

We were given access to Zoba’s current data processing code through an Amazon Web Services (AWS) g3.4xlarge instance running Ubuntu, containing all necessary data and dependencies. This was the environment in which we benchmarked the original code, as well as our parallelized versions from our earlier discussion of the use of multiprocessing.

To run our PySpark KDE function, we were given access to a Spark cluster on AWS, consisting of a master node and four m4.xlarge worker instances. We installed the necessary dependencies on each of the nodes. Although we are unable to publicly provide the PySpark code that we wrote, we describe the setup below for general reference in use cases similar to ours.

First, each worker node requires Python 3 and Pandas. The default AWS Spark cluster configuration for EMR 5.8.0 already ships with Python 3.4; Pandas can be installed by running:

sudo pip-3.4 install pandas --upgrade

In addition, put the following in the .bashrc file of each node, including the master:

export PYSPARK_PYTHON=python3

Finally, at the master node, move the provided NY directory (containing our data) onto the Hadoop file system via:

hadoop fs -put NY

Our code was then runnable via:

spark-submit --num-executors 4 --executor-cores 4 spark_kde.py

Results and Performance Analysis

multiprocessing Execution Time

Version                                    Time
Serial                                     25m6s
Parallelism Between Cities                 17m18s
Parallelism Between Cities and Phases      15m41s

By preprocessing each city in its own Process, we achieved a speedup of 1.45x over the serial version; additionally creating a separate Process for each phase within a city raised this to 1.6x. These simple optimizations were worthwhile, as they took little time to implement and did not change the functionality of the existing code.

Analysis of Spark KDE results

First, we do a qualitative analysis of our Spark KDE results. In our results shown here, we chose to focus on one feature of the OSM data: the locations of bars in the New York City metropolitan area. This allowed us to do a simple comparison between our results and Zoba’s.

Below is a screenshot of Zoba’s current result for the KDE of NYC bars, after some preprocessing and normalization of the data.

[Image: Zoba’s KDE of bar locations in New York City]

And here is the result from our Spark KDE function. Unlike Zoba’s visualization, this result is obtained without any preprocessing of the data, which is why the two figures look different; our goal was simply to verify that our Spark KDE function returns values comparable to the existing solution.

[Image: Spark KDE of bar locations in New York City]

Comparing the two figures, the general geographic distribution produced by the Spark KDE is broadly consistent with that of the current Zoba implementation.

Next, we look at the runtimes for the same scenario (computing the KDE of the bar distribution in NYC). Zoba’s current implementation takes 5 seconds to compute this KDE across all cells. Our parallelized-across-cities version discussed earlier takes 35.3 seconds for the same computation, likely because the concurrently running city processes contend for limited memory bandwidth; note, however, that this version has a shorter overall runtime.

Finally, below is a series of runtimes of our Spark code, for various cluster configurations and partitionings of the grid:

# Partitions    Configuration                            Runtime (s)
4               1-node cluster, 4 cores per executor     141.9
8               2-node cluster, 4 cores per executor     79.6
16              4-node cluster, 4 cores per executor     46.3

We can see that the best performance achieved under our setup (46.3 s) is close to that of the multiprocessing-parallelized version (35.3 s), but substantially slower than the current SciPy solution (5 s). We believe this is a sign that we are not yet in a data-size regime that takes full advantage of the Spark framework. If we were to increase the size of the grid, for example, we would benefit from computing the KDE simultaneously across portions of the grid rather than serially.

As we hoped, the Spark code sped up roughly in proportion to the added resources. We fit a linear regression between the logarithm of the number of executors and the logarithm of the runtime, and found a good power-law fit:

[Images: Spark KDE runtime versus number of executors, with the log-log regression fit]
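A minimal sketch of that fit, using the runtimes from the table above:

import numpy as np

partitions = np.array([4, 8, 16])          # partitions used on the 1-, 2-, and 4-node clusters
runtimes = np.array([141.9, 79.6, 46.3])   # Spark KDE runtimes in seconds

slope, intercept = np.polyfit(np.log(partitions), np.log(runtimes), 1)
print(slope)   # ~ -0.8, i.e. runtime scales roughly as partitions ** -0.8 (ideal would be -1)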

We believe our prototype Spark KDE function shows the feasibility of Zoba integrating Spark into its product in the future. Although Zoba’s current implementation using Pandas dataframes and SciPy functions is efficient, we believe these solutions will not scale well if Zoba takes in much more data or tries to model a much larger geographic area than a single city (such as the entire North American continent). Using Spark would allow Zoba to scale past such data challenges.


Conclusion

Goals Achieved

We believe that we have achieved the following in this project:

Lessons Learned

Improvements Suggested to Zoba / Future Work


References

Duong, Tarn. ks: Kernel Density Estimation for Bivariate Data. April 2018.

Microsoft .NET Customer Solution Case Study. “Trading Firm Increases Revenue Fourfold Through Parallel Development Environment.” May 2011.

SciPy Gaussian KDE Documentation. https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gaussian_kde.html

Wang, Bao et al. “Deep Learning for Real-Time Crime Forecasting and Its Ternarization.” UCLA. 2017.

Zaccone, Giancarlo. Python Parallel Programming Cookbook: Master Efficient Parallel Programming to Build Powerful Applications Using Python. Birmingham: Packt, 2015.