In Python, the class Thread can be extended by a user defined class for multi-threaded applications. However, because of the interpreter lock, a multi-threaded solution can take longer computation times even if the computer has more than one cores on the cpu. The solution is using the multiprocessing library.
Suppose that we have the standard normal distribution function coded in Python:
def normal(x):
return ((1/math.sqrt(2*math.pi)) * math.exp(-0.5 * math.pow(x,2)))
Now we define an integrator function that takes the function, the bounds of the domain and a Queue object for storing the result as parameters:
def Integrator(func, a, b, lstresults):
epsilon = 0.00001
mysum = 0.0
i = float(a)
while(i < b):
mysum += func(i) * epsilon
i += epsilon
lstresults.put(mysum)
The function Integrator simply integrates the given function from a to b. This function uses a single core even the computer has more. However, this operation can be divided into several smaller parts. Now we define a MultipleIntegrator class to distribute these operation into n parts, where n is a user defined integer, optionally equals to number of threads.
class MultipleIntegrator:
def __init__(self):
"Init"
def do(self, func, a, b, cores):
self.cores = cores
integrationrange = (float(b) - float(a)) /float(cores)
self.integrators = [None]*cores
mya = float(a)
allresults = Queue()
result = 0.0
for i in range(0,cores):
self.integrators[i] = Process(target=Integrator, args=(func, mya, mya + integrationrange,allresults,))
self.integrators[i].start()
mya += integrationrange
for myIntegrator in self.integrators:
myIntegrator.join()
while(not allresults.empty()):
result += allresults.get()
return(result)
When we instantiate this class and call the do function on a user defined function, the integration process of func will be shared on several cores.
Lets integrate the normal function from -4 to 4 using 2 threads:
m = MultipleIntegrator()
print(m.do(normal, -10, 10,2))
This result and the computation time is:
1.0000039893377686
real 0m1.762s
user 0m3.384s
sys 0m0.024s
If we increase the number of threads to 4:
m = MultipleIntegrator()
print(m.do(normal, -10, 10,4))
The computation time is reported as:
1.0000039894435513
real 0m1.364s
user 0m5.056s
sys 0m0.028s
which is reduced by the multi-processing. The whole example is given below:
from multiprocessing import Process,Queue
import math
def normal(x):
return ((1/math.sqrt(2*math.pi)) * math.exp(-0.5 * math.pow(x,2)))
def Integrator(func, a, b, lstresults):
epsilon = 0.00001
mysum = 0.0
i = float(a)
while(i < b):
mysum += func(i) * epsilon
i += epsilon
lstresults.put(mysum)
class MultipleIntegrator:
def __init__(self):
"Init"
def do(self, func, a, b, cores):
self.cores = cores
integrationrange = (float(b) - float(a)) /float(cores)
self.integrators = [None]*cores
mya = float(a)
allresults = Queue()
result = 0.0
for i in range(0,cores):
self.integrators[i] = Process(target=Integrator, args=(func, mya, mya + integrationrange,allresults,))
self.integrators[i].start()
mya += integrationrange
for myIntegrator in self.integrators:
myIntegrator.join()
while(not allresults.empty()):
result += allresults.get()
return(result)
m = MultipleIntegrator()
print(m.do(normal, -10, 10,4))
Note that, the standard normal function is a probability density function and can not be integrated to a number bigger than 1. All the results include many floating-point rounding errors. Many corrections can be added to Integrator for rounding issues.
Hope you get fun!
No comments:
Post a Comment
Thanks