import escape as esc
# to activate serialization of objects
import escape.serialization as ser
import numpy as np
esc.require("0.9.7")
from escape.utils import show
import matplotlib.pyplot as plt
If the multithreading supported by the kernel is not sufficient for your task, ESCAPE also supports parallelization using a cluster of computers connected via Ethernet. In this case, your task is divided among the computers, and each subtask runs in parallel on its computer, by default using all available CPU cores.
The cluster can also run on a single PC, which corresponds to so-called multiprocessing parallelization.
The current implementation of this feature uses the ipyparallel module, and each PC on the network must be properly configured; the necessary steps are described below in the two-PC example.
For this notebook, we have prepared two types of kernels: a simple, fast functor vectorized over large arrays with millions of elements, and a slow functor applied to relatively small arrays with hundreds of elements.
When a task is divided into subtasks, parts of the arrays are serialized and sent by the controller to the cluster engines before the computation. When all subtasks are completed, the resulting arrays are assembled back from the engines. Serialization and deserialization, along with transporting the serialized arrays over the network, incur the cost of I/O operations.
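To get a feeling for the size of this overhead, one can time a serialization round trip of a typical array. The sketch below uses plain pickle as a stand-in; ESCAPE and ipyparallel use their own serialization machinery, so the numbers only indicate the order of magnitude.
import pickle
import time
arr = np.linspace(-10, 10, 5000000)
t0 = time.perf_counter()
blob = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)
t1 = time.perf_counter()
restored = pickle.loads(blob)
t2 = time.perf_counter()
print("serialize: %.1f ms, deserialize: %.1f ms, payload: %.1f MB"
      % ((t1 - t0)*1e3, (t2 - t1)*1e3, len(blob)/1e6))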
Multithreading also has a performance cost due to the overhead of launching threads. However, this cost can be reduced by the so-called thread-pool design pattern, which ESCAPE uses by default.
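A minimal sketch of the pattern in plain Python (using the standard concurrent.futures module, not ESCAPE internals): the worker threads are created once and reused for many tasks, so the thread startup cost is paid only once.
from concurrent.futures import ThreadPoolExecutor
chunks = np.array_split(np.linspace(-10, 10, 1000), 4)
with ThreadPoolExecutor(max_workers=4) as pool:
    # the four worker threads are reused for all submitted chunks
    partial_sums = list(pool.map(np.sum, chunks))
print(sum(partial_sums))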
In this notebook, we compare the computational speeds of multiprocessing and multithreading on a single PC and demonstrate how to run a kernel on multiple PCs. For the tests below, we used a PC with a 6-core Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz, 64 GB RAM, and a Linux operating system. The processor supports hyperthreading, providing 12 hardware threads.
#create a cluster with sixteen engines and connect a client
import ipyparallel as ipp
cluster = ipp.Cluster(n=16)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(16)
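If the startup succeeded, the client should report all sixteen engine IDs:
#sanity check: list the IDs of the connected engines
print(rc.ids)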
The kernel below calculates, at each point given by the elements of the x array, the integral of the damped oscillatory function F over the frequency variable b with 61 quadrature points. The results are stored in the y array.
The integration is adaptive, and the number of iterations required to achieve convergence differs from point to point. Thus, we expect some threads or processes to complete earlier, and the total computation time for the whole range of values will be determined by the slowest parallel subtask.
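In mathematical notation, the functor and its integral defined in the next cell read

$$F(X; a, b, c) = a\,\cos^{2}(bX)\,e^{-cX}, \qquad I(X; a, c) = \int_{0}^{500} a\,\cos^{2}(bX)\,e^{-cX}\,\mathrm{d}b.$$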
X = esc.var("X")
b = esc.par("b", 5)
a = esc.par("a", 100 )
c = esc.par("c", 0.5)
F = a*esc.pow(esc.cos(X*b), 2.0)*esc.exp(-c*X)
I = esc.integral(F, b, 0, 500, numpoints=61, epsabs=1e-5, epsrel=1e-5, maxiter=300)
#linspace may return a view of an array,
#so we make a copy to avoid an extra copy inside the kernel
x = np.linspace(-10, 10, 500, dtype=float).copy()
y = np.zeros(x.shape, dtype=float)
show(F, coordinates=x, title = 'F(X; a, b, c)')
show(I, coordinates=x, title = 'I(X; a, c)')
# computation times for cluster and multithreading
def compute_parallel(func, x, y):
    #multithreading
    nth = np.arange(1, 17, 1, dtype=int)
    tms_mth = np.zeros(shape=nth.shape, dtype=float)
    for i, n in enumerate(nth):
        k = esc.kernel("Parallel threads", func, numthreads=n)
        t = %timeit -o -q k(x, y)
        tms_mth[i] = t.average*1000
    #multiprocessing, one thread per node
    nrc1 = np.arange(1, 17, 1, dtype=int)
    tms_rc_1 = np.zeros(shape=nrc1.shape, dtype=float)
    for i, n in enumerate(nrc1):
        k = esc.kernel("Cluster kernel", func, multithreaded=False, rc=rc, rc_ids=range(0, n))
        t = %timeit -o -q k(x, y)
        tms_rc_1[i] = t.average*1000
    #multiprocessing, two threads per node
    nrc2 = np.arange(1, 9, 1, dtype=int)
    tms_rc_2 = np.zeros(shape=nrc2.shape, dtype=float)
    for i, n in enumerate(nrc2):
        k = esc.kernel("Cluster kernel", func, numthreads=2, rc=rc, rc_ids=range(0, n))
        t = %timeit -o -q k(x, y)
        tms_rc_2[i] = t.average*1000
    #multiprocessing, three threads per node
    nrc3 = np.arange(1, 6, 1, dtype=int)
    tms_rc_3 = np.zeros(shape=nrc3.shape, dtype=float)
    for i, n in enumerate(nrc3):
        k = esc.kernel("Cluster kernel", func, numthreads=3, rc=rc, rc_ids=range(0, n))
        t = %timeit -o -q k(x, y)
        tms_rc_3[i] = t.average*1000
    return ((nth, tms_mth), (nrc1, tms_rc_1), (nrc2*2, tms_rc_2), (nrc3*3, tms_rc_3))
def plot_data(data, title=""):
fig = plt.figure()
fig.suptitle(title)
ax = fig.add_subplot(111)
labels = ["Multithreading",
"Cluster - 1 thread per node",
"Cluster - 2 thread per node",
"Cluster - 3 thread per node",
]
for i in range(0, 4):
ax.plot(data[i][0], data[i][1], 'o--', label=labels[i])
ax.set_xlabel("Number of used threads")
ax.set_ylabel("Computation time [msec]")
ax.legend()
plt.show()
#single core
print("Single core results")
k = esc.kernel("Single kernel", I, multithreaded=False)
t = %timeit -o k(x, y)
data = compute_parallel(I, x, y)
plot_data(data, "Slow function, small arrays (500 items)")
The kernel below evaluates the functor F at every point given by the array x2 with 5x10⁶ items. The results are saved in the array y2.
x2 = np.linspace(-10, 10, 5000000, dtype=float).copy()
y2 = np.zeros(x2.shape, dtype=float)
# simple kernel without parallel computation
kf = esc.kernel("Single kernel", F, multithreaded=False)
%timeit kf(x2, y2)
data = compute_parallel(F, x2, y2)
plot_data(data, "Fast function, large arrays (5x10⁶ items)")
In our computational experiments, the number of threads was taken to be more than twice the number of physical cores. It is known (https://ppc.cs.aalto.fi/ch3/hyperthreading/) that processors with hyperthreading can gain up to 30% extra performance when all available hardware threads are used, and to some extent we see the same improvement in our multithreading examples.
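The available core and thread counts can be checked directly. The snippet below assumes the optional psutil package is installed; os.cpu_count() alone reports only the number of hardware threads.
import os
import psutil
print("hardware threads:", os.cpu_count())                 #12 on our test PC
print("physical cores:", psutil.cpu_count(logical=False))  #6 on our test PC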
In example 1, the multiprocessing speed is close to that of multithreading, but splitting the array and collecting the results has its price, as the figure clearly shows. Using fewer nodes with more threads per node improves the situation, but the multithreaded case is still the winner.
For large arrays and a fast function (example 2), the multithreaded case is even more preferable: the serialization and forwarding of large arrays significantly reduce the efficiency of multiprocessing. Using fewer nodes with more threads per node does not greatly improve the computation speed. Here, too, multithreading is the favorite.
Below we explore parallelization between two PCs, one with a 6-core Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz and one with a 4-core Intel(R) Core(TM) i7-2600K CPU @ 3.40GHz. Both are located on the same Wi-Fi network.
We run a controller listening on all IP addresses on the first PC using the command ipcontroller --ip="0.0.0.0". This also creates the configuration files, usually located at ~/.ipython/profile_default/security/: ipcontroller-engine.json for engines and ipcontroller-client.json for clients.
These files must be copied to the same location on all computers running engines and clients.
After copying the configuration files, we can start the engines on our PCs with the ipengine command.
Read the documentation at https://ipyparallel.readthedocs.io/en/latest/tutorial/process.html#general-considerations for details and other possibilities for the cluster configuration.
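If the connection files are copied to a non-default location, the client can be pointed at the client file explicitly via ipyparallel's url_file argument; the path below is just an example.
import os
#example path, adjust to your setup
cf = os.path.expanduser("~/.ipython/profile_default/security/ipcontroller-client.json")
rc = ipp.Client(url_file=cf)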
Below we run the same examples as above, with the slow integral functor I and the fast functor F, for different sizes of the input array: the length grows as 500·n for n = 1, 11, 21, …, 91, i.e. from 500 to 45500 elements.
#connecting to the engines
rc = ipp.Client()
rc.wait_for_engines(2)
#kernels for testing
#the array fragmentation between the two engines is set manually ([0.7, 0.3])
#instead of the default `rc_fragments='auto'`
kc_I = esc.kernel("Cluster kernel", I, rc=rc, rc_fragments=[0.7, 0.3])
kn_I = esc.kernel("Multithreading kernel", I)
kc_F = esc.kernel("Cluster kernel", F, rc=rc, rc_fragments=[0.7, 0.3])
kn_F = esc.kernel("Multithreading kernel", F)
tmsc_I, tmsn_I = [], []
tmsc_F, tmsn_F = [], []
for n in range(1, 100, 10):
    x = np.linspace(-10, 10, 500*n, dtype=float).copy()
    y = np.zeros(x.shape, dtype=float)
    tc_I = %timeit -o -q -n 2 kc_I(x, y)
    tn_I = %timeit -o -q -n 2 kn_I(x, y)
    tc_F = %timeit -o -q -n 2 kc_F(x, y)
    tn_F = %timeit -o -q -n 2 kn_F(x, y)
    tmsc_I.append(tc_I.average*1000)
    tmsn_I.append(tn_I.average*1000)
    tmsc_F.append(tc_F.average*1000)
    tmsn_F.append(tn_F.average*1000)
n = np.arange(1, 100, 10)*500
fig = plt.figure(figsize=(9, 4))
ax = fig.add_subplot(121)
ax.set_title("Slow function")
ax.plot(n, tmsc_I, 'o--', label="Cluster")
ax.plot(n, tmsn_I, 'o--', label="Multithreaded")
ax.set_xlabel("Length of input array")
ax.set_ylabel("Computation time [msec]")
ax.legend()
ax = fig.add_subplot(122)
ax.set_title("Fast function")
ax.plot(n, tmsc_F, 'o--', label="Cluster")
ax.plot(n, tmsn_F, 'o--', label="Multithreaded")
ax.set_xlabel("Length of input array")
ax.set_ylabel("Computation time [msec]")
ax.legend()
plt.show()
The results demonstrate that a performance gain is possible only for the relatively slow function. For the fast function, multithreading is preferable even for large arrays: splitting the arrays between the nodes prior to the computation and collecting the results afterwards takes too long compared to the computation itself and gives no advantage here.