Parallel algorithms divide a problem across several cores. Some problems, such as matrix calculations and numerical function integration, are easy to divide into smaller ones because the divided parts are independent of each other. For example, integrating a function over a < x < b and over b < x < c, then summing the two numerical integrals, gives the same result as integrating the same function over a < x < c.
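The additivity claim above can be checked numerically. The following is a minimal sketch, assuming a simple midpoint rule; the helper name midpoint_integral and the choice of math.cos as the test function are illustrative and not part of the example that follows.

```python
import math

def midpoint_integral(func, a, b, steps=10000):
    """Approximate the integral of func over [a, b] with the midpoint rule."""
    width = (b - a) / steps
    return sum(func(a + (k + 0.5) * width) for k in range(steps)) * width

# Integrate cos(x) over [0, 1] and [1, 2] separately, then over [0, 2] at once.
# The whole interval uses twice as many steps so the step width is the same.
left = midpoint_integral(math.cos, 0.0, 1.0)
right = midpoint_integral(math.cos, 1.0, 2.0)
whole = midpoint_integral(math.cos, 0.0, 2.0, steps=20000)

# The two partial integrals add up to the integral over the whole domain,
# up to floating-point rounding.
print(abs((left + right) - whole) < 1e-9)
```

Because the two halves never read each other's data, they could just as well be computed on different cores.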
In Python, the Thread class can be extended by a user-defined class for multi-threaded applications. However, because of the global interpreter lock (GIL) in CPython, a multi-threaded solution to a CPU-bound problem can take longer than a single-threaded one, even if the CPU has more than one core. The solution is to use the multiprocessing library.
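For reference, extending Thread could look like the sketch below. The class name SumThread and its attributes are hypothetical and chosen only for illustration. The code runs correctly, but in standard CPython builds the two threads take turns executing Python bytecode, so it should not be expected to run faster than a single loop.

```python
import threading

class SumThread(threading.Thread):
    """Hypothetical Thread subclass that sums the integers in [first, last)."""

    def __init__(self, first, last):
        super().__init__()
        self.first = first
        self.last = last
        self.result = 0

    def run(self):
        # run() is executed in the new thread after start() is called.
        self.result = sum(range(self.first, self.last))

# Split the range [0, 1000) between two threads.
workers = [SumThread(0, 500), SumThread(500, 1000)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

total = sum(worker.result for worker in workers)
print(total)
```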
Suppose that we have the standard normal density function coded in Python:
import math

def normal(x):
    return (1 / math.sqrt(2 * math.pi)) * math.exp(-0.5 * math.pow(x, 2))
Now we define an integrator function that takes the function, the bounds of the domain and a Queue object for storing the result as parameters:
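def Integrator(func, a, b, lstresults):
    epsilon = 0.00001  # width of each rectangle
    mysum = 0.0
    i = float(a)
    # Left rectangle rule: add func(i) * epsilon for each step in [a, b)
    while i < b:
        mysum += func(i) * epsilon
        i += epsilon
    # Hand the partial result back to the parent process
    lstresults.put(mysum)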
The function Integrator simply integrates the given function from a to b. It uses a single core even if the computer has more. However, this operation can be divided into several smaller parts. Now we define a MultipleIntegrator class to distribute the operation into n parts, where n is a user-defined integer, typically equal to the number of cores.
from multiprocessing import Process, Queue

class MultipleIntegrator:
    def __init__(self):
        "Init"

    def do(self, func, a, b, cores):
        self.cores = cores
        integrationrange = (float(b) - float(a)) / float(cores)
        self.integrators = [None] * cores
        mya = float(a)
        allresults = Queue()
        result = 0.0
        # Start one process per subinterval
        for i in range(0, cores):
            self.integrators[i] = Process(target=Integrator, args=(func, mya, mya + integrationrange, allresults))
            self.integrators[i].start()
            mya += integrationrange
        # Collect exactly one partial result per process before joining;
        # get() blocks until a result arrives, whereas empty() is not reliable
        for i in range(0, cores):
            result += allresults.get()
        for myIntegrator in self.integrators:
            myIntegrator.join()
        return result
When we instantiate this class and call the do method on a user-defined function, the integration of func is shared across several cores.
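The same split-and-sum idea can also be expressed with multiprocessing.Pool, which manages the worker processes and returns the partial results without an explicit Queue. This is only a sketch of an alternative, not the approach used in the rest of this post. The helper names subintervals, integrate_chunk and parallel_integral are hypothetical, and a fixed number of steps per chunk is assumed instead of a fixed epsilon.

```python
from multiprocessing import Pool
import math

def normal(x):
    return (1 / math.sqrt(2 * math.pi)) * math.exp(-0.5 * x * x)

def subintervals(a, b, parts):
    """Split [a, b] into `parts` equal (start, end) pairs."""
    width = (float(b) - float(a)) / parts
    return [(a + k * width, a + (k + 1) * width) for k in range(parts)]

def integrate_chunk(func, start, end, steps=100000):
    """Left rectangle rule for func over [start, end]."""
    width = (end - start) / steps
    return sum(func(start + k * width) for k in range(steps)) * width

def parallel_integral(func, a, b, parts):
    # One task per subinterval; starmap unpacks each tuple into the
    # arguments of integrate_chunk and returns the results in order.
    tasks = [(func, start, end) for start, end in subintervals(a, b, parts)]
    with Pool(processes=parts) as pool:
        return sum(pool.starmap(integrate_chunk, tasks))

if __name__ == "__main__":
    # The guard keeps worker processes from re-running this line on
    # platforms that start them by importing the main module.
    print(parallel_integral(normal, -10, 10, 4))
```

Functions passed to a Pool have to be defined at module level so that they can be pickled and sent to the workers.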
Let's integrate the normal function from -10 to 10 using 2 processes:
m = MultipleIntegrator()
print(m.do(normal, -10, 10,2))
The result and the computation time are:
1.0000039893377686
real 0m1.762s
user 0m3.384s
sys 0m0.024s
If we increase the number of processes to 4:
m = MultipleIntegrator()
print(m.do(normal, -10, 10,4))
The computation time is reported as:
1.0000039894435513
real 0m1.364s
user 0m5.056s
sys 0m0.028s
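The elapsed (real) time drops from 1.762s to 1.364s thanks to multiprocessing, although the total CPU (user) time rises from 3.384s to 5.056s. The whole example is given below: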
from multiprocessing import Process, Queue
import math

def normal(x):
    return (1 / math.sqrt(2 * math.pi)) * math.exp(-0.5 * math.pow(x, 2))

def Integrator(func, a, b, lstresults):
    epsilon = 0.00001  # width of each rectangle
    mysum = 0.0
    i = float(a)
    # Left rectangle rule: add func(i) * epsilon for each step in [a, b)
    while i < b:
        mysum += func(i) * epsilon
        i += epsilon
    # Hand the partial result back to the parent process
    lstresults.put(mysum)

class MultipleIntegrator:
    def __init__(self):
        "Init"

    def do(self, func, a, b, cores):
        self.cores = cores
        integrationrange = (float(b) - float(a)) / float(cores)
        self.integrators = [None] * cores
        mya = float(a)
        allresults = Queue()
        result = 0.0
        # Start one process per subinterval
        for i in range(0, cores):
            self.integrators[i] = Process(target=Integrator, args=(func, mya, mya + integrationrange, allresults))
            self.integrators[i].start()
            mya += integrationrange
        # Collect exactly one partial result per process before joining;
        # get() blocks until a result arrives, whereas empty() is not reliable
        for i in range(0, cores):
            result += allresults.get()
        for myIntegrator in self.integrators:
            myIntegrator.join()
        return result

# The guard is needed on platforms that start processes by importing this module
if __name__ == "__main__":
    m = MultipleIntegrator()
    print(m.do(normal, -10, 10, 4))
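Note that the standard normal function is a probability density function, so its integral cannot exceed 1. The results above are slightly larger than 1 because of floating-point rounding errors. The excess, about 3.989e-06, is close to normal(0) * epsilon, which suggests that the accumulated rounding in i += epsilon makes one Integrator loop run an extra step near x = 0. Several corrections can be added to Integrator for such rounding issues.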
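Repeatedly adding epsilon to i accumulates rounding error, so the number of loop iterations in Integrator is not guaranteed to be exactly (b - a) / epsilon. One possible correction, sketched here as an assumption rather than as part of the original code, is to fix the number of steps up front and compute each sample point from an integer index. The name integrate_fixed_steps is hypothetical, and a larger epsilon is used in the demonstration only to keep it fast.

```python
import math

def normal(x):
    return (1 / math.sqrt(2 * math.pi)) * math.exp(-0.5 * x * x)

def integrate_fixed_steps(func, a, b, epsilon=0.00001):
    """Left rectangle rule with a precomputed, integer number of steps."""
    steps = round((b - a) / epsilon)
    width = (b - a) / steps
    # Each sample point comes from the integer index k, so rounding error
    # does not build up from one step to the next. math.fsum also reduces
    # the rounding error of adding many small terms.
    return math.fsum(func(a + k * width) for k in range(steps)) * width

# Integrate the two halves separately, as two worker processes would.
lower = integrate_fixed_steps(normal, -10.0, 0.0, epsilon=0.001)
upper = integrate_fixed_steps(normal, 0.0, 10.0, epsilon=0.001)
halves = lower + upper
print(halves)
```

Such a function could replace the while loop in Integrator, with the result still passed to lstresults.put.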
Hope you have fun!