## 10.3 Coding Parallel Algorithms

```
date: "Block 10"
author: "Daniel Lawson"
email: dan.lawson@bristol.ac.uk
output: html_document
version: 2.0.0
```

# 0. Introduction

Here we will put to practice the ideas described in the lecture, notably:

* Vectorisation,
* Native Accumulation,
* The **Map** function,
* The **Reduce** function, and its **accumulate** variant,
* The **multiprocessing** environment for parallelisation.

Until the final section, everything is only about preparing for parallelism. **Parallel code in python is non-trivial** because it needs to access additional cores that it may not have permission for. There are many different implementations that variously:

* Don't work across operating systems,
* Don't work in Jupyter but work in native python,
* Work in python 2/3 but not vice versa,
* Any combination of the above.

As a rule, **you should not include parallel code inside a notebook** because it is not likely to be cross-platform. By nature of needing to paralellise, it is worth outsourcing important parallel code to a script and running this remotely.

You will find that everything here **works on Notable via Blackboard**. I have also provided script files to complete the same parallelisation in Windows via native python. However the script solution we identify is adequate and is the recommended solution, so if your project group wants to work with parallel code and anyone runs Windows, **This Is The Way**.

## 0.1 Prerequisites:

In [1]:
!pip install numpy



In [2]:
!pipreqsnb --savepath requirements-block10.txt block10-CodingParallelAlgorithms.ipynb

pipreqs  --savepath requirements-block10.txt /Users/madjl/teach/dst-private/Workshops/__temp_pipreqsnb_folder
INFO: Successfully saved requirements file in requirements-block10.txt


In [3]:
print("\nREQUIREMENTS:\n")
with open('requirements-block10.txt', 'r') as f:
    print(f.read())


REQUIREMENTS:

numpy==1.24.4



## 1. Vectorisation

Vectorisation is straightforward in python (and R). You should always try to code with vectors/arrays, though For loops are sometimes necessary.

In python:

* For loops aren't intrinsically worse, but they encourage poor coding practice.

In R: 

* For loops are very inefficient. For efficiency you may have to go out of your way to vectorise.

In C/C++:

* OpenMP allows for loops to be parallelised without any additional effort - just remember to avoid using the results of previous loops.
* Modern updates (from C++-11) include many explicit vectorisations, allowing map/reduce vectorisations to be exploited directly.

### 1.1 Arrays in Python

First, an array representation reminder in python:

In [4]:
import numpy as np

In [5]:
a = np.array([[1,2],[3,4]])
b = np.array([[1,1],[1,1]])
print("a =")
print(a)
print("b =")
print(b)

a =
[[1 2]
 [3 4]]
b =
[[1 1]
 [1 1]]


In [6]:
# substraction and addition

print("a + b =")
print(a + b)
print("a - b =")
print(a - b)

a + b =
[[2 3]
 [4 5]]
a - b =
[[0 1]
 [2 3]]


In [7]:
# Element wise multiplication
print("a * b =")
print(a*b)

a * b =
[[1 2]
 [3 4]]


In [8]:
# Matrix multiplication
print("a@b =")
print(a@b)
print("np.dot(a,b) =")
print(np.dot(a,b))

a@b =
[[3 3]
 [7 7]]
np.dot(a,b) =
[[3 3]
 [7 7]]


In [9]:
# numpy array with one row
c =  np.array([1,2,3])
print("c.shape =")
print(c.shape)
# numpy array with three rows
d = np.array([[1],[2],[3]])
print("d.shape")
print(d.shape)

c.shape =
(3,)
d.shape
(3, 1)


In [10]:
# Define an 3x3 2d array
e = np.array([[1,2,3],[4,5,6],[7,8,9]])
print(e)
print("first element in the array, e[0,0] =")
print(e[0,0])
print("first row of the array. e[0,:] =")
print(e[0,:])
print("second column of the array. e[:,1] =")
print(e[:,1])

[[1 2 3]
 [4 5 6]
 [7 8 9]]
first element in the array, e[0,0] =
1
first row of the array. e[0,:] =
[1 2 3]
second column of the array. e[:,1] =
[2 5 8]


### 1.2 Comparing loops to vectorisations

Now we'll make a simulation to compare between vectorised and non-vectorised code. This is just a simple matrix times vector computation.


In [11]:
#create some test data and simulate results
N=100000 # Number of rows
K=200 # Number of columns
X = np.random.randn(N,K)
W = np.random.rand(K)

In [12]:
def matrix_times_vector_forloop(X,W):
    N,K=X.shape
    # Initialize theta
    forloop = []
    for i in range(N):
        hypo_i = 0
        for j in range(K):
            hypo_i += W[j]*X[i,j]
        forloop.append(hypo_i)
    return(forloop)

In [13]:
%%time
forloop=matrix_times_vector_forloop(X,W)

CPU times: user 3.04 s, sys: 14.9 ms, total: 3.05 s
Wall time: 3.05 s


In [14]:
%%time
# matrix format
vect = X@W

CPU times: user 33.8 ms, sys: 1.21 ms, total: 35 ms
Wall time: 9.11 ms


In [15]:
## Check the answers
print("for loop")
print(forloop[0:4])
print("vectorised")
print(vect[0:4])

for loop
[2.3599293334091396, -4.229230648988901, -0.6460015101907467, -6.515508636426773]
vectorised
[ 2.35992933 -4.22923065 -0.64600151 -6.51550864]


CHALLENGE:  Make a plot of how the two approaches change in performance as a function of N (and/or K). What is the computational scaling?

See https://towardsdatascience.com/vectorization-implementation-in-machine-learning-ca652920c55d for an example that is much more extreme.

### 1.3 Accumulate example

Here we accumulate, i.e. add up all of the values in a vector, either in a vectorised or loop way.

In [16]:
seq = np.random.randint(0, 100, size=5000000)
seq.shape

(5000000,)

In [17]:
def cumsum_diff_with_accumulate(x):
     x = np.asarray(x)
     return np.add.accumulate(x)[-1]
def cumsum_diff(x):
     sum_px = x[0]
     for px in x[1:]:
         sum_px = sum_px + px
     return sum_px

In [18]:
%%time
cumsum_diff(seq)

CPU times: user 343 ms, sys: 282 ms, total: 626 ms
Wall time: 249 ms


247451092

In [19]:
%%time
cumsum_diff_with_accumulate(seq)

CPU times: user 11.7 ms, sys: 4.48 ms, total: 16.1 ms
Wall time: 14.5 ms


247451092

## 2 Mapping and Reducing

### 2.1 Map in Python

Mapping with python (and R) is straightfoward. 

Source document: http://chryswoods.com/parallel_python/map.html

In [20]:
import math

def calc_distance(point1, point2):
    """
    Function to calculate and return the distance between
    two points
    """
    
    dx2 = (point1[0] - point2[0]) ** 2
    dy2 = (point1[1] - point2[1]) ** 2
    dz2 = (point1[2] - point2[2]) ** 2
    return math.sqrt(dx2 + dy2 + dz2)

In [21]:
points1 = [(1.0,1.0,1.0), (2.0,2.0,2.0), (3.0,3.0,3.0)]
points2 = [(4.0,4.0,4.0), (5.0,5.0,5.0), (6.0,6.0,6.0)]

distances = map(calc_distance, points1, points2)

print(distances)

<map object at 0x107e36290>


Python has not done the calculation! Instead it returns an object (an iterator) that would evaluate this computation. But it does so lazily.  To get the answer, we must use the result, for example, by coercing it to a list.

This behaviour is **standard** in parallel processing environments, in which the computation may be performed remotely and there may be additional remote computations to perform. By **caching** the computation, the software environment can sometimes obtain massive efficiency gains.

This is how we get the answer:

In [22]:
print(list(distances))

[5.196152422706632, 5.196152422706632, 5.196152422706632]


Another example, this time with multiple arguments.

In [23]:
def find_smallest(arg1, arg2, arg3):
    """
    Function used to return the smallest value out 
    of 'arg1', 'arg2' and 'arg3'
    """

    return min(arg1, min(arg2, arg3))

a = [1, 2, 3, 4, 5]
b = [5, 4, 3, 2, 1]
c = [1, 2, 1, 2, 1]

result = map(find_smallest, a, b, c)

In [24]:
list(result)

[1, 2, 1, 2, 1]

CHALLENGE: Generalise calc_distance so that it can accept points in any numbers of dimensions.  Generalise find_smallest so that it can accept any number of arguments.

### 2.2 Reduce in Python

This is also straightfoward, and exists anagously in R.

See http://chryswoods.com/parallel_python/reduce.html

Lets start by defining a mapping problem. We will need to define the **add** operation as a **function**, so that we can call it from map and reduce.

In [25]:
def add(x, y):
    """Function to return the sum of x and y"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

result = map(add, a, b)

print(list(result))

[7, 9, 11, 13, 15]


This is how we use reduce to automatically apply addition to the entire set of points.

In [26]:
from functools import reduce

result = map(add, a, b)

total = reduce(add, result)

print(total)

55


Reduce has an optional third argument which is the initial value that is used as the first value for the reduction.

In [27]:
result = map(add, a, b)
total = reduce(add, result, 10)
print(total)

65


The standard "reduce" function does *not* do anything clever with the computation tree. It simply evaluates the reduction using the sequential definition. That means that it does *not* assume commutivity, which means it can be used with other operations:

In [28]:
def join_strings(x, y):
    return "%s %s" % (x,y)
c = ["cat", "dog", "mouse", "fish"]

result = reduce(join_strings, c)
print(result)

cat dog mouse fish


### 2.2.1 accumulate vs reduce in python

Python defines **reduce** to give only the final answer, whereas **accumulate** gives the running total as a list (via an iterator, like map).

This is not a universally recognised separation!

In [29]:
from itertools import accumulate

result = map(add, a, b)
total = accumulate(result, add)
print(list(total))

[7, 16, 27, 40, 55]


In [30]:
print(list(accumulate(c, join_strings)))

['cat', 'cat dog', 'cat dog mouse', 'cat dog mouse fish']


## 3 parallel implementation

multiprocessing python code has to be written into a text file and executed using the python interpreter. It is not recommended to try to run a multiprocessing python script interactively, e.g. via ipython or ipython notebook.

This is because the required resources (CPUs) have to be requested from the system and appropriately returned, and the libraries are not reliable across platforms.

(it seems to work on linux and mac, but not in windows https://stackoverflow.com/questions/37103243/multiprocessing-pool-in-jupyter-notebook-works-on-linux-but-not-windows)

See http://chryswoods.com/parallel_python/multiprocessing.html

Multiprocessing achieves parallelism by running multiple copies of your script, but it forces you to write it in a particular way. All imports should be at the top of the script, followed by all function and class definitions. This is to ensure that all copies of the script have access to the same modules, functions and classes. Then, you should ensure that only the master copy of the script runs the code by protecting it behind an ``if __name__ == "__main__"`` statement.

In [31]:
import multiprocessing

In [32]:
print(multiprocessing.cpu_count())

8


### 3.1 Testing if parallelisation works within Jupyter

If the following works you should be ok in the notebook; otherwise you will be running via external %run commands, as we do by default.

**HEALTH WARNING: The following code willl CRASH your python kernel in Windows and MacOS Big Sur, which will need RESTARTING! So its commented out.**

In [33]:
## this is a test function, reporting happens further down.
#from multiprocessing import Pool
#def f(x):
#    return x**2
#if __name__ == "__main__":
#    pool = Pool(4)
#    pool.map(f,range(10))[-1]

### 3.2 Parallelisation for Map-Reduce

Here is how we can do a calculation, by writing a script to a file and then running it.

The script illustrates the key points:

* Distributing compute over cores
* Detecting the CPU architecture/count
* Ensuring the compute is parallelised
* Performing process-specific evaluation

In [34]:
f = open("10.3-pool-multiprocessing.py", "w")
f.write("""
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process
from time import sleep

def square(x):
    \"\"\"Function to return the square of the argument\"\"\"
    print("Worker %s calculating square of %s" % (current_process().pid, x))
    sleep(0.01)
    return x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count())
    # create an array of integers, from 1 to N
    r = range(1, N+1)
    result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first %s integers is %s" % (N, total))
""")
f.close()
N=50
%run -i 10.3-pool-multiprocessing.py

Number of cores available equals 8
The sum of the square of the first 50 integers is 42925


**Some notes:**

1. Annoyingly, `%run` doesn't always print to the console. If you re-run it, you will get the output as expected from the worker threads.

2. Note that here we could have used `%run` (without `-i`) which would **not** have access to anything defined in the jupyter notebook (here the parameter N=50). To run it interactively, and give it access to everything in memory, use `%run -i` (see https://ipython.readthedocs.io/en/stable/interactive/magics.html).

On the plus side, everything that was created in the script is available inside of jupyter:

In [36]:
print(square(2))
print("The sum of the square of the first %s integers is %s" % (N, total))

Worker 82734 calculating square of 2
4
The sum of the square of the first 50 integers is 42925


If you've written code externally, this is how you can examine it:

In [37]:
# %pycat 10.3-pool-multiprocessing.py

In parallel functions, only the main process has access to the "__main__" function, so the remaining processes need to be told about any functions they need before we start, or any intermediate results, explicitly.

The following script will **not work** because the "myfunction" function is created inside the "__main__" thread only:

In [38]:
f = open("10.3-pool-multiprocessing-bad.py", "w")
f.write("""
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=50
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count())
    def myfunction(x):
        print(x)
        return x * x
    # create an array of integers, from 1 to N
    r = range(1, N+1)
    result = pool.map(myfunction, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first %s integers is %s" % (N, total))
""")
f.close()
## Don't run as it can mess up your processing environment
## %run 10.3-pool-multiprocessing-bad.py

### 3.3 Controlling the order of computations

What did you notice about the order of computation and printing?

In general it is a really bad idea to assume that printing appears to the screen in the correct order!

(NB: Christopher's code used "with Pool(processes=nprocs) as pool" which didn't work for me due to python version issues. Multicore processing is still in active development....)

When you can change the code above to use a function defined inside the __main__ loop. This **hangs**  because the worker nodes can't see it!  **CAREFUL** with this sort of thing.

However, we can reuse our pool of processes in a straightforward way.

We are also going to try using the `%%capture` jupyter magic (official phrase!) to capture the main output. This allows us to separate the multithreaded output (which is at best confusing) from the main output:

In [39]:
%%capture captured
f = open("10.3-pool-twice.py", "w")
f.write("""
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process
from time import sleep

def square(x):
    \"\"\"Function to return the square of the argument\"\"\"
    print("Worker %s calculating square of %s" % (current_process().pid, x))
    sleep(0.01)
    return x * x

def cube(x):
    \"\"\"Function to return the cube of the argument\"\"\"
    print("Worker %s calculating cube of %s" % (current_process().pid, x))
    return x * x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=100
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count()) ## THIS is where all of the memory state is 
    ## created and all of the processes "know about" everything above. So they "know" N
    ## and hence all compute their own version of r correctly.
    
    # create an array of 5000 integers, from 1 to N 
    r = range(1, N+1)
    squares = pool.map(square, r)
    totalsquares = reduce(lambda x, y: x + y, squares)
    print("The sum of the square of the first %s integers is %s" % (N, totalsquares))
    cubes = pool.map(cube, r)
    totalcubes = reduce(lambda x, y: x + y, cubes)
    print("The sum of the cube of the first %s integers is %s" % (N, totalcubes))
""")
f.close()
%run -i 10.3-pool-twice.py

Worker 83039 calculating square of 5
Worker 83039 calculating square of 6
Worker 83039 calculating square of 19
Worker 83039 calculating square of 20
Worker 83039 calculating square of 43
Worker 83039 calculating square of 44
Worker 83036 calculating square of 1
Worker 83036 calculating square of 2
Worker 83036 calculating square of 17
Worker 83036 calculating square of 18
Worker 83036 calculating square of 39
Worker 83036 calculating square of 40
Worker 83040 calculating square of 3
Worker 83040 calculating square of 4
Worker 83040 calculating square of 23
Worker 83040 calculating square of 24
Worker 83040 calculating square of 45
Worker 83040 calculating square of 46
Worker 83035 calculating square of 7
Worker 83035 calculating square of 8
Worker 83035 calculating square of 25
Worker 83035 calculating square of 26
Worker 83035 calculating square of 41
Worker 83035 calculating square of 42
Worker 83042 calculating square of 9
Worker 83042 calculating square of 10
Worker 83042 calculat

In [40]:
captured()

Number of cores available equals 8
The sum of the square of the first 100 integers is 338350
The sum of the cube of the first 100 integers is 25502500


Notice the **order** of these outputs can vary quite considerably...

### 3.4 multiple inputs to map using starmap

To use mapping on multiple inputs, we have to either:

* create a tuple of the arguments
* or pass it through using **zip** and switch from the **map** to **starmap**

In [41]:
f = open("10.3-pool-starmap.py", "w")
f.write("""
from multiprocessing import Pool

def add(x, y):
    \"\"\"Return the sum of the tuple of two arguments\"\"\"
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.starmap(add, zip(a,b))

    print(result)
""")
f.close()
%run -i 10.3-pool-starmap.py

Worker 29528 calculating square of 21
Worker 29528 calculating square of 22
Worker 29528 calculating square of 23
Worker 29528 calculating square of 24
Worker 29528 calculating square of 61
Worker 29528 calculating square of 62
Worker 29528 calculating square of 63
Worker 29528 calculating square of 64
Worker 29528 calculating square of 85
Worker 29528 calculating square of 86
Worker 29528 calculating square of 87
Worker 29528 calculating square of 88
Worker 29528 calculating cube of 5
Worker 29528 calculating cube of 6
Worker 29528 calculating cube of 7
Worker 29528 calculating cube of 8
Worker 29528 calculating cube of 37
Worker 29528 calculating cube of 38
Worker 29528 calculating cube of 39
Worker 29528 calculating cube of 40
Worker 29528 calculating cube of 69
Worker 29528 calculating cube of 70
Worker 29528 calculating cube of 71
Worker 29528 calculating cube of 72
Worker 29526 calculating square of 17
Worker 29526 calculating square of 18
Worker 29526 calculating square of 19
Wo

Again using `%%capture`:

In [42]:
%%capture captured
%run -i 10.3-pool-starmap.py

In [43]:
captured()

[7, 9, 11, 13, 15]


There are other implementation niggles, such as support for lambda functions (which is missing) etc. These may be addressed in newer versions or alternative packages.



## 4. asynchronous functions

We saw above that the "print" statements were out of order. This is because the threads had to "race" to collect the next job, and also race to print to the screen. There is only one job queue and one screen.

The following script performs jobs asynchronously. You should see that there are 3 workers, which complete one task before taking the next.

In [44]:
f = open("10.3-pool-slow.py", "w")
f.write("""
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    \"\"\"
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    \"\"\"
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)

    with Pool(3) as pool:
        r = pool.map(slow_function, [8,2,3,4,5,6,7])

    print("Result is %s" % r)
""")
f.close()
%run -i 10.3-pool-slow.py

Master process is PID 82734
Process 29568 going to sleep for 3 second(s)
Process 29568 waking up
Process 29568 going to sleep for 5 second(s)
Process 29568 waking up
Process 29566 going to sleep for 8 second(s)
Process 29566 waking up
Process 29566 going to sleep for 7 second(s)
Process 29566 waking up
Process 29567 going to sleep for 2 second(s)
Process 29567 waking up
Process 29567 going to sleep for 4 second(s)
Process 29567 waking up
Process 29567 going to sleep for 6 second(s)
Process 29567 waking up
Result is [8, 2, 3, 4, 5, 6, 7]


Sometimes we don't want to wait for a computation to be completed before distributing some other computation. 

The async versions of map, apply, etc return immediately, providing a "future" object. These are evaluated asyncyronously. They can be blocked (here via "wait") and allow a [callback](https://docs.python.org/3.8/library/multiprocessing.html?highlight=map_async#multiprocessing.pool.Pool.map_async) which runs on completion.

In [45]:
f = open("10.3-pool-async.py", "w")
f.write("""
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    \"\"\"
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    \"\"\"
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)
    r=[]
    with Pool(3) as pool:
        print("Starting r1 pool")
        r1 = pool.map_async(slow_function, [1,2,3,4,5],callback=r.extend)
        # r1 starts first, finishes last
        
        ## Compare the following:
        # r1.wait() # Uncomment this
        print("Starting r2 pool")
        r2 = pool.map_async(slow_function, [0.5,0.5,0.5,0.5,0.5],callback=r.extend)

        ## With this:
        r1.wait() # Comment out this 
        
        r2.wait()
        print("Result one is %s" % r1.get())
        print("Result two is %s" % r2.get())
        print("Result appended is %s" % r)
""")
f.close()
%run -i 10.3-pool-async.py

Master process is PID 82734
Starting r1 pool
Starting r2 pool
Result one is [1, 2, 3, 4, 5]
Result two is [0.5, 0.5, 0.5, 0.5, 0.5]
Result appended is [0.5, 0.5, 0.5, 0.5, 0.5, 1, 2, 3, 4, 5]
Process 29835 going to sleep for 3 second(s)
Process 29835 waking up
Process 29835 going to sleep for 0.5 second(s)
Process 29835 waking up
Process 29835 going to sleep for 0.5 second(s)
Process 29835 waking up
Process 29835 going to sleep for 0.5 second(s)
Process 29835 waking up
Process 29835 going to sleep for 0.5 second(s)
Process 29835 waking up
Process 29832 going to sleep for 1 second(s)
Process 29832 waking up
Process 29832 going to sleep for 4 second(s)
Process 29832 waking up
Process 29832 going to sleep for 0.5 second(s)
Process 29832 waking up
Process 29836 going to sleep for 2 second(s)
Process 29836 waking up
Process 29836 going to sleep for 5 second(s)
Process 29836 waking up


#### Asynchronous computation 

Here we encountered "futures" which are computations that may not have completed and therefore may not be available.

Futures are a very common variable type in parallel programming across many languages. Futures provide several common functions;

* Block (wait) until the result is available. In multiprocessing, this is via the .wait() function, e.g. r1.wait() in the above script.
* Retrieve the result when it is available (blocking until it is available). This is the .get() function, e.g. r1.get().
* Test whether or not the result is available. This is the .ready() function, which returns True when the asynchronous function has finished and the result is available via .get().
* Test whether or not the function was a success, e.g. whether or not an exception was raised when running the function. This is the .successful() function, which returns True if the asynchronous function completed without raising an exception. Note that this function should only be called after the result is available (e.g. when .ready() returns True).

## Additional Information

There are additional ways to help parallelisation be efficient. One is the idea of "chunksize", or how many commands get sent to each worker; it is a parameter of starmap/map.