## 10.2 Coding Parallel Algorithms

```
date: "Block 10"
author: "Daniel Lawson"
email: dan.lawson@bristol.ac.uk
output: html_document
version: 1.0.2
```

Here we will put to practice the ideas described in the lecture, notably:

* Vectorisation,
* Native Accumulation,
* The **Map** function,
* The **Reduce** function, and its **accumulate** variant,
* The **multiprocessing** environment for parallelisation.

Until the final section, everything is only about preparing for parallelism. **Parallel code in python is non-trivial** because it needs to access additional cores that it may not have permission for. There are many different implementations that variously:

* Don't work across operating systems,
* Don't work in Jupyter but work in native python,
* Work in python 2/3 but not vice versa,
* Any combination of the above.

As a rule, **you should not include parallel code inside a notebook** because it is not likely to be cross-platform. By nature of needing to paralellise, it is worth outsourcing important parallel code to a script and running this remotely.

You will find that everything here **works on Notable via Blackboard**. I have also provided script files to complete the same parallelisation in Windows via native python. However the script solution we identify is adequate and is the recommended solution, so if your project group wants to work with parallel code and anyone runs Windows, **This Is The Way**.

### Part 1: Vectorisation

Vectorisation is straightforward in python (and R). You should always try to code with vectors/arrays, though For loops are sometimes necessary.

In python:

* For loops aren't intrinsically worse, but they encourage poor coding practice.

In R: 

* For loops are very inefficient. For efficiency you may have to go out of your way to vectorise.

In C/C++:

* OpenMP allows for loops to be parallelised without any additional effort - just remember to avoid using the results of previous loops.
* Modern updates (from C++-11) include many explicit vectorisations, allowing map/reduce vectorisations to be exploited directly.

First, an array representation reminder in python:

In [2]:
import numpy as np

In [3]:
a = np.array([[1,2],[3,4]])
b = np.array([[1,1],[1,1]])
print("a =")
print(a)
print("b =")
print(b)

a =
[[1 2]
 [3 4]]
b =
[[1 1]
 [1 1]]


In [4]:
# substraction and addition

print("a + b =")
print(a + b)
print("a - b =")
print(a - b)

a + b =
[[2 3]
 [4 5]]
a - b =
[[0 1]
 [2 3]]


In [5]:
# Element wise multiplication
print("a * b =")
print(a*b)

a * b =
[[1 2]
 [3 4]]


In [6]:
# Matrix multiplication
print("a@b =")
print(a@b)
print("np.dot(a,b) =")
print(np.dot(a,b))

a@b =
[[3 3]
 [7 7]]
np.dot(a,b) =
[[3 3]
 [7 7]]


In [7]:
# numpy array with one row
c =  np.array([1,2,3])
print("c.shape =")
print(c.shape)
# numpy array with three rows
d = np.array([[1],[2],[3]])
print("d.shape")
print(d.shape)

c.shape =
(3,)
d.shape
(3, 1)


In [8]:
# Define an 3x3 2d array
e = np.array([[1,2,3],[4,5,6],[7,8,9]])
print(e)
print("first element in the array, e[0,0] =")
print(e[0,0])
print("first row of the array. e[0,:] =")
print(e[0,:])
print("second column of the array. e[:,1] =")
print(e[:,1])

[[1 2 3]
 [4 5 6]
 [7 8 9]]
first element in the array, e[0,0] =
1
first row of the array. e[0,:] =
[1 2 3]
second column of the array. e[:,1] =
[2 5 8]


Now we'll make a simulation to compare between vectorised and non-vectorised code. This is just a simple matrix times vector computation.


In [9]:
#create some test data and simulate results
N=100000# Number of rows
K=100 # Number of columns
X = np.random.randn(N,K)
W = np.random.rand(K)

In [10]:
def matrix_times_vector_forloop(X,W):
    N,K=X.shape
    # Initialize theta
    forloop = []
    for i in range(N):
        hypo_i = 0
        for j in range(K):
            hypo_i += W[j]*X[i,j]
        forloop.append(hypo_i)
    return(forloop)

In [11]:
%%time
forloop=matrix_times_vector_forloop(X,W)

CPU times: user 4.01 s, sys: 17.5 ms, total: 4.03 s
Wall time: 4.03 s


In [12]:
%%time
# matrix format
vect = X@W

CPU times: user 12.9 ms, sys: 3.41 ms, total: 16.3 ms
Wall time: 4.2 ms


In [13]:
## Check the answers
print("for loop")
print(forloop[0:4])
print("vectorised")
print(vect[0:4])

for loop
[-7.071205071734369, -0.8932577605479217, -6.821716607725058, 6.532161564877307]
vectorised
[-7.07120507 -0.89325776 -6.82171661  6.53216156]


CHALLENGE:  Make a plot of how the two approaches change in performance as a function of N (and/or K). What is the computational scaling?

See https://towardsdatascience.com/vectorization-implementation-in-machine-learning-ca652920c55d for an example that is much more extreme.

### Accumulate example

In [14]:
seq = np.random.randint(0, 100, size=1000000)
seq.shape

(1000000,)

In [15]:
def cumsum_diff_with_accumulate(x):
     x = np.asarray(x)
     return np.add.accumulate(x)[-1]
def cumsum_diff(x):
     sum_px = x[0]
     for px in x[1:]:
         sum_px = sum_px + px
     return sum_px

In [16]:
%%time
cumsum_diff(seq)

CPU times: user 202 ms, sys: 1.84 ms, total: 204 ms
Wall time: 203 ms


49484814

In [17]:
%%time
cumsum_diff_with_accumulate(seq)

CPU times: user 6.37 ms, sys: 5.56 ms, total: 11.9 ms
Wall time: 8.85 ms


49484814

### Part 2

Mapping python (and R) is straightfoward. 

Source document: http://chryswoods.com/parallel_python/map.html

In [18]:
import math

def calc_distance(point1, point2):
    """
    Function to calculate and return the distance between
    two points
    """
    
    dx2 = (point1[0] - point2[0]) ** 2
    dy2 = (point1[1] - point2[1]) ** 2
    dz2 = (point1[2] - point2[2]) ** 2
    return math.sqrt(dx2 + dy2 + dz2)

In [19]:
points1 = [(1.0,1.0,1.0), (2.0,2.0,2.0), (3.0,3.0,3.0)]
points2 = [(4.0,4.0,4.0), (5.0,5.0,5.0), (6.0,6.0,6.0)]

distances = map(calc_distance, points1, points2)

print(distances)

<map object at 0x7ff0681e8250>


Python has not done the calculation! Instead it returns an object (an iterator) that would evaluate this computation. But it does so lazily.  To get the answer, we must use the result, for example, by coercing it to a list.

This behaviour is **standard** in parallel processing environments, in which the computation may be performed remotely and there may be additional remote computations to perform. By caching the computation, the software environment can sometimes obtain massive efficiency gains.

This is how we get the answer:

In [20]:
print(list(distances))

[5.196152422706632, 5.196152422706632, 5.196152422706632]


Another example, this time with multiple arguments.

In [21]:
def find_smallest(arg1, arg2, arg3):
    """
    Function used to return the smallest value out 
    of 'arg1', 'arg2' and 'arg3'
    """

    return min(arg1, min(arg2, arg3))

a = [1, 2, 3, 4, 5]
b = [5, 4, 3, 2, 1]
c = [1, 2, 1, 2, 1]

result = map(find_smallest, a, b, c)

In [22]:
list(result)

[1, 2, 1, 2, 1]

CHALLENGE: Generalise calc_distance so that it can accept points in any numbers of dimensions.  Generalise find_smallest so that it can accept any number of arguments.

## Part 3: Reducing

This is also straightfoward, and exists anagously in R.

See http://chryswoods.com/parallel_python/reduce.html

Lets start by defining a mapping problem

In [23]:
def add(x, y):
    """Function to return the sum of x and y"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

result = map(add, a, b)

print(list(result))

[7, 9, 11, 13, 15]


This is how we use reduce to automatically apply addition to the entire set of points.

In [24]:
?add

In [25]:
from functools import reduce

result = map(add, a, b)

total = reduce(add, result)

print(total)

55


Reduce has an optional third argument which is the initial value that is used as the first value for the reduction.

In [26]:
result = map(add, a, b)
total = reduce(add, result, 10)
print(total)

65


The standard "reduce" function does *not* do anything clever with the computation tree. It simply evaluates the reduction using the sequential definition. That means that it does *not* assume commutivity, which means it can be used with other operations:

In [27]:
def join_strings(x, y):
    return "%s %s" % (x,y)
c = ["cat", "dog", "mouse", "fish"]

result = reduce(join_strings, c)
print(result)

cat dog mouse fish


### accumulate vs reduce in python

Python defines **reduce** to give only the final answer, whereas **accumulate** gives the running total as a list (via an iterator, like map).

This is not a universally recognised separation!

In [28]:
from itertools import accumulate

result = map(add, a, b)
total = accumulate(result, add)
print(list(total))

[7, 16, 27, 40, 55]


In [29]:
print(list(accumulate(c, join_strings)))

['cat', 'cat dog', 'cat dog mouse', 'cat dog mouse fish']


## Part 4: parallel implementation

multiprocessing python code has to be written into a text file and executed using the python interpreter. It is not recommended to try to run a multiprocessing python script interactively, e.g. via ipython or ipython notebook.

This is because the required resources (CPUs) have to be requested from the system and appropriately returned, and the libraries are not reliable across platforms.

(it seems to work on linux and mac, but not in windows https://stackoverflow.com/questions/37103243/multiprocessing-pool-in-jupyter-notebook-works-on-linux-but-not-windows)

See http://chryswoods.com/parallel_python/multiprocessing.html

Multiprocessing achieves parallelism by running multiple copies of your script, it forces you to write it in a particular way. All imports should be at the top of the script, followed by all function and class definitions. This is to ensure that all copies of the script have access to the same modules, functions and classes. Then, you should ensure that only the master copy of the script runs the code by protecting it behind an ``if __name__ == "__main__"`` statement.

In [30]:
import multiprocessing

In [31]:
print(multiprocessing.cpu_count())

8


### Test function for multiprocessing functionality within Jupyter:

If the following works you should be ok in the notebook; otherwise switch to the scripts, which are run from within jupyter below.

**HEALTH WARNING: The following code willl CRASH your python kernel in Windows and MacOS Big Sur, which will need RESTARTING!**

In [None]:
%%time
## this is a test function, reporting happens further down.
from multiprocessing import Pool
def f(x):
    return x**2
if __name__ == "__main__":
    pool = Pool(4)
    pool.map(f,range(10))[-1]


### A Fully functional multiprocessing map and reduce example

The following script illustrates the key points:

* Distributing compute over cores
* Detecting the CPU architecture/count
* Ensuring the compute is parallelised
* Performing process-specific evaluation

Remember, don't run this if parallel in Jupyter isn't working on your machine. Skip to the line below.

In [None]:
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process

def square(x):
    """Function to return the square of the argument"""
    print("Worker %s calculating square of %s" % (current_process().pid, x))
    return x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=50
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count())
    def myfunction(x):
        print(x)
    # create an array of 5000 integers, from 1 to 5000
    r = range(1, N+1)
    ## NB: TRY CHANGING square to myfunction?
    result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print("The sum of the square of the first %s integers is %s" % (N, total))


### Running a parallel script from Jupyter

The following code runs the python script `block10-parallel01.py` and **keeps the results in memory of this notebook**.

First we take a look at the code we are about to run. Remember that Jupyter provides a pleasant environment for writing scripts.

In [1]:
%pycat block10-parallel01.py

Now we  run it **interactively**, meaning that all the memory that is accessible in the notebook is accessible in your script.

In [2]:
%run -i block10-parallel01.py

Number of cores available equals 8
The sum of the square of the first 50 integers is 42925
0:00:00.129839


In [3]:
%run -i block10-parallel01.py

Number of cores available equals 8
The sum of the square of the first 50 integers is 42925
0:00:00.129861


In [4]:
print("The sum of the square of the first %s integers is %s" % (N, total))

The sum of the square of the first 50 integers is 42925


What do you notice about the order of computation and printing?

In general it is a really bad idea to assume that printing appears to the screen in the correct order!

(NB: Christopher's code used "with Pool(processes=nprocs) as pool" which didn't work for me due to python version issues. Multicore processing is still in active development....)

Note that you can change the code above to use a function defined inside the __main__ loop. This **hangs**  because the worker nodes can't see it!  **CAREFUL** with this sort of thing.

However, we can reuse our pool of processes in a straightforward way.

In [None]:
from functools import reduce
from multiprocessing import Pool, cpu_count, current_process

def square(x):
    """Function to return the square of the argument"""
    print("Worker %s calculating square of %s" % (current_process().pid, x))
    return x * x

def cube(x):
    """Function to return the cube of the argument"""
    print("Worker %s calculating cube of %s" % (current_process().pid, x))
    return x * x * x

if __name__ == "__main__":
    # print the number of cores
    print("Number of cores available equals %s" % cpu_count())
    N=5
    # create a pool of workers
    # start all worker processes
    pool = Pool(processes= cpu_count()) ## THIS is where all of the memory state is 
    ## created and all of the processes "know about" everything above. So they "know" N
    ## and hence all compute their own version of r correctly.
    
    # create an array of 5000 integers, from 1 to N 
    r = range(1, N+1)
    squares = pool.map(square, r)
    print("The sum of the square of the first %s integers is %s" % (N, totalsquares))
    cubes = pool.map(cube, r)
    print("The sum of the cube of the first %s integers is %s" % (N, totalcubes))
    totalsquares = reduce(lambda x, y: x + y, squares)
    totalcubes = reduce(lambda x, y: x + y, cubes)
 


To use mapping on multiple inputs, we have to either:

* create a tuple of the arguments
* or pass it through using **zip** and switch from the **map** to **starmap**

In [None]:
from multiprocessing import Pool

def add(x, y):
    """Return the sum of the tuple of two arguments"""
    return x + y

a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

if __name__ == "__main__":
    with Pool() as pool:
        result = pool.starmap(add, zip(a,b))

    print(result)

There are other implementation niggles, such as support for lambda functions (which is missing) etc. These may be addressed in newer versions or alternative packages.



## Part 5: asynchronous functions

We saw above that the "print" statements were out of order. This is because the threads had to "race" to collect the next job, and also race to print to the screen. There is only one job queue and one screen.

The following script performs jobs asynchronously. You should see that there are 3 workers, which complete one task before taking the next.

In [None]:
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    """
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    """
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)

    with Pool(3) as pool:
        r = pool.map(slow_function, [8,2,3,4,5,6,7])

    print("Result is %s" % r)

In [None]:
%pycat block10-parallel02.py

In [None]:
%run -i block10-parallel02.py

Sometimes we don't want to wait for a computation to be completed before distributing some other computation. 

The async versions of map, apply, etc return immediately, providing a "future" object. These are evaluated asyncyronously. They can be blocked (here via "wait") and allow a [callback](https://docs.python.org/3.8/library/multiprocessing.html?highlight=map_async#multiprocessing.pool.Pool.map_async) which runs on completion.

In [None]:
import time
from multiprocessing import Pool, current_process

def slow_function(nsecs):
    """
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    """
    print("Process %s going to sleep for %s second(s)" % (current_process().pid, nsecs))
    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)
    print("Process %s waking up" % current_process().pid)
    return nsecs

if __name__ == "__main__":
    print("Master process is PID %s" % current_process().pid)
    r=[]
    with Pool(3) as pool:
        print("Starting r1 pool")
        r1 = pool.map_async(slow_function, [1,2,3,4,5],callback=r.extend)
        # r1 starts first, finishes last
        
        ## Compare the following:
        # r1.wait() # Uncomment this
        print("Starting r2 pool")
        r2 = pool.map_async(slow_function, [0.5,0.5,0.5,0.5,0.5],callback=r.extend)

        ## With this:
        r1.wait() # Comment out this 
        
        r2.wait()
        print("Result one is %s" % r1.get())
        print("Result two is %s" % r2.get())
        print("Result appended is %s" % r)

#### Asynchronous computation 

Here we encountered "futures" which are computations that may not have completed and therefore may not be available.

Futures are a very common variable type in parallel programming across many languages. Futures provide several common functions;

* Block (wait) until the result is available. In multiprocessing, this is via the .wait() function, e.g. r1.wait() in the above script.
* Retrieve the result when it is available (blocking until it is available). This is the .get() function, e.g. r1.get().
* Test whether or not the result is available. This is the .ready() function, which returns True when the asynchronous function has finished and the result is available via .get().
* Test whether or not the function was a success, e.g. whether or not an exception was raised when running the function. This is the .successful() function, which returns True if the asynchronous function completed without raising an exception. Note that this function should only be called after the result is available (e.g. when .ready() returns True).

## Additional Information

There are additional ways to help parallelisation be efficient. One is the idea of "chunksize", or how many commands get sent to each worker; it is a parameter of starmap/map.