# Case Study 1: Row-wise equality checking

## Introduction

It shouldn’t come as a surprise that I like to optimize (Python) code. So when
a colleague (who is more experienced in **R** than Python/Numpy) mentioned in a
presentation that a certain calculation was rather slow, I was intrigued. Would
I be able to help speed it up? In this blog post, I’ll take you along in
the process I went through to optimize the code.

After confirming the code was indeed written in Python and asking for access, I took a look at the function. Below is a modified version showcasing what was relevant to the optimization.

```
def calculate(data, no_tests=100): # no_tests is limited due to function speed
n, d = data.shape # typically n = 1_000, 10_000 or 100_000, d = 9
for i in range(no_tests):
for j in range(d):
# prepare indices
a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
# prepare auxiliary informationn of every individual to find peers
aux = np.apply_along_axis(np.array2string,1, data[:,a_ind])
# save data of selected
selected = data[aux==aux[i],j]
... # perform some final processing of `selected`
```

## Step 1: What does it do?

At first glance, it’s a double for-loop which creates some indices, uses them to
transforms some of the data into strings as auxiliary data, and performs a
selection based on equality in that auxiliary data. The first thing that stands
out to me here is the `np.array2string`

function call, as such a transformation
is rather expensive, so I wonder if that is really needed. If not, that would
probably be the place to start.

But let’s not get ahead of ourselves. Let’s consider what this function does
step-by-step. Let’s set up a toy example with some sample data, pick some `i,j`

in the middle of the loops, flatten the function and progress through a single
run of the inner-most loop:

```
import numpy as np # intermediate values:
n, d = 3, 4
i, j = 1, 2
data = np.arange(n*d).reshape(n, d) / 100 # [[0. , 0.01, 0.02, 0.03],
# [0.04, 0.05, 0.06, 0.07],
# [0.08, 0.09, 0.1 , 0.11]]
range_1, range_2 = np.arange(0, j), np.arange(j+1,d) # [0, 1], [3]
a_ind = np.concatenate([range_1, range_2]) # [0, 1, 3]
tmp = data[:,a_ind] # [[0. , 0.01, 0.03],
# [0.04, 0.05, 0.07],
# [0.08, 0.09, 0.11]]
aux = np.apply_along_axis(np.array2string, 1, tmp) # ['[0. 0.01 0.03]',
# '[0.04 0.05 0.07]',
# '[0.08 0.09 0.11]']
comparisons = aux==aux[i] # [False, True, False]
selected = data[comparisons, j] # [0.1]
```

Looking at the flattened code and intermediate values from this sample run, we
can see the process performed by the function much more clearly: In short, we
exclude column `j`

from the data and check whether any rows are equal to the
selected row `i`

. The original data in column `j`

for each of those rows is then
used for further calculations.

## Step 2: What to Optimize

Now we know what the function does, we can try to optimize it. There are two
candidate statements to optimize: creating the `a_ind`

indices and determining
which rows are equal using `aux`

. Let’s test them both:

```
j, d = 3, 9
%timeit np.concatenate([range(0,j), range(j+1,d)])
```

7.18 µs ± 310 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

```
large_data = np.random.randn(1_000, 9) # test speed using larger dataset
%timeit aux = np.apply_along_axis(np.array2string, 1, large_data); aux == aux[0]
```

130 ms ± 3.14 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

The equality check using `np.array2string`

takes a factor of 20,000 more
time than creating the indices. So as expected, that’s going to be the place
to start. When comparing the speed, we’ll keep using the 1,000 x 9 `large_data`

array we created here. This is large enough to give a good idea of the speed,
but small enough to make testing fast.

## Step 3: Optimizing within loops

The `np.apply_along_axis(np.array2string, 1, large_data)`

statement is not just
the slowest statement we found, but it’s extra worthwhile to optimize since it’s
within the double nested loop. Any small improvement to a single run in that
statement will add up quickly when run so many times in the loops. This means
that, for now, we can just focus on optimizing that single statement as-is,
without looking at the loops too.

### Replacing *np.array2string*

First step is replacing row-equality check to no longer use strings.
Instead of using `np.array2string`

, we can check for element-wise equality
between rows with `np.allclose`

. This returns a single `True`

or `False`

when
comparing two rows, just like with the strings previously. This results in a
4x speedup:

```
comparisons = np.apply_along_axis(np.allclose, 1, tmp, tmp[i])
# [False, True, False]
%timeit np.apply_along_axis(np.allclose, 1, large_data, large_data[0])
```

32.8 ms ± 1.76 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

-> 3.9x speedup

*np.allclose* vs *np.equal*

If we don’t need the tolerance-checking by `np.allclose`

, we can even try just
using `np.equal`

instead. However, this returns the *element-wise* results,
not a single `True`

or `False`

for each row:

```
comparisons = np.apply_along_axis(np.equal, 1, tmp, tmp[i])
# [[False, False, False],
# [ True, True, True],
# [False, False, False]]
```

To end up with the correct result, each row has to be summarized into `True`

if
the whole row is `True`

, otherwise the row should be summarized to `False`

. For
this, we can use the `np.all`

function, which works like an array-wide boolean
AND operation.

```
# for example:
np.all([
[ True, True, True],
[ True, True, False],
[False, False, False],
], axis=1)
# [ True, False, False]
comparisons = np.all(np.apply_along_axis(np.equal, 1, tmp, tmp[i]), axis=1)
# [False, True, False]
%timeit np.all(np.apply_along_axis(np.equal, 1, large_data, large_data[0]), axis=1)
```

2.03 ms ± 59.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

-> 64x speedup

Perhaps surprisingly, this is another 16x faster! Even when adding the extra
`np.all`

call, it turns out that `np.equal`

is *much* faster
than `np.allclose`

.

### Removing *np.apply_along_axis*

Because of numpy’s broadcasting rules, we don’t
actually need `np.apply_along_axis`

to perform the element-wise comparison per
row of data using `np.equal`

. We can simply compare the entire array with the
relevant row.

```
tmp == tmp[i] # same as np.equal
# [[False, False, False],
# [ True, True, True],
# [False, False, False]]
np.all(tmp == tmp[i], axis=1)
# [False, True, False]
%timeit np.all(large_data == large_data[0], axis=1)
```

36.8 µs ± 2.06 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

-> 3,500x speedup

That’s a 3,500x speedup in total! Turns out that numpy by itself is pretty fast at performing these kinds of comparisons. With that, we’ve both simplified the code and made it a lot faster.

You may also notice that this version only takes ~5x as long as the index creation code we also timed earlier. It’s not worth it to spend time on this though, as it doesn’t scale with the amount of data rows, and we can speed it up using a different technique in the next step anyway.

## Step 4: Factoring code out of loops

We’ve now made the code within the loops as fast as possible. The next step is to see if there is any work we’re unnecessarily doing again in any of the loops and well… making sure we don’t.

### Inner loop

Remember that the inner loop was iterating over the 9 columns of our sample data? Each time we create the necessary indices and compare the data with the current row.

```
for i in range(no_tests): # Outer loop
for j in range(d): # Inner loop
a_ind = np.concatenate([range(0,j), range(j+1,d)])
comparisons = np.all(data[:,a_ind] == data[i,a_ind], axis=1)
```

The indices depend on the column `j`

, so within this loop we’re not doing any
extra work there. But to calculate `comparisons`

, only the columns of the
`a_ind`

indices are used. As `a_ind`

only excludes a single column per
iteration, each column is compared `d-1`

times. We can check this by printing
`a_ind`

for each iteration:

```
for j in range(d):
a_ind = np.concatenate([range(0,j), range(j+1,d)])
print(a_ind)
# [1 2 3]
# [0 2 3]
# [0 1 3]
# [0 1 2]
%%timeit
for i in range(10):
for j in range(9):
a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,9)])
comparisons = np.all(large_data[:,a_ind] == large_data[i,a_ind], axis=1)
```

3.51 ms ± 109 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

The element-wise comparisons in `data[:,a_ind] == data[i,a_ind]`

won’t change
between iterations, so we’re actually making the same comparisons `d-1 = 8`

times! That makes this a good candidate for reintroducing an auxiliary variable,
but this time *outside* of the inner loop:

```
%%timeit
for i in range(10):
aux = large_data == large_data[i]
for j in range(9):
a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,9)])
comparisons = np.all(aux[a_ind], axis=1)
```

1.24 ms ± 65.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

-> 2.7x speedup

We’re saving 7/8th of the comparisons compared to previously, reducing the amount of work within the inner loop. But out of the potential 8x speedup, we ‘only’ get 2.7x. Why? I can only speculate… ¯\_(ツ)_/¯

### Outer loop

We’ve taken the element-wise data comparison out of the inner loop because
those results do not change depending on the index `j`

of the inner loop. Using
the same reasoning, we can extract the `a_ind`

creation out of the outer loop.
Yes, we need to determine `a_ind`

once for each `j`

, but they don’t change
depending on `i`

. That does mean we need to have a separate `for j in range(d)`

loop outside of the outer loop to make the list of indices, but then we can
keep reusing them in the inner loop.

You might wonder why we even bother with this if each run only takes 7 µs. As
the size of `large_data`

increases to many thousands or even millions, those
microseconds per iteration can still add up. Besides, I think it makes the code
cleaner in a way: it clearly separates what *needs* to be done within the inner
and outer loop from what does *not*.

```
%%timeit
a_indices = np.array([
np.concatenate([np.arange(0,j), np.arange(j+1,d)])
for j in range(d)
])
for i in range(10):
aux = large_data == large_data[i]
for j, a_ind in enumerate(a_indices):
comparisons = np.all(aux[a_ind], axis=1)
```

910 µs ± 24.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

-> 3.8x speedup

Nothing major, but an improvement nonetheless.

## Step 5: Parallelize and benchmark

As a final step, we can see that the code within the outer loop is so-called
“embarassingly parallel”. This means each outer loop iteration is independent
of the others. In this case, that’s because an iteration only depends on `i`

.
Succesfully parallelizing code can give an additional speedup of up to the
number of cores used. This can range anywhere from 4x on your local laptop to
256x on servers with the latest AMD EPYC cpus^{1}

However, that doesn’t mean it’s always worth it to actually write a parallel implementation. Parallelism in Python comes with some overhead, both in code complexity and in runtime. If these downsides don’t outweigh the speedup, it’s probably best to stick to a serial implementation.

In this case, we consider the parallel implementation below because the code
we’re optimizing is simple to begin with and will be run on large data arrays
with up to a million rows. This implementation is built using Python’s built-in
`multiprocessing.Pool`

and `functools.partial`

.

```
from functools import partial
from multiprocessing import Pool, cpu_count
def _parallel_helper(i, data, a_indices):
"""Helper function to perform single run for parallelization"""
n, d = data.shape
aux = data == data[i]
results = np.empty(d)
for j, a_ind in enumerate(a_indices):
comparisons = np.all(aux[a_ind], axis=1)
results[j] = np.sum(comparisons) # 'sum' as stand-in for final processing
def calc_parallel(data, no_tests, n_procs=cpu_count()):
"""Parallel implementation"""
n, d = data.shape
a_indices = np.array([
np.concatenate([np.arange(0,j), np.arange(j+1,d)])
for j in range(d)
])
parallel_func = partial(_parallel_helper, data=data, a_indices=a_indices)
with Pool(n_procs) as p:
result = p.map(parallel_func, range(no_tests))
return np.array(result)
```

The runtime overhead of starting a `Pool`

is at least a second. This makes it
unfair to compare to the previous optimized implementation for the `large_data`

of size `(1_000, 9)`

, but as the data consists of more rows and more rows are
compared, the parallel implementation should do better at some point.

To show this as clearly as possible, you can see the results of a small
benchmarking experiment I’ve run in the graph below. It compares the parallel
implementation with the implementation after each optimization step described
in this blog. Each implementation is run with increasingly more rows of data,
with `no_tests`

set to `n`

, until it takes more than 100 seconds to complete.
The entire benchmarking code can be seen by clicking the spoiler tag below.

## Benchmarking code

```
from functools import partial
from multiprocessing import Pool, cpu_count
from time import time
import numpy as np
import matplotlib.pyplot as plt
def calc_orig(data):
"""Original implementation"""
n, d = data.shape
results = np.empty((n,d))
for i in range(n):
for j in range(d):
a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
aux = np.apply_along_axis(np.array2string,1, data[:,a_ind])
peers_data = data[aux==aux[i],j]
results[i,j] = np.sum(peers_data)
return results
def calc_inner_loop_optimized(data):
"""Optimized code within inner loop"""
n, d = data.shape
results = np.empty((n,d))
for i in range(n):
for j in range(d):
a_ind = np.concatenate([np.arange(0,j), np.arange(j+1,d)])
comparisons = np.all(data[:,a_ind] == data[i,a_ind], axis=1)
results[i,j] = np.sum(comparisons)
return results
def calc_optimized(data):
"""Optimized code, including extractions from loop"""
n, d = data.shape
a_indices = np.array([
np.concatenate([np.arange(0,j), np.arange(j+1,d)])
for j in range(d)
])
results = np.empty((n,d))
for i in range(n):
aux = data == data[i]
for j, a_ind in enumerate(a_indices):
comparisons = np.all(aux[a_ind], axis=1)
results[i,j] = np.sum(comparisons)
return results
def _parallel_helper(i, data, a_indices):
"""Helper function to perform single run for parallelization"""
n, d = data.shape
aux = data == data[i]
results = np.empty(d)
for j, a_ind in enumerate(a_indices):
comparisons = np.all(aux[a_ind], axis=1)
results[j] = np.sum(comparisons)
def calc_parallel(data, n_procs=cpu_count()):
"""Parallel implementation"""
n, d = data.shape
a_indices = np.array([
np.concatenate([np.arange(0,j), np.arange(j+1,d)])
for j in range(d)
])
parallel_func = partial(_parallel_helper, data=data, a_indices=a_indices)
with Pool(n_procs) as p:
result = p.map(parallel_func, range(n))
return np.array(result)
if __name__ == '__main__':
magnitudes = range(1, 6)
numbers = [1, 2, 5]
N = [n * 10**m for m in magnitudes for n in numbers]
functions = {
'original': calc_orig,
'inner loop optimized': calc_inner_loop_optimized,
'fully optimized': calc_optimized,
'parallel (4)': partial(calc_parallel, n_procs=4),
'parallel (8)': partial(calc_parallel, n_procs=8),
'parallel (12)': partial(calc_parallel, n_procs=12),
}
durations = {name: np.full(len(N), np.nan) for name in functions.keys()}
skip = {name: False for name in functions.keys()}
for idx, n in enumerate(N):
print(n)
data = np.random.randn(n, 9)
for name, func in functions.items():
if skip[name]:
continue
start = time()
func(data)
end = time()
durations[name][idx] = end-start
print(f' {name}: {durations[name][idx]}')
if durations[name][idx] > 100:
skip[name] = True
fig, ax = plt.subplots()
ax.set_title('Speed comparison of implementations')
ax.set_xscale('log')
ax.set_xlabel('$n$')
ax.set_yscale('log')
ax.set_ylabel('time (s)')
for name, duration in durations.items():
ax.plot(N, duration, marker='+', ms=12, label=name)
ax.legend(loc='lower right')
ax.grid(which='major', color='black', alpha=.5, linestyle='-')
ax.grid(which='minor', color='black', alpha=.25, linestyle=':')
fig.savefig('case-study-1-benchmark.png') # or .pdf or .svg
```

The log-log plot shows nicely that with each optimization step, we’re able
to process more rows of data in the same amount of time. The parallel
implementations are a slight exception to that rule when the number of
rows `n`

is less than 10,000. In those cases the constant runtime overhead
of about one second means they’re actually slower than the fully optimized
serial implementation.

For the parallel implementation, I’ve run it with a pool of 4, 8 and 12
processes on my 12-thread CPU. Similarly to inner loop optimization in step 4,
we don’t see the speedup we would expect, but at least using more processes
(up to `n_procs=multiprocessing.cpu_count()`

) does reduce the runtime. Again,
I can only speculate why we don’t get the speedup we might expect ¯\_(ツ)_/¯

## Conclusion

And so we’ve gone from an **R**-inspired string-based implementation past a
clean and minimal pure-Python one all the way to a fully optimized and
parallelized implementation that can scale over many cores. I’ve had a lot of
fun figuring out all these optimizations, and I hope you enjoyed coming along
on this optimization journey with me.

Finally, I would like to thank my colleague for being open to having me tinker on their code.