Introduction

It shouldn’t come as a surprise that I like to optimize (Python) code. So when a colleague (who is more experienced in R than Python/Numpy) mentioned in a presentation that a certain calculation was rather slow, I was intrigued. Would I be able to help speed it up? In this blog post, I’ll take you along in the process I went through to optimize the code.

After confirming the code was indeed written in Python and asking for access, I took a look at the function. Below is a modified version showcasing what was relevant to the optimization.

def calculate(data, no_tests=100):  # no_tests is limited due to function speed
    n, d = data.shape               # typically n = 1_000, 10_000 or 100_000, d = 9

    for i in range(no_tests):
        for j in range(d):
            # prepare indices
            a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
            # prepare auxiliary information of every individual to find peers
            aux = np.apply_along_axis(np.array2string, 1, data[:,a_ind])
            # save data of selected
            selected = data[aux==aux[i],j]

            ... # perform some final processing of `selected`

Step 1: What does it do?

At first glance, it’s a double for-loop that creates some indices, uses them to transform part of the data into strings as auxiliary data, and performs a selection based on equality of that auxiliary data. The first thing that stands out to me is the np.array2string call: such a transformation is rather expensive, so I wonder whether it’s really needed. If not, that’s probably the place to start.

But let’s not get ahead of ourselves and first consider what this function does step by step. We’ll set up a toy example with some sample data, pick some i, j in the middle of the loops, flatten the function, and walk through a single run of the innermost loop:

import numpy as np                                    # intermediate values:
n, d = 3, 4
i, j = 1, 2
data = np.arange(n*d).reshape(n, d) / 100             # [[0.  , 0.01, 0.02, 0.03],
                                                      #  [0.04, 0.05, 0.06, 0.07],
                                                      #  [0.08, 0.09, 0.1 , 0.11]]

range_1, range_2 = np.arange(0, j), np.arange(j+1,d)  # [0, 1], [3]
a_ind = np.concatenate([range_1, range_2])            # [0, 1, 3]

tmp = data[:,a_ind]                                   # [[0.  , 0.01, 0.03],
                                                      #  [0.04, 0.05, 0.07],
                                                      #  [0.08, 0.09, 0.11]]

aux = np.apply_along_axis(np.array2string, 1, tmp)    # ['[0.   0.01 0.03]',
                                                      #  '[0.04 0.05 0.07]',
                                                      #  '[0.08 0.09 0.11]']
comparisons = aux==aux[i]                             # [False, True, False]

selected = data[comparisons, j]                       # [0.1]

Looking at the flattened code and intermediate values from this sample run, we can see the process performed by the function much more clearly: In short, we exclude column j from the data and check whether any rows are equal to the selected row i. The original data in column j for each of those rows is then used for further calculations.
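To make that concrete, the inner-loop body can be restated in two lines. This is just a sketch using np.delete and a plain == comparison, neither of which appears in the original code, but it expresses the same ‘drop column j, match rows against row i, take column j’ idea:

rest = np.delete(data, j, axis=1)                     # the data without column j
selected = data[np.all(rest == rest[i], axis=1), j]   # [0.1], as in the sample run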

Step 2: What to Optimize

Now that we know what the function does, we can try to optimize it. There are two candidate statements to optimize: creating the a_ind indices and determining which rows are equal using aux. Let’s time them both:

j, d = 3, 9
%timeit np.concatenate([range(0,j), range(j+1,d)])

7.18 µs ± 310 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

large_data = np.random.randn(1_000, 9)  # test speed using larger dataset
%timeit aux = np.apply_along_axis(np.array2string, 1, large_data); aux == aux[0]

130 ms ± 3.14 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

The equality check using np.array2string takes roughly 20,000 times as long as creating the indices. So, as expected, that’s the place to start. When comparing speeds, we’ll keep using the 1,000 x 9 large_data array we created here: large enough to give a good idea of the speed, but small enough to keep testing fast.

Step 3: Optimizing within loops

The np.apply_along_axis(np.array2string, 1, large_data) statement is not just the slowest statement we found; it’s especially worthwhile to optimize because it sits within the double nested loop. Any small improvement to a single run of that statement adds up quickly when it’s executed so many times. This means that, for now, we can focus on optimizing that single statement as-is, without considering the loops at all.

Replacing np.array2string

The first step is replacing the row-equality check so it no longer uses strings. Instead of comparing the output of np.array2string, we can check for element-wise equality between rows with np.allclose. This returns a single True or False when comparing two rows, just like the string comparison did, and results in a 4x speedup:

comparisons = np.apply_along_axis(np.allclose, 1, tmp, tmp[i])
# [False, True, False]
%timeit np.apply_along_axis(np.allclose, 1, large_data, large_data[0])

32.8 ms ± 1.76 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
-> 3.9x speedup

np.allclose vs np.equal

If we don’t need the tolerance checking that np.allclose performs, we can try just using np.equal instead. However, this returns the element-wise results, not a single True or False per row:

comparisons = np.apply_along_axis(np.equal, 1, tmp, tmp[i])
# [[False, False, False],
#  [ True,  True,  True],
#  [False, False, False]]

To end up with the correct result, each row has to be summarized into True if the whole row is True, otherwise the row should be summarized to False. For this, we can use the np.all function, which works like an array-wide boolean AND operation.

# for example:
np.all([
    [ True,  True,  True],
    [ True,  True, False],
    [False, False, False],
], axis=1)
# [ True, False, False]

comparisons = np.all(np.apply_along_axis(np.equal, 1, tmp, tmp[i]), axis=1)
# [False, True, False]
%timeit np.all(np.apply_along_axis(np.equal, 1, large_data, large_data[0]), axis=1)

2.03 ms ± 59.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
-> 64x speedup

Perhaps surprisingly, this is another 16x faster! Even with the extra np.all call, np.equal turns out to be much faster than np.allclose.
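The difference makes sense if you look at what np.allclose actually computes. Per the numpy documentation, it evaluates a tolerance formula rather than plain equality; roughly, ignoring the NaN and inf handling (a sketch, not numpy’s actual implementation):

def allclose_sketch(a, b, rtol=1e-05, atol=1e-08):
    # |a - b| <= atol + rtol * |b|, reduced over all elements
    return bool(np.all(np.abs(a - b) <= atol + rtol * np.abs(b)))

That’s a subtraction, two absolute values, a multiplication and a comparison per element, plus the Python-level overhead of every np.allclose call, where np.equal only needs the comparison.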

Removing np.apply_along_axis

Because of numpy’s broadcasting rules, we don’t actually need np.apply_along_axis to perform the element-wise comparison per row of data using np.equal. We can simply compare the entire array with the relevant row.

tmp == tmp[i]  # same as np.equal
# [[False, False, False],
#  [ True,  True,  True],
#  [False, False, False]]
np.all(tmp == tmp[i], axis=1)
# [False, True, False]

%timeit np.all(large_data == large_data[0], axis=1)

36.8 µs ± 2.06 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
-> 3,500x speedup

That’s a 3,500x speedup in total! Turns out that numpy by itself is pretty fast at performing these kinds of comparisons. With that, we’ve both simplified the code and made it a lot faster.

You may also notice that this version takes only ~5x as long as the index-creation code we timed earlier. It’s not worth spending time on that, though: index creation doesn’t scale with the number of data rows, and we’ll deal with it using a different technique in step 4 anyway.

Step 4: Factoring code out of loops

We’ve now made the code within the loops as fast as possible. The next step is to see if there is any work we’re unnecessarily repeating in any of the loops and, well… making sure we don’t.

Inner loop

Remember that the inner loop iterates over the d columns of the data (9 for large_data, 4 in the toy example)? Each iteration, we create the necessary indices and compare the data with the current row:

for i in range(no_tests):  # Outer loop
    for j in range(d):     # Inner loop
        a_ind = np.concatenate([range(0,j), range(j+1,d)])
        comparisons = np.all(data[:,a_ind] == data[i,a_ind], axis=1)

The indices depend on the column j, so within this loop we’re not doing any extra work there. But to calculate comparisons, only the columns in a_ind are used. As a_ind only excludes a single column per iteration, each column is compared d-1 times. We can check this by printing a_ind for each iteration (using the toy example’s d = 4):

for j in range(d):
    a_ind = np.concatenate([range(0,j), range(j+1,d)])
    print(a_ind)
# [1 2 3]
# [0 2 3]
# [0 1 3]
# [0 1 2]

%%timeit
for i in range(10):
    for j in range(9):
        a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,9)])
        comparisons = np.all(large_data[:,a_ind] == large_data[i,a_ind], axis=1)

3.51 ms ± 109 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

The element-wise comparisons in data[:,a_ind] == data[i,a_ind] won’t change between iterations, so we’re actually making the same comparisons d-1 = 8 times! That makes this a good candidate for reintroducing an auxiliary variable, but this time outside of the inner loop:

%%timeit
for i in range(10):
    aux = large_data == large_data[i]
    for j in range(9):
        a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,9)])
        comparisons = np.all(aux[:, a_ind], axis=1)

1.24 ms ± 65.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
-> 2.7x speedup

We’re saving 7/8ths of the comparisons, reducing the amount of work within the inner loop. But out of the potential 8x speedup, we ‘only’ get 2.7x. Why? I can only speculate… ¯\_(ツ)_/¯
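My best guess is that the remaining per-iteration work is to blame: the fancy indexing and the np.all reduction still run d times per test, so hoisting only the comparison can’t deliver the full 8x. One way to check (a quick sketch, not part of the original analysis) is to time the two pieces separately:

aux = large_data == large_data[0]
a_ind = np.concatenate([np.arange(0,3), np.arange(4,9)])  # e.g. j = 3

%timeit large_data == large_data[0]     # hoisted: now runs once per i
%timeit np.all(aux[:, a_ind], axis=1)   # still runs d = 9 times per i

If the second statement dominates, the 2.7x we measured is about all the hoisting can give us.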

Outer loop

We’ve taken the element-wise data comparison out of the inner loop because those results do not change depending on the index j of the inner loop. Using the same reasoning, we can extract the a_ind creation out of the outer loop. Yes, we need to determine a_ind once for each j, but they don’t change depending on i. That does mean we need to have a separate for j in range(d) loop outside of the outer loop to make the list of indices, but then we can keep reusing them in the inner loop.

You might wonder why we even bother, if each run only takes 7 µs. But the benchmark later in this post sets no_tests equal to n, so as the data grows to many thousands or even millions of rows, those microseconds per iteration still add up. Besides, I think it makes the code cleaner in a way: it clearly separates what needs to be done within the inner and outer loop from what does not.

%%timeit
a_indices = np.array([
    np.concatenate([np.arange(0,j), np.arange(j+1,d)])
    for j in range(d)
])
for i in range(10):
    aux = large_data == large_data[i]
    for j, a_ind in enumerate(a_indices):
        comparisons = np.all(aux[:, a_ind], axis=1)

910 µs ± 24.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
-> 3.8x speedup

Nothing major, but an improvement nonetheless.

Step 5: Parallelize and benchmark

As a final step, we can see that the code within the outer loop is so-called “embarrassingly parallel”: each outer-loop iteration is independent of the others, because an iteration only depends on i. Successfully parallelizing code can give an additional speedup of up to the number of cores used, which can range anywhere from 4x on your local laptop to 256x on servers with the latest AMD EPYC CPUs¹.

However, that doesn’t mean it’s always worth it to actually write a parallel implementation. Parallelism in Python comes with some overhead, both in code complexity and in runtime. If the speedup doesn’t outweigh these downsides, it’s probably best to stick to a serial implementation.

In this case, we consider the parallel implementation below because the code we’re optimizing is simple to begin with and will be run on large data arrays with up to a million rows. This implementation is built using Python’s built-in multiprocessing.Pool and functools.partial.

from functools import partial
from multiprocessing import Pool, cpu_count

def _parallel_helper(i, data, a_indices):
    """Helper function to perform a single run for parallelization"""
    n, d = data.shape
    aux = data == data[i]
    results = np.empty(d)
    for j, a_ind in enumerate(a_indices):
        comparisons = np.all(aux[:, a_ind], axis=1)
        results[j] = np.sum(comparisons)  # 'sum' as stand-in for final processing
    return results

def calc_parallel(data, no_tests, n_procs=cpu_count()):
    """Parallel implementation"""
    n, d = data.shape
    a_indices = np.array([
        np.concatenate([np.arange(0,j), np.arange(j+1,d)])
        for j in range(d)
    ])
    parallel_func = partial(_parallel_helper, data=data, a_indices=a_indices)
    with Pool(n_procs) as p:
        result = p.map(parallel_func, range(no_tests))
    return np.array(result)
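As a quick usage sketch (the shapes and numbers here are just illustrative):

if __name__ == '__main__':
    test_data = np.random.randn(10_000, 9)
    out = calc_parallel(test_data, no_tests=100)
    print(out.shape)  # (100, 9): one result per (i, j) combination

The if __name__ == '__main__' guard matters with multiprocessing: worker processes may re-import the module, and the guard keeps them from re-running the script’s top-level code.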

The runtime overhead of starting a Pool is at least a second. That makes a direct comparison with the previous optimized implementation unfair for the (1_000, 9)-sized large_data, but as the number of rows (and with it the number of tests) grows, the parallel implementation should come out ahead at some point.
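That “at least a second” is machine-dependent; timing a Pool that does trivial work gives a rough number for your own machine (a sketch):

from multiprocessing import Pool
from time import time

if __name__ == '__main__':
    start = time()
    with Pool() as p:
        p.map(abs, range(8))  # trivial work, so this mostly measures startup/teardown
    print(f'Pool overhead: ~{time() - start:.2f} s')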

To show this as clearly as possible, the graph below presents the results of a small benchmarking experiment I’ve run. It compares the parallel implementation with the implementation after each optimization step described in this blog. Each implementation is run on increasingly many rows of data, with no_tests set to n, until it takes more than 100 seconds to complete. The entire benchmarking code can be seen by clicking the spoiler tag below.

Benchmarking code
from functools import partial
from multiprocessing import Pool, cpu_count
from time import time
import numpy as np
import matplotlib.pyplot as plt

def calc_orig(data):
    """Original implementation"""
    n, d = data.shape
    results = np.empty((n,d))
    for i in range(n):
        for j in range(d):
            a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
            aux = np.apply_along_axis(np.array2string, 1, data[:,a_ind])
            peers_data = data[aux==aux[i],j]
            results[i,j] = np.sum(peers_data)
    return results

def calc_inner_loop_optimized(data):
    """Optimized code within inner loop"""
    n, d = data.shape
    results = np.empty((n,d))
    for i in range(n):
        for j in range(d):
            a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
            comparisons = np.all(data[:,a_ind] == data[i,a_ind], axis=1)
            results[i,j] = np.sum(comparisons)
    return results

def calc_optimized(data):
    """Optimized code, including extractions from loop"""
    n, d = data.shape
    a_indices = np.array([
        np.concatenate([np.arange(0,j), np.arange(j+1,d)])
        for j in range(d)
    ])
    results = np.empty((n,d))
    for i in range(n):
        aux = data == data[i]
        for j, a_ind in enumerate(a_indices):
            comparisons = np.all(aux[:, a_ind], axis=1)
            results[i,j] = np.sum(comparisons)
    return results

def _parallel_helper(i, data, a_indices):
    """Helper function to perform a single run for parallelization"""
    n, d = data.shape
    aux = data == data[i]
    results = np.empty(d)
    for j, a_ind in enumerate(a_indices):
        comparisons = np.all(aux[:, a_ind], axis=1)
        results[j] = np.sum(comparisons)
    return results

def calc_parallel(data, n_procs=cpu_count()):
    """Parallel implementation"""
    n, d = data.shape
    a_indices = np.array([
        np.concatenate([np.arange(0,j), np.arange(j+1,d)])
        for j in range(d)
    ])
    parallel_func = partial(_parallel_helper, data=data, a_indices=a_indices)
    with Pool(n_procs) as p:
        result = p.map(parallel_func, range(n))
    return np.array(result)

if __name__ == '__main__':
    magnitudes = range(1, 6)
    numbers = [1, 2, 5]
    N = [n * 10**m for m in magnitudes for n in numbers]
    functions = {
        'original': calc_orig,
        'inner loop optimized': calc_inner_loop_optimized,
        'fully optimized': calc_optimized,
        'parallel (4)': partial(calc_parallel, n_procs=4),
        'parallel (8)': partial(calc_parallel, n_procs=8),
        'parallel (12)': partial(calc_parallel, n_procs=12),
    }
    durations = {name: np.full(len(N), np.nan) for name in functions.keys()}
    skip = {name: False for name in functions.keys()}

    for idx, n in enumerate(N):
        print(n)
        data = np.random.randn(n, 9)
        for name, func in functions.items():
            if skip[name]:
                continue
            start = time()
            func(data)
            end = time()
            durations[name][idx] = end-start
            print(f'    {name}: {durations[name][idx]}')
            if durations[name][idx] > 100:
                skip[name] = True

    fig, ax = plt.subplots()
    ax.set_title('Speed comparison of implementations')
    ax.set_xscale('log')
    ax.set_xlabel('$n$')
    ax.set_yscale('log')
    ax.set_ylabel('time (s)')
    for name, duration in durations.items():
        ax.plot(N, duration, marker='+', ms=12, label=name)
    ax.legend(loc='lower right')
    ax.grid(which='major', color='black', alpha=.5, linestyle='-')
    ax.grid(which='minor', color='black', alpha=.25, linestyle=':')
    fig.savefig('case-study-1-benchmark.png')  # or .pdf or .svg

The log-log plot shows nicely that with each optimization step, we’re able to process more rows of data in the same amount of time. The parallel implementations are a slight exception to that rule when the number of rows n is less than 10,000. In those cases the constant runtime overhead of about one second means they’re actually slower than the fully optimized serial implementation.

For the parallel implementation, I’ve run it with a pool of 4, 8 and 12 processes on my 12-thread CPU. Similarly to the inner loop optimization in step 4, we don’t see the full speedup we might expect, but at least using more processes (up to n_procs=multiprocessing.cpu_count()) does reduce the runtime. Again, I can only speculate why ¯\_(ツ)_/¯ — likely suspects are hyper-threads sharing physical cores and the cost of shipping the full data array to every worker process.
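If you want to chase that last bit of speedup, one knob worth trying (an untested sketch, not something from the benchmark above) is the chunksize parameter of Pool.map, which batches work items to reduce inter-process communication overhead:

with Pool(n_procs) as p:
    result = p.map(parallel_func, range(no_tests), chunksize=64)  # batch size is a guess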

Conclusion

And so we’ve gone from an R-inspired string-based implementation, past a clean and minimal pure-NumPy one, all the way to a fully optimized and parallelized implementation that can scale over many cores. I’ve had a lot of fun figuring out all these optimizations, and I hope you enjoyed coming along on this optimization journey with me.

Finally, I would like to thank my colleague for being open to having me tinker on their code.


  1. For single CPUs/machines. More is possible on (super)computer clusters using e.g. Dask or Ray.