Introduction
This project was a collaboration between Master’s students in the Harvard Institute for Applied Computational Science and Zoba, Inc., a startup operating out of the Harvard Innovation Lab that uses machine learning to predict criminal risk, public health outcomes, and other events in American cities and around the globe.
Students:
- Justin S. Lee
- Nate Stein
Learn more about Zoba:
- Website: https://www.zoba.com/
Project Guidelines: http://iacs-courses.seas.harvard.edu/courses/cs205/projects.html
The Project
Business Situation
Zoba is a startup currently within the Harvard Innovation Lab. Its goal is to model distributions of events in a city, conditioned on the time of year and the presence of geospatial features such as restaurants, police stations, places of worship, hospitals, and more. These events could be related to various crimes, matters of public health, and other applications. To achieve its client and revenue growth goals, Zoba wanted to expand both the number of cities and features per city that it used to train its machine learning models.
The Problem
Zoba’s product faces a combination of compute- and data-intensive tasks. The company’s machine learning models, currently at the city scale, rely on a substantial amount of data drawn from a variety of sources, including geospatial databases and city government crime databases. For instance, a dataset of crimes in New York City over a four-year period reaches 1.2 GB, whereas a New York City dataset on locations of businesses, restaurants, police stations, and more amounts to only 1.3 MB — yet a crucial operation on this smaller dataset (Kernel Density Estimation, detailed later) takes more time than any other phase of the preprocessing.
Preprocessing the data that Zoba relies on already required a significant amount of time (more than 25 minutes on a typical node), and increasing the amount of data used later on could make model experimentation prohibitively costly. As such, Zoba is concerned about its capacity to handle growing data volumes and more complex client requests.
After speaking with founders of the company at a career event at the Innovation Lab, our team decided to reach out to Zoba with the following project: could we deliver material performance improvements in Zoba’s existing, entirely serial code by finding opportunities for parallelism, based on techniques and technologies discussed in CS 205? This website details our collaboration on this project.
Project Goals
We had two goals going into this project:
1) Examine Zoba’s existing data pipeline and use parallelization tools to enable performance improvements
- As Zoba is relatively early-stage, they are still in the process of refining their core product, and have only so many hands to work on a given task. We therefore hoped to introduce the company to some parallelization tools that they could use with only small modifications to their existing codebase. This would require us to study portions of their existing codebase. Thanks to regular correspondence and meetings at the i-Lab with the Zoba team over the course of the project, we were able to gain an understanding of how their codebase functions.
2) Prototype new tools that enable Zoba to process more data than before
- We hoped to share with Zoba our exposure to technologies that we learned about in CS 205, by implementing programs using such technologies that they may be interested in at a later stage in their development.
Existing Model and Data
Data
Zoba relies on two main sources of data:
1) Public crime data provided by city governments. This is either retrieved online via an API, or saved to disk in a one-time request. For simplicity, we assumed the latter situation. We imagine that as Zoba’s infrastructure improves, the former solution will become more viable.
2) Data on locations of roads, shops, public transit and other amenities queried from the public OpenStreetMap (OSM) API and downloaded as XML files. The data is processed to reduce it to a list of coordinates with associated type of amenity.
Code Base & Architecture Characteristics
Thanks to our colleagues at Zoba, we were given access to the portion of their existing codebase that handled all data preprocessing. We found that this existing code was serial and written entirely in Python. The rough structure of this code, for a given city, is described in the phases below:
Phase 1: Create a grid of latitude-longitude cells over a predefined boundary for the city, keeping track of the bounding box and center point for each cell, and save.
Phase 2: Construct geospatial histograms of crime occurrences over the cells, and save.
Phase 3: Construct KDEs for each type of amenity in the OSM data, and save.
Phase 4: Consolidate the results of (2) and (3) above into a single file.
The code relies on reliable and popular packages such as Pandas and SciPy for data manipulation and statistical functions. We hoped to explore alternative technologies that might be more amenable to much larger scales of data.
Finding Parallelism Candidates
Methodology
- Identify bottlenecks: in what sections of the application is significantly more time spent? What are the related functions?
- Determine whether they can be parallelized.
- Find appropriate tools to parallelize them.
An underlying theme to the above methodology is the idea of partitioning (decomposing) Zoba’s preprocessing routine: how can we segment their preprocessing routine in terms of:
- Partitions of data
- Processes associated with each partition of data
How to Identify Bottlenecks?
- Beware of relying only on `time` at the function level!
- Use multiple profiling tools: `cProfile`, `line_profiler`, `snakeviz`
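As an illustration, profiling a toy function with `cProfile` looks like the following (the function and numbers here are our own example, not Zoba’s code):

```python
import cProfile
import pstats

def slow_sum(n):
    """A deliberately loopy function to profile."""
    total = 0
    for i in range(n):
        total += i * i
    return total

profiler = cProfile.Profile()
profiler.enable()
slow_sum(1_000_000)
profiler.disable()
# Print the five most expensive calls sorted by cumulative time.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(5)
```

`line_profiler` then narrows a hot function down to individual lines via its `@profile` decorator and the `kernprof -l` runner, as in the listings below.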
`cProfile` Results
Where Did all the Time Go?
- Reading and writing to CSV files.
- Accessing the pandas `.values` property (which returns a `numpy.ndarray`).
- Computing KDEs.
These results give us a good overview of methods taking a lot of time, but where exactly in the codebase are these methods being called?
`line_profiler` Results
```
Total time: 690.049 s
File: /home/ubuntu/Pipeline/zoba_util_ns_v2.py
Function: build_kde_osm_data at line 412
Line #  Hits     Time         Per Hit     % Time  Line Contents
==============================================================
412                                              @profile
413                                              def build_kde_osm_data(new_df, osm_df, kde_cutoff):
414     3        62888.0      20962.7     0.0        lat_step, lon_step = get_lat_lon_step(new_df)
415     3        1278.0       426.0       0.0        feature_cols = new_df.columns.drop(['lat','lon','grid_id'])
416     3        6.0          2.0         0.0        kde_cols = []
417     221      441.0        2.0         0.0        for col in feature_cols: # perform KDE on substantive features
418     218      1999926.0    9174.0      0.3            sub_df = osm_df[osm_df['indicator'] == col]
419     218      1318.0       6.0         0.0            if len(sub_df) >= kde_cutoff:
420     65       2976.0       45.8        0.0                print('calculating...',col,'KDE')
421     65       19166.0      294.9       0.0                values = np.array(sub_df['lat']), np.array(sub_df['lon'])
422     65       53273.0      819.6       0.0                kernel = sp.gaussian_kde(values)
423     65       687899489.0  10583069.1  99.7               new_df[col] = generate_kde(new_df, kernel, lat_step, lon_step)
424     65       192.0        3.0         0.0                kde_cols.append(col)
425     3        8082.0       2694.0      0.0        return new_df[['lat','lon','grid_id'] + kde_cols]
```
Now let’s go one step further:
```
Total time: 678.369 s
File: /home/ubuntu/Pipeline/zoba_util_ns_v2.py
Function: generate_kde at line 428
Line #  Hits     Time         Per Hit  % Time  Line Contents
==============================================================
428                                           @profile
429                                           def generate_kde(df, kernel, lat_step, lon_step):
430     65       88.0         1.4      0.0        num_samples = 10
431     65       228.0        3.5      0.0        grid_densities, random_samples = [], np.zeros([num_samples, 2]) # set placeholder
432     1688270  153286625.0  90.8     22.6       for i, row in df.iterrows():
433     1688205  94257454.0   55.8     13.9           lat, lon = row['lat'], row['lon']
435     1688205  427225616.0  253.1    63.0           val = kernel.evaluate([lat, lon])[0]
436     1688205  3598949.0    2.1      0.5            grid_densities.append(val)
437     65       80.0         1.2      0.0        return grid_densities
```
Highlights
`line_profiler` enables us to continue drilling down until we find the exact lines impairing solution times. In the above example, we first see that generating KDEs takes a significant amount of time:

```python
new_df[col] = generate_kde(new_df, kernel, lat_step, lon_step)
```

We then step into the function to understand where the time is going:

```python
val = kernel.evaluate([lat, lon])[0]
```
Parallelization Techniques
From our profiling results obtained using `cProfile` and `line_profiler`, we concluded that there were two primary sections of the code to parallelize, and two corresponding approaches, as follows.

Loops (via the `multiprocessing` module)
A significant amount of time was spent in various preprocessing loops, leading to:
Question: Is there a dependence between different iterations of the loop?
Answer: No. These loops lent themselves well to worksharing constructs: splitting up the existing problem into multiple parallel units of execution and assigning the work to individual processors.
Process-based parallelism with the `multiprocessing` module
In the most common Python implementation, CPython, the global interpreter lock (GIL) prevents more than one thread from executing Python bytecode at a time, so multithreading offers no speedup for CPU-bound work. Python processes, however, have separate memory spaces and can each be scheduled on their own CPU, allowing us to work around the GIL with multiprocessing.
Stage 1: Preprocessing Each City as its Own Process
For example, in the following code snippet we initiate a process pool and assign the task of preprocessing features for an individual city to its own `multiprocessing.Process` object, then continue with the serial code once the parallel city preprocessing is complete.

```python
import multiprocessing as mp

def setup_city_pipeline():
    # Initialize a pool with one process per city (up to the CPU count).
    num_processors = mp.cpu_count()
    pool = NonDaemonPool(processes=min(num_processors, len(CITIES)))
    # Run city_pipeline for each city in parallel.
    pool.starmap(city_pipeline, [(city,) for city in CITIES])
    core_benchmark_phase()  # needs to be serial

def city_pipeline(city):
    phase_1(city)
    phase_2(city)
    phase_3(city)
    ...
```
`Pool.starmap` (and `Pool.map`) block the calling program until all the constituent processes are finished. This works fine for us because we need to preprocess each city before continuing to the next stage.
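For reference, a minimal self-contained `Pool.starmap` example (a toy function of our own, not Zoba’s code) illustrates this blocking behavior:

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    # starmap blocks here until all eight tasks have completed,
    # and returns results in the order of the input iterable.
    with mp.Pool(processes=4) as pool:
        results = pool.starmap(square, [(i,) for i in range(8)])
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```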
Stage 2: Parallelizing Phases within Cities
Next, we introduced an additional layer of process separation by creating a `Process` for phases 2 and 3 within `city_pipeline()`, since these do not have to be performed in order:
```python
def city_pipeline(city):
    phase_1(city)
    # Now run phases 2 and 3 of the pipeline, each as its own process.
    proc_1 = NonDaemonProcess(target=phase_2, args=(city,))
    proc_2 = NonDaemonProcess(target=phase_3, args=(city,))
    proc_1.start()
    proc_2.start()
    # Block until the above two processes are finished.
    proc_1.join()
    proc_2.join()
    ...
```
Additional Considerations
Circumventing the Daemon
When instantiating a pool via the `multiprocessing.Pool` wrapper, the underlying `Process` objects are initialized as daemonic processes (`Process.daemon = True`). A daemonic `Process` cannot create its own child `Process`, which would prevent us from parallelizing phases within cities. To bypass this, we create our own `Pool` subclass whose constituent `Process` objects are non-daemonic, by ensuring their `daemon` attribute always returns `False`.
```python
from multiprocessing import Process
from multiprocessing.pool import Pool

class NonDaemonProcess(Process):
    """Subclass of `Process` whose `daemon` attribute is always False."""
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NonDaemonPool(Pool):
    Process = NonDaemonProcess
```
Scaling Efficiency
Our solution exhibits strong scaling but only to a limited degree: the solution time improves with a growing number of CPUs up to the number of cities whose data we need to preprocess but not beyond that. Once Zoba increases the number of cities they incorporate into their models past the number of available CPUs, they can improve solution time by adding CPUs.
Maximum Speedup
To determine the maximum speedup from parallelizing between cities, we first identified all the functions using `for` loops to iterate over each city, because these can be made parallel:
Function | Seconds |
---|---|
geo_phase | 692 |
crime_phase | 181 |
Together, they take up approximately 14.6 minutes. Using Amdahl’s Law, we can calculate a (highly) theoretical maximum speedup S achievable using Zoba’s 16 processors:
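The rendered formula did not survive extraction; the calculation below is reconstructed from the numbers in the text, with parallel fraction p ≈ 873 s / 1506 s ≈ 0.58 and N = 16 processors:

```latex
S = \frac{1}{(1 - p) + \frac{p}{N}}
  = \frac{1}{(1 - 0.58) + \frac{0.58}{16}}
  \approx 2.2
```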
However, Zoba’s preprocessing routine is not one you can partition into arbitrary chunks. The data is most logically and computationally efficiently partitioned by city, and an N in excess of the number of cities is not very useful. Nonetheless, the fact that the majority (58%) of Zoba’s preprocessing routine is parallelizable bodes well for these and future parallelization efforts.
Limits and Overhead
The above theoretical speedup analysis was naive in that it implied the workload by city is relatively equal. This is not the case, as larger cities (or cities with more crime) tend to require greater preprocessing times. The preprocessing time for Philadelphia, for example, is much faster than it is for Chicago. Therefore, our parallelize-by-city solution is limited by the amount of time it takes the most computationally-intensive city to preprocess, i.e., our solution suffers from load imbalance.
The overhead of parallelizing the pipeline across cities is minimal because each city’s `Process` can preprocess data for that city without communicating with the other `Process`es. Any overhead predominantly comes from spawning the `Process` objects, which in our solution is only done once, with (presumably) little impact on the overall execution time. However, this may make a bigger difference when we parallelize between phases (in addition to cities).
KDEs (via `pyspark`)
Kernel Density Estimation (KDE) is a common method of estimating a (usually unknown) probability distribution over a set of points, given a sample of n points from that distribution. Zoba uses the SciPy implementation of the Gaussian KDE method to approximate the geographic (latitude-longitude) distribution of various kinds of features of a city, such as schools or restaurants. This data is then used to make predictions on events related to crime, public health, and more. Computing KDEs for many points at a time can be time-intensive; indeed, we observed that of all the phases described in the above section Existing Model and Data, Phase 3 (the KDE phase) consistently took the longest. However, as we will see below, the problem is embarrassingly parallel.
The general formula to evaluate the kernel density function at a given point x is
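The rendered equation is missing here; in standard form it is:

```latex
\hat{f}_H(x) = \frac{1}{n} \sum_{i=1}^{n} K_H(x - x_i)
```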
where the summation in the above is done over all points x_i in the data drawn from the unknown distribution. As we will discuss later, this structure of the KDE computation will allow us to parallelize our code very conveniently.
The function K_H accepts the vector specified in the earlier summation and is of the form
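(The rendered equation is missing here; the standard scaled-kernel form is:)

```latex
K_H(x) = |H|^{-1/2} \, K\!\left(H^{-1/2} x\right)
```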
Here, the function K is known as the kernel function, and must be a symmetric probability density function over some dimension. H is the so-called “bandwidth” matrix, of dimension d×d, where d is the dimensionality of the data. The choice of H depends on the desired properties of the KDE, such as smoothness.
It is usually the case that the choice of K is not crucial to performance, and one common choice for K is the multivariate Gaussian density. Applying this choice to K_H, we obtain the following:
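(The rendered equation is missing here; with the Gaussian kernel, the standard form is:)

```latex
K_H(x) = (2\pi)^{-d/2} \, |H|^{-1/2} \exp\!\left(-\tfrac{1}{2}\, x^{T} H^{-1} x\right)
```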
In this context, H assumes the value of the covariance matrix of the data, computed here from the longitudes and latitudes, and is scaled by a “bandwidth” parameter h. Following the default option for h in the SciPy implementation of the Gaussian KDE, we use Scott’s rule: h = n^(−1/(d+4)), where n is the number of data points and d is the dimensionality of the data.
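As a quick numeric illustration of Scott’s rule (a sketch with sample sizes of our own choosing, not Zoba’s data):

```python
def scotts_factor(n, d):
    """Scott's rule bandwidth factor: h = n**(-1/(d+4))."""
    return n ** (-1.0 / (d + 4))

# For d=2 (latitude/longitude), the factor shrinks slowly with sample size:
print(scotts_factor(500, 2))     # ~0.355
print(scotts_factor(36113, 2))   # ~0.174
```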
Having summarized the computation of the KDE, we now discuss its usage by Zoba and how it can be parallelized. The KDE computation can be summarized by the following pseudocode:
```python
kde_values = {feature: [] for feature in features}
for feature in features:
    data = OSM_DATA[feature]
    kernel = sp.gaussian_kde(data)  # fit the kernel once per feature
    for grid_point in city_grid:
        kde_values[feature].append(kernel.evaluate(grid_point))
```
In short, the code (1) forms a subset of the OSM data (essentially, a list of features such as schools, restaurants, and bars, along with their coordinates) feature by feature, and (2) evaluates the kernel function for that feature at each point in the longitude-latitude grid formed in Phase 1 of Existing Model and Data; the basic idea is illustrated below for a portion of New York. As Zoba specifies each cell to correspond to 250×250 m² in reality, this grid can reach several tens of thousands of points in size.
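Putting the formulas above together, a direct NumPy implementation (our own sketch, useful as a serial reference point for any parallel implementation; the function name and the assumption that data rows are (lat, lon) pairs are ours) looks like this:

```python
import numpy as np

def gaussian_kde_eval(data, points, h):
    """Evaluate a multivariate Gaussian KDE fit to `data` ((n, d)) at `points` ((m, d))."""
    n, d = data.shape
    H = h ** 2 * np.cov(data, rowvar=False)  # bandwidth-scaled covariance matrix
    H_inv = np.linalg.inv(H)
    # Normalizing constant: 1/n times the Gaussian density normalization.
    norm = 1.0 / (n * (2 * np.pi) ** (d / 2) * np.sqrt(np.linalg.det(H)))
    densities = []
    for x in points:
        diff = data - x  # (n, d) array of x_i - x differences
        # Quadratic forms diff_i^T H^{-1} diff_i for every data point at once.
        expo = -0.5 * np.einsum("ij,jk,ik->i", diff, H_inv, diff)
        densities.append(norm * np.exp(expo).sum())
    return np.array(densities)
```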
We refer the reader to the earlier-mentioned SciPy documentation and a helpful guide here for more detailed theoretical information on the KDE.
PySpark Code
The two factors above led us to develop a multivariate Gaussian KDE function using the `pyspark` module, the Python API for Spark. Spark is useful here because we are applying a relatively straightforward operation, without interprocess dependencies, to a collection of many data points (nearly 40k grid points for New York), and its data structures allow for fast and easy statistical computations such as the covariance matrix, on which the earlier-mentioned matrix H depends.
We will now discuss the actual implementation of the KDE function in PySpark. It is evident from the earlier discussion of the KDE that every evaluation of the KDE function on a grid point is independent of the others. Therefore, these can be done in parallel, and Spark makes it very convenient to operate on different chunks of our grid through the `map()` function. A basic pseudocode version of the PySpark KDE function is below:
```python
osm_data = sqlContext.read.csv("osm_data.csv")  # OSM data as a Spark DataFrame
osm_pandas = pandas.read_csv("osm_data.csv")    # the same data as a Pandas DataFrame
city_grid = sqlContext.read.csv("grid.csv")     # city grid points

results = []
for feature in features:
    data = osm_data.filter(osm_data.feature == feature)
    n, d = data.count(), 2                      # sample size; d = 2 for (lat, lon)
    h = n ** (-1.0 / (d + 4))                   # Scott's rule bandwidth
    H = h ** 2 * covariance_matrix(data)
    results.append(city_grid.rdd.map(lambda x: kde_function(osm_pandas, x, H)))
```
We note that we used two copies of our OSM data: one read in as a Spark dataframe, and the other as a Pandas dataframe. The reason is that Spark prevents access to a (by default, distributed) dataframe while we are performing a `map` over another dataframe, without some sort of serialization. This would be the case if we mapped over the `city_grid` dataframe and then referenced the `osm_data` variable from `kde_function`. We made a design decision to use a Pandas dataframe for this portion, but kept the Spark dataframe for speedy computation of the covariance matrix, which can get expensive: the New York City OSM data we had access to had 36,113 rows across all types of features of the city. This also allowed us to use some useful and mature Pandas features (such as the `apply` function) that would be more tedious to use or implement with Spark dataframes.
One consideration of note is that the grid file is not particularly large; we noticed that as a result, Spark distributed the file among fewer partitions than were available. This caused us some initial confusion, as we were not observing speedups proportional to added resources as we expected. However, we realized that the RDD had to be repartitioned in order for us to utilize all available partitions, and therefore maximize the number of operations being done in parallel. This was crucial as the KDE is compute-intensive, and so the more work we can do on different sections of the grid at once, the more time we would save.
Finally, we wrote a script to validate the results of our Spark KDE against the SciPy Gaussian KDE; this was provided to the instructors. We showed that on a toy dataset, the Spark KDE consistently generated results within roughly 1e-15 of the SciPy results at the same randomly generated points.
The section to follow contains a performance comparison between the original SciPy-based KDE code, and the PySpark based one.
Infrastructure and Code Dependencies
We were given access to Zoba’s current data processing code through an Amazon Web Services (AWS) g3.4xlarge instance running Ubuntu, containing all necessary data and dependencies. This was the environment in which we benchmarked the original code, as well as our parallelized versions from our earlier discussion of the use of `multiprocessing`.
To run our PySpark KDE function, we were given access to a Spark cluster on AWS, comprised of a master and four M4.xlarge worker instances. We installed necessary dependencies on each of the nodes. Although we are unable to publicly provide the PySpark code that we wrote, we will describe the setup we went through for general reference in use cases similar to ours.
First, each worker node required Python 3 as well as Pandas. The default AWS Spark cluster configuration for EMR 5.8.0 includes Python 3.4; Pandas can be installed with:
```
sudo pip-3.4 install pandas --upgrade
```
In addition, put the following in the `.bashrc` file of each node, including the master:

```
export PYSPARK_PYTHON=python3
```
Finally, at the master node, move the provided `NY` directory (containing our data) onto the Hadoop file system via:

```
hadoop fs -put NY
```
Our code was then runnable via:

```
spark-submit --num-executors 4 --executor-cores 4 spark_kde.py
```
Results and Performance Analysis
`multiprocessing`
Execution Time
Version | Time |
---|---|
Serial | 25m6s |
Parallelism Between Cities | 17m18s |
Parallelism Between Cities and Phases | 15m41s |
By preprocessing each city as its own `Process`, we achieved a speedup of 1.45x over the serial version. By creating a separate `Process` for each phase within a city’s preprocessing, we achieved a speedup of 1.6x over the serial version. These simple optimizations were worthwhile: they took little time to implement and did not change the functionality of the existing code.
Analysis of Spark KDE results
First, we do a qualitative analysis of our Spark KDE results. In our results shown here, we chose to focus on one feature of the OSM data: the locations of bars in the New York City metropolitan area. This allowed us to do a simple comparison between our results and Zoba’s.
Below is a screenshot of Zoba’s current result for the KDE of NYC bars, after some preprocessing and normalization of the data.
And here, we have the result from our Spark KDE function. Unlike in Zoba’s visualization, this result is obtained without any preprocessing on the data. This is why they appear different, but our goal was to see that our Spark KDE function returned comparable values to the existing solution.
We can see between these two figures that the general geographic distribution of the Spark KDE is pretty consistent with that of the current Zoba implementation.
Next, we look at the runtimes for this same scenario (computing the KDE of bar distributions in NYC). Zoba’s current implementation takes 5 seconds to compute this KDE across all cells. Our parallelized-across-cities version discussed earlier takes 35.3 seconds for the same computation, likely because the cores contend for limited memory bandwidth; note, however, that this version takes less time overall across the whole pipeline.
Finally, below is a series of runtimes of our Spark code, for various cluster configurations and partitionings of the grid:
# Partitions | Configuration | Runtime (s) |
---|---|---|
4 | 1-node cluster, 4 cores per executor | 141.9 |
8 | 2-node cluster, 4 cores per executor | 79.6 |
16 | 4-node cluster, 4 cores per executor | 46.3 |
We can see that the best achieved performance under our setup is close to the performance of the parallelized version, but substantially slower than the current solution. We believe that this is a sign that we are not in a data size regime that takes full advantage of the Spark framework. If we were to increase the size of the grid, for example, we would benefit from being able to compute the KDE simultaneously across portions of the grid, versus doing so serially.
As we hoped, the Spark code sped up roughly linearly with the added resources. We fit a linear regression between the number of executors and runtime on log-log axes, and found a good power-law fit:
We believe that our prototype Spark KDE function shows the feasibility of Zoba integrating Spark into their product in the future. Although Zoba’s current implementation using Pandas dataframes and SciPy functions is efficient, we believe that these solutions will not scale well if Zoba takes in much more data, or tries to model a much larger geographic area than a single city (such as the entire North American continent). Using Spark, however, would allow Zoba to circumvent such data challenges.
Conclusion
Goals Achieved
We believe that we have achieved the following in this project:
- Reduced execution time for Zoba’s existing preprocessing routine by ~37.5%, potentially enabling the addition of more data to preprocess
- Prototyped highly scalable KDE calculation code that will keep execution times tolerable with an increasing number of cities and/or larger geographies.
- Successfully relayed some of the parallel programming tools and paradigms from CS 205 (`multiprocessing`, `pyspark`) to Zoba so that they can be mindful of them when considering their pipeline architecture
Lessons Learned
- In the wild, you may encounter algorithms written to perform many tasks sequentially, even when this is not out of innate necessity.
- Using multiple profiling tools is essential to properly optimize an existing code base and identify candidates for parallelism.
- In real-world applications where you are limited to existing architectures, simpler is sometimes better.
Improvements Suggested to Zoba / Future Work
- Continue analyzing profiling results to find methods that could be run more efficiently (even if not parallelizable).
- Determine how to account for load imbalance between cities and phases. Certain cities (e.g., NYC) are much more computationally expensive than others (e.g., PHI), which limits speedup benefits of parallelism to our lowest-common-denominator cities. Furthermore, the execution time between phases within a city can be very imbalanced. We believe that these considerations can be used to determine what tools to use where. For example, if the KDE computations for New York take particularly long, then we can use a Spark-based solution for that part only, but use the existing solution for the KDEs of the other cities.
References
Duong, Tarn. ks: Kernel Density Estimation for Bivariate Data. April 2018.
Microsoft .NET Customer Solution Case Study. “Trading Firm Increases Revenue Fourfold Through Parallel Development Environment.” May 2011.
SciPy Gaussian KDE Documentation. https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gaussian_kde.html
Wang, Bao et al. “Deep Learning for Real-Time Crime Forecasting and Its Ternarization.” UCLA. 2017.
Zaccone, Giancarlo. Python Parallel Programming Cookbook: Master Efficient Parallel Programming to Build Powerful Applications Using Python. Birmingham: Packt, 2015.