Speed Up Your Code: Parallel Processing with multidplyr
Written on December 18, 2016
There’s nothing more frustrating than waiting for long-running R scripts to iteratively run. I’ve recently come across a new-ish package for parallel processing that plays nicely with the tidyverse:
multidplyr. The package has saved me countless hours when applied to long-running, iterative scripts. In this post, I’ll discuss the workflow to parallelize your code, and I’ll go through a real world example of collecting stock prices where it improves speed by over 5X for a process that normally takes 2 minutes or so. Once you grasp the workflow, the parallelization can be applied to almost any iterative scripts regardless of application.
Table of Contents
- Why Parallel Processing?
- Real World Example
- Further Reading
multidplyr package is not available on CRAN, but you can install it using
For those following along in R, you’ll need to load the following packages, which are available on CRAN:
If you don’t have these installed, run
install.packages(pkg_names) with the package names as a character vector (
pkg_names <- c("rvest", "quantmod", ...)).
I also recommend the open-source RStudio IDE, which makes R Programming easy and efficient.
Computer programming languages, including R and python, by default run scripts using only one processor (i.e. core). Under many circumstances this is fine since the computation speed is relatively fast. However, some scripts just take a long time to process, particularly during iterative programming (i.e. using loops to process a lot of data and/or very complex calculations).
Most modern PC’s have multiple cores that are underutilized. Parallel processing takes advantage of this by splitting the work across the multiple cores for maximum processor utilization. The result is a dramatic improvement in processing time.
While you may not realize it, most computations in R are loops that can be split using parallel processing. However, parallel processing takes more code and may not improve speeds, especially during fast computations because it takes time to transmit and recombine data. Therefore, parallel processing should only be used when speed is a significant issue.
When processing time is long, parallel processing could result in a significant improvement. Let’s check out how to parallelie your R code using the
Figure 1: multidplyr Workflow
Essentially, you start with some data set that you need to do things to multiple times. Your situation generally falls into one of two types:
- It could be a really large data set that you want to split up into several small data sets and perform the same thing on each.
- It could be one data set that you want to perform multiple things on (e.g. apply many models).
The good news is both situations follow the same basic workflow. The toughest part is getting your data in the format needed to process using the workflow. Don’t worry, we’ll go through a real world example next so you can see how this is accomplished.
We’ll go through the
multidplyr workflow using a real world example that I routinely use: collecting stock prices from the inter-web. Other uses include using modeling functions over grouped data sets, using many models on the same data set, and processing text (e.g. getting n-grams on large corpora). Basically anything with a loop.
In preparation for collecting stock prices, we need two things:
- A list of stocks
- A function to get stock prices from a stock symbol
First, we use
rvest to get the list of S&P500 stocks,
Second, we create a function that leverages the
quantmod::getSymbols to return the historical stock prices in tidy format. This function will be mapped to all of the 500+ stocks next.
The next computation is the routine that we wish to parallelize, but first we’ll time the script running on one processor, looping in series. We are collecting ten years of historical daily stock prices for each of the 500+ stocks. To do this, the script below uses the
purrr::map() function to map
get_stock_prices() to each stock
sp_500. The loop in our case is the iterative application of a function to each stock. This operation will be split by group in the next section. The
proc.time() function is used to time the routine running without parallel processing.
sp_500_processed_in_series is a
tibble (tidy data frame) that is nested with two levels: the first has the stock
stock.prices. The variable,
stock.prices, contains the historical stock prices for each stock.
Let’s verify we got the daily stock prices for every stock. We’ll use the
tidyr::unnest() function to expand the
stock.prices for the list of stocks. At 1,203,551 rows, the full list has been obtained.
And, let’s see how long it took when processing in series. The processing time is the time elapsed in seconds. Converted to minutes this is approximately 1.68 minutes.
We just collected ten years of daily stock prices for over 500 stocks in about 1.68 minutes. Let’s parallelize the computation to get an improvement. We will follow the six steps shown in Figure 1.
Prior to starting, you may want to determine how many cores your machine has. An easy way to do this is using
parallel::detectCores(). This will be used to determine the number of groups to split the data into in the next set.
Let’s add groups to
sp_500. The groups are needed to divide the data across your
cl number cores. For me, this is 8 cores. We create a
group vector, which is a sequential vector of
1:cl (1 to 8) repeated the length of the number of rows in
sp_500. We then add the
group vector to the
sp_500 tibble using the
create_cluster() function from the
multidplyr package. Think of a cluster as a work environment on a core. Therefore, the code below establishes a work environment on each of the 8 cores.
Next is partitioning. Think of partitioning as sending a subset of the initial
tibble to each of the clusters. The result is a partitioned data frame (
party_df), which we explore next. Use the
partition() function from the
multidplyr package to split the
sp_500 list by group and send each group to a different cluster.
by_group, looks similar to our original
tibble, but it is a
party_df, which is very different. The key is to notice that the there are 8
Shard has between 63 and 64 rows, which evenly splits our data among each shard. Now that our
tibble has been partitioned into a
party_df, we are ready to move onto setting up the clusters.
The clusters have a local, bare-bones R work environment, which doesn’t work for the vast majority of cases. Code typically depends on libraries, functions, expressions, variables, and/or data that are not available in base R. Fortunately, there is a way to add these items to the clusters. Let’s see how.
For our computation, we are going to need to add several libraries along with the
get_stock_prices() function to the clusters. We do this by using the
cluster_assign_value() functions, respectively.
We can verify that the libraries are loaded using the
We can also verify that the functions are loaded using the
Now that we have our clusters and partitions set up and everything looks good, we can run the parallelized code. The code chunk is the same as the series code chunk with two exceptions:
- Instead of starting with the
sp_500 tibble, we start with the
- We combine the results at the end using the
Let’s verify we got the list of stock prices. We’ll use the
tidyr::unnest() function to expand the
stock.prices for the list of stocks. This is the same list as
sp_500_processed_in_series, but it’s sorted in order by which groups finished first. If we want to return the
tibble in the same order of
sp_500, we can easily pipe (
as_tibble() in the code chunk above.
And, let’s see how long it took when processing in parallel.
The processing time is approximately 0.31 minutes, which is 5.4X faster! Note that it’s not a full 8X faster because of transmission time as data is sent to and from the nodes. With that said, the speed will approach 8X improvement as calculations become longer since the transmission time is fixed whereas the computation time is variable.
Parallelizing code can drastically improve speed on multi-core machines. It makes the most sense in situations involving many iterative computations. On an 8 core machine, processing time significantly improves. It will not be quite 8X faster, but the longer the computation the closer the speed gets to the full 8X improvement. For a computation that takes two minutes under normal conditions, we improved the processing speed by over 5X through parallel processing!
The focus of this post was on the
multidplyr package, a package designed to do parallel processing within the
tidyverse. We worked through the five main steps in the
multidplyr workflow on a real world example of collecting historical stock prices for a large set of stocks. The beauty is that the package and workflow can be applied to anything from collecting stocks to applying many models to computing n-grams on textual data and more!
multidplyron GitHub: The vignette explains the
multidplyrworkflow using the
flightsdata set from the