import escape as esc
# to activate serialization of objects
import escape.serialization as ser
import numpy as np
esc.require("0.9.8")
Loading material database from /home/dkor/Data/Development/workspace_escape/escape-core/python/src/escape/scattering/../data/mdb/materials.db
Cluster kernel. Parallel computing using networked computers.¶
If the multithreading supported by the kernel is not sufficient for your task, ESCAPE also supports parallelization across a cluster of computers connected via Ethernet. In this case, your task is divided among the computers, and each subtask runs in parallel on its computer, by default using all available CPU cores.
The cluster can also run on a single PC, which amounts to multiprocessing parallelization.
The current implementation of this feature uses the ipyparallel module and each PC on the network must be properly configured as follows:
- Install the same version of ESCAPE on all PCs along with the ipyparallel module, if it is not already installed.
- Read the documentation at https://ipyparallel.readthedocs.io/en/latest/tutorial/process.html#general-considerations to configure the cluster correctly. In this example, we use a cluster that normally runs on localhost. In order to distribute tasks among multiple PCs, you will have to configure and run the controller and engines yourself.
- In your notebook, import the serialization module as follows
import escape.serialization
For this notebook, we have prepared two types of kernels: a simple, fast functor vectorized over large arrays with millions of elements, and a slow functor applied to relatively small arrays with hundreds of elements.
When the task is divided into subtasks, parts of the arrays are serialized and sent by the controller to the cluster engines before computation. When all subtasks are completed, the resulting arrays are assembled from the engines. Serialization and deserialization, along with transporting the serialized arrays over the network, incur the cost of additional I/O operations.
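This serialization overhead is easy to get a feel for outside of ESCAPE. The sketch below (plain NumPy and the standard library, not ESCAPE's internal machinery) round-trips an array through pickle, the mechanism ipyparallel uses by default, and confirms the data survives intact; timing the two calls on your machine gives a rough lower bound on the per-chunk I/O cost.

```python
import pickle

import numpy as np

# array comparable in size to the examples below
arr = np.linspace(-10.0, 10.0, 1_000_000)

blob = pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL)  # serialize
restored = pickle.loads(blob)                               # deserialize

# the round trip must be lossless
assert np.array_equal(arr, restored)
print(f"serialized size: {len(blob) / 1e6:.1f} MB")
```

Wrapping the `dumps`/`loads` pair in `%timeit` shows the fixed price every chunk pays before any computation starts.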
Multithreading also has a performance cost, due to the time it takes to start each thread in a process. However, this cost can be reduced with the so-called thread-pool design pattern, which ESCAPE uses by default.
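The thread-pool idea itself can be sketched with the standard library: a fixed pool of workers is created once, and array chunks are handed to it, so the thread start-up cost is not paid per chunk. This is only an illustration of the pattern with `concurrent.futures`, not ESCAPE's internal implementation.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

def apply_chunked(func, x, num_threads=4):
    """Apply func to equal chunks of x using a reusable thread pool."""
    chunks = np.array_split(x, num_threads)
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        parts = list(pool.map(func, chunks))  # one chunk per worker
    return np.concatenate(parts)

x = np.linspace(-10.0, 10.0, 1_000)
y = apply_chunked(np.cos, x)
assert np.allclose(y, np.cos(x))  # same result as the serial call
```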
In this notebook, we compare the computational speeds of multiprocessing and multithreading on a single PC and demonstrate how to run the kernel on multiple PCs. For the tests below, we used a PC with a 6-core Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz, 64 GB of RAM, and a Linux operating system. The processor supports hyperthreading, providing 12 logical threads.
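On your own machine, the number of logical threads the operating system reports can be queried with the standard library before choosing thread counts for the experiments:

```python
import os

# logical threads visible to the OS, including hyperthreads
logical = os.cpu_count()
print(f"logical threads available: {logical}")
```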
# create a cluster with sixteen engines and connect a client
import ipyparallel as ipp
cluster = ipp.Cluster(n=16)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(16)
Starting 16 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Example 1. "Slow" functor. Small arrays.¶
The kernel below calculates, at each point given by the elements of the x array, the integral of the damped oscillatory function F over the variable frequency b with 61 quadrature points. The results are stored in the y array.
The integration is adaptive, and the number of iterations required to achieve convergence differs from point to point. We therefore expect some threads or processes to finish earlier, and the total computation time for the whole range of values to equal that of the slowest parallel subtask.
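Since the factor exp(-c·X) does not depend on b, this particular integral has the closed form a·exp(-c·X)·(250 + sin(1000·X)/(4·X)), which makes a handy independent check. The sketch below (plain NumPy, not ESCAPE) evaluates the integral at one sample point with a fine trapezoidal rule and compares it against that closed form.

```python
import numpy as np

a, c = 100.0, 0.5
x0 = 1.0  # sample point X = x0

# fine trapezoidal quadrature over b in [0, 500]
b = np.linspace(0.0, 500.0, 2_000_001)
f = a * np.cos(x0 * b) ** 2 * np.exp(-c * x0)
numeric = np.sum((f[1:] + f[:-1]) * 0.5 * (b[1] - b[0]))

# closed form: a * exp(-c*x) * (250 + sin(1000*x) / (4*x))
exact = a * np.exp(-c * x0) * (250.0 + np.sin(1000.0 * x0) / (4.0 * x0))

assert abs(numeric - exact) / exact < 1e-6
```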
X = esc.var("X")
b = esc.par("b", 5)
a = esc.par("a", 100)
c = esc.par("c", 0.5)
F = a * esc.pow(esc.cos(X * b), 2.0) * esc.exp(-c * X)
I = esc.integral(F, b, 0, 500, numpoints=61, epsabs=1e-5, epsrel=1e-5, maxiter=300)
# linspace can return a view of an array,
# so we make a copy to avoid copying inside the kernel
x = np.linspace(-10, 10, 500, dtype=float).copy()
y = np.zeros(x.shape, dtype=float)
w1 = F.show(coordinates=x).config(title="F(X; a, b, c)")
w2 = I.show(coordinates=x).config(title="I(X; a, c)")
esc.show(w1, w2)
# computation times for cluster and multithreading
def compute_parallel(func, x, y):
    # multithreading
    nth = np.arange(1, 17, 1, dtype=int)
    tms_mth = np.zeros(shape=nth.shape, dtype=float)
    for i, n in enumerate(nth):
        k = esc.kernel("Parallel threads", func, numthreads=int(n))
        t = %timeit -o -q k(x, y)
        tms_mth[i] = t.average * 1000
    # multiprocessing, one thread per node
    nrc1 = np.arange(1, 17, 1, dtype=int)
    tms_rc_1 = np.zeros(shape=nrc1.shape, dtype=float)
    for i, n in enumerate(nrc1):
        k = esc.kernel(
            "Cluster kernel", func, multithreaded=False, rc=rc, rc_ids=list(range(0, n))
        )
        t = %timeit -o -q k(x, y)
        tms_rc_1[i] = t.average * 1000
    # multiprocessing, two threads per node
    nrc2 = np.arange(1, 9, 1, dtype=int)
    tms_rc_2 = np.zeros(shape=nrc2.shape, dtype=float)
    for i, n in enumerate(nrc2):
        k = esc.kernel("Cluster kernel", func, numthreads=2, rc=rc, rc_ids=list(range(0, n)))
        t = %timeit -o -q k(x, y)
        tms_rc_2[i] = t.average * 1000
    # multiprocessing, three threads per node
    nrc3 = np.arange(1, 6, 1, dtype=int)
    tms_rc_3 = np.zeros(shape=nrc3.shape, dtype=float)
    for i, n in enumerate(nrc3):
        k = esc.kernel("Cluster kernel", func, numthreads=3, rc=rc, rc_ids=list(range(0, n)))
        t = %timeit -o -q k(x, y)
        tms_rc_3[i] = t.average * 1000
    return (
        (nth, tms_mth),
        (nrc1, tms_rc_1),
        (nrc2 * 2, tms_rc_2),
        (nrc3 * 3, tms_rc_3),
    )
def plot_data(data, title="", ylog=False):
    import matplotlib.pyplot as plt

    fig = plt.figure()
    fig.suptitle(title)
    ax = fig.add_subplot(111)
    labels = [
        "Multithreading",
        "Cluster - 1 thread per node",
        "Cluster - 2 threads per node",
        "Cluster - 3 threads per node",
    ]
    for i in range(0, 4):
        ax.plot(data[i][0], data[i][1], "o--", label=labels[i])
    if ylog:
        ax.set_yscale('log')
    ax.set_xlabel("Number of used threads")
    ax.set_ylabel("Computation time [msec]")
    ax.legend()
    plt.show()
# single core
print("Single core results")
k = esc.kernel("Single kernel", I, multithreaded=False)
t = %timeit -o k(x, y)
Single core results 545 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
data = compute_parallel(I, x, y)
plot_data(data, "Slow function, small arrays (500 items)")
Example 2. "Fast" functor, large arrays.¶
The kernel below evaluates the functor F at every point of the array x2, which contains 5x10⁶ items. The results are stored in the array y2.
x2 = np.linspace(-10, 10, 5000000, dtype=float).copy()
y2 = np.zeros(x2.shape, dtype=float)
# simple kernel without parallel computation
kf = esc.kernel("Single kernel", F, multithreaded=False)
%timeit kf(x2, y2)
396 ms ± 3.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
data = compute_parallel(F, x2, y2)
plot_data(data, "Fast function, large arrays (5x10⁶ items)")
Example 3. "Slow" functor, large arrays.¶
x3 = np.linspace(-10, 10, 50000, dtype=float).copy()
y3 = np.zeros(x3.shape, dtype=float)
# simple kernel without parallel computation
kis = esc.kernel("Single kernel", I, multithreaded=False)
%timeit kis(x3, y3)
53.9 s ± 656 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
data = compute_parallel(I, x3, y3)
plot_data(data, "Slow function, large arrays (50000 items)", ylog=True)
Interpretation of the results¶
In our computational experiments, the number of threads was taken to be more than twice the number of physical cores. It is known (https://ppc.cs.aalto.fi/ch3/hyperthreading/) that on processors with hyperthreading we can gain up to 30% in performance by using all available hardware threads, and to some extent we see the same improvement in the multithreading examples.
In example 1, the multiprocessing speed is close to that of multithreading, but splitting the array and collecting the results has its price, as the figure clearly shows. Using fewer nodes and more threads per node improves the situation, but the multithreaded case is still the winner.
For large arrays and fast functions (example 2), the multithreaded case is even more preferable: the serialization and forwarding of large arrays clearly reduces the efficiency of multiprocessing. Using fewer nodes and more threads per node does not greatly improve the computational speed. Here, too, multithreading is preferable.
And finally, when we deal with slow functions and large arrays, the serialization and deserialization of arrays does not play any significant role. Most of the time is spent on the calculations, and here we see almost no difference between the cases.
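When reading such measurements, it can help to convert raw times into speedup and parallel efficiency. A small helper along these lines (illustrative, with made-up timings, not part of ESCAPE) makes the comparison explicit:

```python
import numpy as np

def speedup_efficiency(t_serial, n_workers, t_parallel):
    """Return speedup t1/tn and efficiency speedup/n for each measurement."""
    n = np.asarray(n_workers, dtype=float)
    t = np.asarray(t_parallel, dtype=float)
    s = t_serial / t
    return s, s / n

# hypothetical timings in ms for 1, 2, and 4 workers
s, e = speedup_efficiency(545.0, [1, 2, 4], [545.0, 290.0, 160.0])
assert s[0] == 1.0 and e[0] == 1.0  # one worker: no speedup by definition
```

Efficiency well below 1 for the cluster curves is exactly the serialization and transport cost discussed above.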
Cluster with several PCs¶
It is possible to run the code above on several PCs.
You should run a controller listening on all IP addresses on the main PC using the command ipcontroller --ip="0.0.0.0". This also creates the configuration files, usually located at ~/.ipython/profile_default/security/: ipcontroller-engine.json for engines and ipcontroller-client.json for clients.
These files must be copied to the same location on all computers running engines and clients.
After copying the configuration files, you can start the engines on the other PCs with the ipengine command.
Read the documentation at https://ipyparallel.readthedocs.io/en/latest/tutorial/process.html#general-considerations for details and other possibilities for the cluster configuration.